Module autostreamtree.cluster_pops

Expand source code
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from shapely.geometry import MultiPoint
from sortedcontainers import SortedDict

from typing import Tuple, Dict, Optional


# credit to Geoff Boeing at
# https://geoffboeing.com/2014/08/clustering-to-reduce-spatial-data-set-size/
# for the tutorial
# here coords should be a dictionary with key=ID and value=tuple of lat,long
# epsilon=max distance (in km) between points in a cluster
def dbscan_cluster(coords: Dict[str, Tuple[float, float]], epsilon: float,
                   min_samples: int, out: Optional[str] = None) -> SortedDict:
    """
    Cluster geographic coordinates using DBSCAN algorithm.

    Parameters:
    coords (Dict[str, Tuple[float, float]]): A dictionary of coordinate pairs
                                             with the key as the location
                                             identifier and values as tuples of
                                             (latitude, longitude).
    epsilon (float): Maximum distance between two samples for them to be
                     considered as in the same neighborhood.
    min_samples (int): The number of samples (or total weight) in a
                       neighborhood for a point to be considered as a core
                       point.
    out (Optional[str]): Output file path to save cluster labels.

    Returns:
    SortedDict: A sorted dictionary of cluster labels with keys as cluster name
                and values as lists of location identifiers in that cluster.

    Raises:
    ValueError: If any coordinate value is missing or not a float.

    Examples:
    >>> coords = {'A': (33.4484, -112.0740), 'B': (37.7749,
                  -122.4194), 'C': (40.7128, -74.0060)}
    >>> dbscan_cluster(coords, 300, 2)
    SortedDict({'DB_-1': ['C'], 'DB_0': ['A'], 'DB_1': ['B']})
    """
    # Check if coordinates have correct values
    for k, v in coords.items():
        if not isinstance(v, tuple) or len(v) != 2:
            raise ValueError(
                f"Coordinate value for '{k}' is missing or",
                "not a tuple of size 2."
            )
        if not all(isinstance(coord, float) for coord in v):
            raise ValueError(
                f"Coordinate value for '{k}' has non-float values."
            )

    # Convert coordinates to a numpy array for clustering
    points = np.array(list(coords.values()))
    kms_per_radian = 6371.0088
    eps = epsilon / kms_per_radian

    # Run DBSCAN clustering
    db = DBSCAN(
        eps=eps, min_samples=min_samples, algorithm='ball_tree',
        metric='haversine').fit(np.radians(points))

    # Save cluster labels
    cluster_labels = db.labels_

    # Build SortedDict to return
    popmap = SortedDict()
    for i, k in enumerate(coords.keys()):
        pop = "DB_" + str(cluster_labels[i])
        if pop not in popmap:
            names = [k]
            popmap[pop] = names
        else:
            popmap[pop].append(k)

    # Save cluster labels to file if output path provided
    if out:
        with open(out, 'w') as f:
            for i, k in enumerate(coords.keys()):
                f.write(f"{k}\t{cluster_labels[i]}\n")

    return popmap


def coords_to_matrix(coords):
    """
    Convert a dictionary of geographic coordinates to a NumPy array.

    Args:
    - coords (dict): A dictionary containing the coordinates in the format
                     {id:(long, lat)}.

    Returns:
    - A NumPy array of coordinates in the format [[long, lat], ...].

    """
    if not coords:
        return np.array([], dtype=float).reshape(0, 2)

    return pd.DataFrame(
        [[coords[k][0], coords[k][1]] for k in coords],
        columns=["long", "lat"]).to_numpy()


def coords_to_dataframe(coords):
    """
    Convert a dictionary of geographic coordinates to a Pandas DataFrame.

    Args:
    - coords (dict): A dictionary containing the coordinates in the format
                     {id:(long, lat)}.

    Returns:
    - A Pandas DataFrame of coordinates with columns "long" and "lat".

    """
    df = pd.DataFrame(
        [[coords[k][0], coords[k][1]] for k in coords],
        index=list(coords.keys()),
        columns=["long", "lat"]
    )
    if df.empty:
        df = df.astype({'long': 'float64', 'lat': 'float64'})
    return df


# function to find the centroid of a set of points
# requires a SortedDict of coordinates and a SortedDict giving population IDs
# """Coords:
#     key     value
#     SampleName    Tuple(Lat, Long)
#     popmap:
#     PopulationName    list(SampleName,...)
# """
def get_cluster_centroid(coords, popmap, out=None):
    centroids = SortedDict()
    ofh = None
    if out:
        ofh = out + ".clusterCentroids.txt"
    log = ""
    for i, pop in enumerate(list(popmap.keys())):
        cluster = get_pop_coords_matrix(coords, popmap[pop])
        if len(cluster) < 1:
            raise ValueError(f"No coordinates in cluster: {pop}")

        # add cluster to logfile (if provided)
        log = log + "Population=" + str(i) + "\n"
        log = log + str(cluster) + "\n"

        # get centroid point
        centroid = (
            MultiPoint(cluster).centroid.x, MultiPoint(cluster).centroid.y
        )
        log = log + "Centroid=" + str(centroid) + "\n"
        centroids[pop] = centroid

    # write optional logfile
    if out:
        f = open(ofh, "w")
        f.write(log)
        f.close()
    return centroids


def get_pop_coords_matrix(d, subset):
    if not isinstance(d, dict):
        raise TypeError("Expected 'd' to be a dictionary")
    if not isinstance(subset, list):
        raise TypeError("Expected 'subset' to be a list")

    ret = []
    for k in subset:
        if k in d:
            if isinstance(d[k], tuple) and len(d[k]) == 2:
                ret.append([d[k][0], d[k][1]])
            else:
                raise ValueError(f"Invalid coordinate format for key: {k}")
        else:
            raise ValueError(f"Key not found in dictionary: {k}")

    return np.array(ret, dtype=float).reshape(-1, 2)


# function plots clustered coordinates given a SortedDict of coords and a
# population map
def plot_clustered_points(point_coords, popmap, out, centroids=None):
    # set output file name
    ofh = out+".clusteredPoints.pdf"
    sns.set(style="ticks")

    # get 1D popmap
    pmap = flatten_popmap(popmap)

    df = pd.DataFrame(
        [[ind, pmap[ind], point_coords[ind][0], point_coords[ind][1]]
         for ind in point_coords], columns=["sample", "pop", "long", "lat"])

    ax = sns.scatterplot(x="long", y="lat", hue="pop", palette="Set2", data=df)

    # plot centroid positions if available
    if centroids:
        cdf = pd.DataFrame(
            [[p, centroids[p][0], centroids[p][1]] for p in centroids],
            columns=["pop", "long", "lat"])
        sns.scatterplot(
            x="long", y="lat", hue="pop", palette="Set2", data=cdf,
            legend=False, marker="X", ax=ax
        )

    plt.savefig(ofh)
    plt.clf()


def plot_histogram(dat, out):
    """
    Plots a histogram of snap distances and saves to a PDF file.

    Args:
        dat (list): List of snap distances.
        out (str): Output file path without extension.

    Returns:
        None
    """
    of = f"{out}.snapDistances.pdf"
    sns.set(style="ticks")

    x = pd.Series(dat, name="Snap distance (km)")
    _ = sns.displot(x, kde=True, rug=True)
    plt.savefig(of)
    plt.clf()


def flatten_popmap(popmap):
    """
    Flattens the popmap dictionary from a form of key=pop; value=list(inds) to
    key=ind; value=pop.

    Args:
        popmap (dict): Dictionary in key=pop; value=list(inds) format.

    Returns:
        dict: Dictionary in key=ind; value=pop format.
    """
    new_popmap = {}
    for k, v in popmap.items():
        for i in v:
            new_popmap[i] = k
    return new_popmap

Functions

def coords_to_dataframe(coords)

Convert a dictionary of geographic coordinates to a Pandas DataFrame.

Args: - coords (dict): A dictionary containing the coordinates in the format {id:(long, lat)}.

Returns: - A Pandas DataFrame of coordinates with columns "long" and "lat".

Expand source code
def coords_to_dataframe(coords):
    """
    Convert a dictionary of geographic coordinates to a Pandas DataFrame.

    Args:
    - coords (dict): A dictionary containing the coordinates in the format
                     {id:(long, lat)}.

    Returns:
    - A Pandas DataFrame of coordinates with columns "long" and "lat".

    """
    df = pd.DataFrame(
        [[coords[k][0], coords[k][1]] for k in coords],
        index=list(coords.keys()),
        columns=["long", "lat"]
    )
    if df.empty:
        df = df.astype({'long': 'float64', 'lat': 'float64'})
    return df
def coords_to_matrix(coords)

Convert a dictionary of geographic coordinates to a NumPy array.

Args: - coords (dict): A dictionary containing the coordinates in the format {id:(long, lat)}.

Returns: - A NumPy array of coordinates in the format [[long, lat], …].

Expand source code
def coords_to_matrix(coords):
    """
    Convert a dictionary of geographic coordinates to a NumPy array.

    Args:
    - coords (dict): A dictionary containing the coordinates in the format
                     {id:(long, lat)}.

    Returns:
    - A NumPy array of coordinates in the format [[long, lat], ...].

    """
    if not coords:
        return np.array([], dtype=float).reshape(0, 2)

    return pd.DataFrame(
        [[coords[k][0], coords[k][1]] for k in coords],
        columns=["long", "lat"]).to_numpy()
def dbscan_cluster(coords: Dict[str, Tuple[float, float]], epsilon: float, min_samples: int, out: Optional[str] = None) -> sortedcontainers.sorteddict.SortedDict

Cluster geographic coordinates using DBSCAN algorithm.

Parameters: coords (Dict[str, Tuple[float, float]]): A dictionary of coordinate pairs with the key as the location identifier and values as tuples of (latitude, longitude). epsilon (float): Maximum distance between two samples for them to be considered as in the same neighborhood. min_samples (int): The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. out (Optional[str]): Output file path to save cluster labels.

Returns: SortedDict: A sorted dictionary of cluster labels with keys as cluster name and values as lists of location identifiers in that cluster.

Raises: ValueError: If any coordinate value is missing or not a float.

Examples:

>>> coords = {'A': (33.4484, -112.0740), 'B': (37.7749,
              -122.4194), 'C': (40.7128, -74.0060)}
>>> dbscan_cluster(coords, 300, 2)
SortedDict({'DB_-1': ['C'], 'DB_0': ['A'], 'DB_1': ['B']})
Expand source code
def dbscan_cluster(coords: Dict[str, Tuple[float, float]], epsilon: float,
                   min_samples: int, out: Optional[str] = None) -> SortedDict:
    """
    Cluster geographic coordinates using DBSCAN algorithm.

    Parameters:
    coords (Dict[str, Tuple[float, float]]): A dictionary of coordinate pairs
                                             with the key as the location
                                             identifier and values as tuples of
                                             (latitude, longitude).
    epsilon (float): Maximum distance between two samples for them to be
                     considered as in the same neighborhood.
    min_samples (int): The number of samples (or total weight) in a
                       neighborhood for a point to be considered as a core
                       point.
    out (Optional[str]): Output file path to save cluster labels.

    Returns:
    SortedDict: A sorted dictionary of cluster labels with keys as cluster name
                and values as lists of location identifiers in that cluster.

    Raises:
    ValueError: If any coordinate value is missing or not a float.

    Examples:
    >>> coords = {'A': (33.4484, -112.0740), 'B': (37.7749,
                  -122.4194), 'C': (40.7128, -74.0060)}
    >>> dbscan_cluster(coords, 300, 2)
    SortedDict({'DB_-1': ['C'], 'DB_0': ['A'], 'DB_1': ['B']})
    """
    # Check if coordinates have correct values
    for k, v in coords.items():
        if not isinstance(v, tuple) or len(v) != 2:
            raise ValueError(
                f"Coordinate value for '{k}' is missing or",
                "not a tuple of size 2."
            )
        if not all(isinstance(coord, float) for coord in v):
            raise ValueError(
                f"Coordinate value for '{k}' has non-float values."
            )

    # Convert coordinates to a numpy array for clustering
    points = np.array(list(coords.values()))
    kms_per_radian = 6371.0088
    eps = epsilon / kms_per_radian

    # Run DBSCAN clustering
    db = DBSCAN(
        eps=eps, min_samples=min_samples, algorithm='ball_tree',
        metric='haversine').fit(np.radians(points))

    # Save cluster labels
    cluster_labels = db.labels_

    # Build SortedDict to return
    popmap = SortedDict()
    for i, k in enumerate(coords.keys()):
        pop = "DB_" + str(cluster_labels[i])
        if pop not in popmap:
            names = [k]
            popmap[pop] = names
        else:
            popmap[pop].append(k)

    # Save cluster labels to file if output path provided
    if out:
        with open(out, 'w') as f:
            for i, k in enumerate(coords.keys()):
                f.write(f"{k}\t{cluster_labels[i]}\n")

    return popmap
def flatten_popmap(popmap)

Flattens the popmap dictionary from a form of key=pop; value=list(inds) to key=ind; value=pop.

Args

popmap : dict
Dictionary in key=pop; value=list(inds) format.

Returns

dict
Dictionary in key=ind; value=pop format.
Expand source code
def flatten_popmap(popmap):
    """
    Flattens the popmap dictionary from a form of key=pop; value=list(inds) to
    key=ind; value=pop.

    Args:
        popmap (dict): Dictionary in key=pop; value=list(inds) format.

    Returns:
        dict: Dictionary in key=ind; value=pop format.
    """
    new_popmap = {}
    for k, v in popmap.items():
        for i in v:
            new_popmap[i] = k
    return new_popmap
def get_cluster_centroid(coords, popmap, out=None)
Expand source code
def get_cluster_centroid(coords, popmap, out=None):
    centroids = SortedDict()
    ofh = None
    if out:
        ofh = out + ".clusterCentroids.txt"
    log = ""
    for i, pop in enumerate(list(popmap.keys())):
        cluster = get_pop_coords_matrix(coords, popmap[pop])
        if len(cluster) < 1:
            raise ValueError(f"No coordinates in cluster: {pop}")

        # add cluster to logfile (if provided)
        log = log + "Population=" + str(i) + "\n"
        log = log + str(cluster) + "\n"

        # get centroid point
        centroid = (
            MultiPoint(cluster).centroid.x, MultiPoint(cluster).centroid.y
        )
        log = log + "Centroid=" + str(centroid) + "\n"
        centroids[pop] = centroid

    # write optional logfile
    if out:
        f = open(ofh, "w")
        f.write(log)
        f.close()
    return centroids
def get_pop_coords_matrix(d, subset)
Expand source code
def get_pop_coords_matrix(d, subset):
    if not isinstance(d, dict):
        raise TypeError("Expected 'd' to be a dictionary")
    if not isinstance(subset, list):
        raise TypeError("Expected 'subset' to be a list")

    ret = []
    for k in subset:
        if k in d:
            if isinstance(d[k], tuple) and len(d[k]) == 2:
                ret.append([d[k][0], d[k][1]])
            else:
                raise ValueError(f"Invalid coordinate format for key: {k}")
        else:
            raise ValueError(f"Key not found in dictionary: {k}")

    return np.array(ret, dtype=float).reshape(-1, 2)
def plot_clustered_points(point_coords, popmap, out, centroids=None)
Expand source code
def plot_clustered_points(point_coords, popmap, out, centroids=None):
    # set output file name
    ofh = out+".clusteredPoints.pdf"
    sns.set(style="ticks")

    # get 1D popmap
    pmap = flatten_popmap(popmap)

    df = pd.DataFrame(
        [[ind, pmap[ind], point_coords[ind][0], point_coords[ind][1]]
         for ind in point_coords], columns=["sample", "pop", "long", "lat"])

    ax = sns.scatterplot(x="long", y="lat", hue="pop", palette="Set2", data=df)

    # plot centroid positions if available
    if centroids:
        cdf = pd.DataFrame(
            [[p, centroids[p][0], centroids[p][1]] for p in centroids],
            columns=["pop", "long", "lat"])
        sns.scatterplot(
            x="long", y="lat", hue="pop", palette="Set2", data=cdf,
            legend=False, marker="X", ax=ax
        )

    plt.savefig(ofh)
    plt.clf()
def plot_histogram(dat, out)

Plots a histogram of snap distances and saves to a PDF file.

Args

dat : list
List of snap distances.
out : str
Output file path without extension.

Returns

None
 
Expand source code
def plot_histogram(dat, out):
    """
    Plots a histogram of snap distances and saves to a PDF file.

    Args:
        dat (list): List of snap distances.
        out (str): Output file path without extension.

    Returns:
        None
    """
    of = f"{out}.snapDistances.pdf"
    sns.set(style="ticks")

    x = pd.Series(dat, name="Snap distance (km)")
    _ = sns.displot(x, kde=True, rug=True)
    plt.savefig(of)
    plt.clf()