"""Module autostreamtree.functions."""
import sys
import os
import itertools
import traceback
import math
import momepy
import warnings
import pyogrio
import pandas as pd
import numpy as np
import networkx as nx
import seaborn as sns
from pysam import VariantFile
from scipy import stats
from sortedcontainers import SortedDict
from networkx import NodeNotFound
import matplotlib.pyplot as plt
import pickle
import mantel
import geopandas as gpd
from math import radians, sin, cos, acos
import autostreamtree.cluster_pops as clust
import autostreamtree.sequence as seq
import autostreamtree.genetic_distances as gendist
# suppress irrelevant warning from momepy
def custom_warn_handler(message, category, filename, lineno, file=None,
line=None):
if "momepy/utils.py" in filename and issubclass(category, UserWarning):
return
return original_showwarning(message, category, filename, lineno, file,
line)
original_showwarning = warnings.showwarning
warnings.showwarning = custom_warn_handler
def read_vcf(vcf, concat="none", popmap=None):
    """
    Reads a VCF file and returns a dictionary of sample genotypes.

    Args:
        vcf: Path to the input VCF file.
        concat: Specifies the concatenation mode for genotypes. Options are
            "all", "loc", and "none".
            "all": Concatenate genotypes of all loci for each sample.
            "loc": Concatenate genotypes within the same chromosome for
                each sample.
            "none": Do not concatenate genotypes.
        popmap: Optional dictionary that maps populations to a list of
            samples. If provided, only samples in the popmap will be
            retained in the output dictionary.
    Returns:
        A dictionary with sample names as keys and lists of genotypes as
        values.
    """
    bcf_in = VariantFile(vcf)
    # get all samples in the VCF
    vcf_samples = list(bcf_in.header.samples)
    # set up data dict
    dat = dict()
    samples = list((bcf_in.header.samples))
    for s in samples:
        if concat == "all":
            dat[s] = list()
            # single accumulator pair: [haplotype0, haplotype1]
            dat[s].append(["", ""])
        else:
            dat[s] = list()
    # if popmap, make list of samples to drop that aren't in a pop
    if popmap:
        keep = list()
        for pop in popmap:
            keep.extend(popmap[pop])
        # restrict to samples actually present in the VCF header
        keep = [s for s in keep if s in vcf_samples]
        bcf_in.subset_samples(keep)
    chrom = "FIRST"  # sentinel so the first record starts a new chromosome
    for record in bcf_in.fetch():
        for i, sample in enumerate(record.samples):
            if concat == "all":
                # append each haplotype string onto the single accumulator
                loc = seq.decode(record.samples[i]['GT'], record.ref,
                                 record.alts, as_list=True)
                dat[sample][-1][0] = dat[sample][-1][0]+loc[0]
                dat[sample][-1][1] = dat[sample][-1][1]+loc[1]
            elif concat == "loc":
                # start a new accumulator whenever the chromosome changes
                if record.chrom != chrom:
                    dat[sample].append(["", ""])
                loc = seq.decode(record.samples[i]['GT'], record.ref,
                                 record.alts, as_list=True)
                dat[sample][-1][0] = dat[sample][-1][0]+loc[0]
                dat[sample][-1][1] = dat[sample][-1][1]+loc[1]
            else:
                # "none": one decoded genotype string per record
                loc = seq.decode(record.samples[i]['GT'], record.ref,
                                 record.alts)
                dat[sample].append(loc)
        chrom = record.chrom
    if concat != "none":
        # join the two haplotype strings into "hap0/hap1" per block
        for sample in dat:
            dat[sample] = ["/".join(x) for x in dat[sample]]
    # drop samples with no genotype data
    # NOTE(review): after the join above, dat[sample][0][0] is the first
    # character of a string (never "" for non-empty strings), so the elif
    # only fires for un-joined data — confirm the intended empty check.
    for sample in list(dat.keys()):
        if len(dat[sample]) < 1:
            del dat[sample]
        elif len(dat[sample]) == 1 and dat[sample][0][0] == "":
            del dat[sample]
    return dat
def prune_graph(G, edge_list, reachid_col):
    """
    Prunes a graph to only retain edges whose 'reachid_col' matches values in
    'edge_list'.

    Args:
        G: The input NetworkX Graph.
        edge_list: A list of values to filter edges.
        reachid_col: The edge attribute to be checked against 'edge_list'.
    Returns:
        A pruned NetworkX Graph with isolated nodes removed.
    Raises:
        ValueError: If no edge attribute value matches 'edge_list'.
    """
    # Fix: removed a leftover debug loop that fetched every edge's data
    # and discarded it (pure overhead, no effect on the result).
    # Get edges to be retained using get_edge_data method
    edges_to_keep = [(u, v) for u, v in G.edges() if
                     G.get_edge_data(u, v).get(reachid_col) in edge_list]
    # Check if there are no edges to keep
    if not edges_to_keep:
        raise ValueError(
            "There are no edges to retain based on the given edge",
            "list and attribute."
        )
    # Create a new graph with only the edges to be retained
    pruned_G = G.edge_subgraph(edges_to_keep).copy()
    # Remove nodes left without any incident edge
    pruned_G.remove_nodes_from(list(nx.isolates(pruned_G)))
    return pruned_G
def read_network(network, shapefile):
    """
    Reads a network from a saved file or builds a network from a shapefile.

    Args:
        network: Path to the saved network file (pickle format). If provided,
            the function will read the network from this file.
        shapefile: Path to the shapefile to build the network from. This is
            used if the `network` parameter is not provided.
    Returns:
        A NetworkX Graph object representing the network.
    """
    # Check if a saved network file is provided
    if network:
        print("Reading network from saved file: ", network)
        # NOTE(review): pickle.load executes arbitrary code on load — only
        # read network files from trusted sources.
        # Read the network from the saved file and convert it to an
        # undirected graph
        with open(network, 'rb') as f:
            G = nx.Graph(pickle.load(f)).to_undirected()
    else:
        # If no saved network file is provided, build the network from the
        # shapefile
        print("Building network from shapefile:", shapefile)
        print("WARNING: This can take a while with very large files!")
        # Read the shapefile (pyogrio is faster than gpd.read_file here)
        # rivers = gpd.read_file(shapefile)
        rivers = pyogrio.read_dataframe(shapefile)
        # Convert the GeoDataFrame to an undirected, primal NetworkX graph
        G = momepy.gdf_to_nx(rivers, approach="primal", directed=False,
                             multigraph=False)
    return G
def parse_subgraph_from_points(params, point_coords, pop_coords, G):
    """
    Extracts a subgraph from a given graph based on input points.

    Args:
        params: A custom object containing various input parameters (pop,
            geopop, clusterpop, out, reachid_col, length_col, network,
            overwrite are read here).
        point_coords: Mapping of individual sample names to coordinates.
        pop_coords: Mapping of population names to coordinates.
        G: A NetworkX Graph object representing the input graph.
    Returns:
        A NetworkX Graph object representing the extracted, minimized
        subgraph.

    Side effects: writes "<out>.pointCoords.txt", optionally
    "<out>.network" / "<out>.full.network" pickles, and
    "<out>.subGraph.pdf".
    """
    # Choose the appropriate set of points based on input parameters
    if params.pop or params.geopop or params.clusterpop:
        points = pop_coords
    else:
        points = point_coords
    # Output points to a table
    p = get_point_table(points)
    p.to_csv((str(params.out) + ".pointCoords.txt"), sep="\t", index=False)
    del p
    # First pass extracts a subgraph from the master shapefile graph
    print("\nExtracting full subgraph...")
    ktemp = path_subgraph(G, points, extract_full_subgraph, params.reachid_col,
                          params.length_col)
    # free the (potentially very large) master graph
    del G
    # Second pass to simplify subgraph and collapse redundant nodes
    print("\nMerging redundant paths...\n")
    K = path_subgraph(ktemp, points, extract_minimal_subgraph,
                      params.reachid_col, params.length_col)
    # Grab real coordinates as node positions for plotting
    pos = {n: n for n in K.nodes}
    # Make a color map to color sample points and junctions differently
    color_map = ["blue" if node in points.values() else "black" for node in K]
    # Draw networkx
    nx.draw_networkx(K, pos, with_labels=False, node_color=color_map,
                     node_size=50)
    # Get length attributes for labelling edges
    edge_labels = nx.get_edge_attributes(K, params.length_col)
    for e in edge_labels:
        edge_labels[e] = "{:.2f}".format(edge_labels[e])
    nx.draw_networkx_edge_labels(K, pos, edge_labels=edge_labels, font_size=6)
    # Save minimized network to file (unless we already read from one)
    if not params.network or params.overwrite:
        net_out = str(params.out) + ".network"
        with open(net_out, 'wb') as f:
            pickle.dump(K, f, pickle.HIGHEST_PROTOCOL)
        net_full_out = str(params.out) + ".full.network"
        with open(net_full_out, 'wb') as f:
            pickle.dump(ktemp, f, pickle.HIGHEST_PROTOCOL)
    else:
        print(
            "NOTE: Not over-writing existing network. To change this, use",
            "--overwrite"
        )
    network_plot = str(params.out) + ".subGraph.pdf"
    plt.savefig(network_plot)
    del ktemp
    return K
def report_genmats(params, gen, pop_gen, point_coords, pop_coords):
    """
    Print genetic distance matrices and write them to tab-separated files.

    Args:
        params: Parameter object; only ``params.out`` (output prefix) is
            read here.
        gen: Individual genetic distance matrix (or None to skip).
        pop_gen: Population genetic distance matrix (or None to skip).
        point_coords: Dict whose keys label the individual matrix rows/cols.
        pop_coords: Dict whose keys label the population matrix rows/cols.
    """
    if gen is not None:
        print("Genetic distances:")
        np.set_printoptions(precision=3)
        print(gen, "\n")
        # Persist the individual-level matrix with sample-name labels
        labels = list(point_coords.keys())
        frame = pd.DataFrame(gen, columns=labels, index=labels)
        frame.to_csv(str(params.out) + ".indGenDistMat.txt", sep="\t",
                     index=True)
    if pop_gen is not None:
        print("Population genetic distances:")
        np.set_printoptions(precision=3)
        print(pop_gen, "\n")
        # Persist the population-level matrix with population labels
        labels = list(pop_coords.keys())
        frame = pd.DataFrame(pop_gen, columns=labels, index=labels)
        frame.to_csv(str(params.out) + ".popGenDistMat.txt", sep="\t",
                     index=True)
def get_loc_data(seqs):
    """
    Generator yielding one dictionary per locus across all individuals.

    Args:
        seqs: A dictionary mapping individual identifiers to their sequence
            of per-locus genotypes.
    Yields:
        dict: Individual identifier -> single-element list holding that
        individual's genotype at the current locus.
    """
    # Number of loci is taken from the first individual's record
    n_loci = len(seqs[list(seqs.keys())[0]])
    for loc_idx in range(n_loci):
        yield {ind: [genotypes[loc_idx]] for ind, genotypes in seqs.items()}
def report_genmats_list(params, genlist, popgenlist, point_coords, pop_coords):
    """
    Writes individual and population genetic distance matrices to files for
    each locus in genlist and popgenlist.

    Args:
        params: A namespace object; ``params.out`` is the output prefix.
        genlist: A list of individual genetic distance matrices (one per
            locus; None entries are skipped but still consume an index).
        popgenlist: A list of population genetic distance matrices (one per
            locus; None entries are skipped but still consume an index).
        point_coords: A dictionary containing individual point coordinates.
        pop_coords: A dictionary containing population point coordinates.
    """
    # Create an output directory for the genetic distance matrices.
    # Fix: renamed 'dir' -> 'out_dir' to avoid shadowing the builtin.
    out_dir = str(params.out) + "_locmats"
    os.makedirs(out_dir, exist_ok=True)
    ind_labels = list(point_coords.keys())
    # enumerate() replaces the manual counters; the index advances for None
    # entries too, so file names stay aligned with locus positions.
    for i, gen in enumerate(genlist):
        if gen is not None:
            pd.DataFrame(gen, columns=ind_labels, index=ind_labels).to_csv(
                out_dir + "/loc_" + str(i) + ".indGenDistMat.txt",
                sep="\t", index=True)
    pop_labels = list(pop_coords.keys())
    for j, pop_gen in enumerate(popgenlist):
        if pop_gen is not None:
            pd.DataFrame(pop_gen, columns=pop_labels,
                         index=pop_labels).to_csv(
                out_dir + "/loc_" + str(j) + ".popGenDistMat.txt",
                sep="\t", index=True)
def block_print():
    """
    Silence all print output by pointing stdout at the OS null device.

    Pair with enable_print() to restore normal output.
    """
    devnull = open(os.devnull, 'w')
    sys.stdout = devnull
def enable_print():
    """
    Restores standard output to its original state, allowing print statements
    to be displayed again.

    NOTE(review): the null-device handle opened by block_print() is simply
    dropped here, not closed — confirm whether it should be closed.
    """
    sys.stdout = sys.__stdout__
def parse_input_genmat(params, inmat, point_coords, popmap, seqs=None):
    """
    Parses an input genetic distance matrix and verifies if it matches the
    user input parameters. Aggregates individual distances if required by the
    user input.

    Args:
        params: Input parameters provided by the user.
        inmat (pd.DataFrame): The input genetic distance matrix.
        point_coords (dict): Dictionary containing point coordinates.
        popmap (dict): Dictionary containing the population map.
        seqs: Optional sequence data forwarded to the aggregation step.
    Returns:
        tuple: A tuple containing the genetic distance matrix (gen) and
            population genetic distance matrix (pop_gen); either may be None.
    """
    gen = None
    pop_gen = None
    if params.coercemat:
        # clamp negative distances to zero when requested
        inmat[inmat < 0.0] = 0.0
    # column and row labels must agree, otherwise the matrix is unlabelled
    if set(list(inmat.columns.values)) != set(list(inmat.index.values)):
        print(inmat.columns.values)
        print(inmat.index.values)
        print("Input matrix columns and/ or rows don't appear to be",
              "labelled. Please provide an input matrix with column and row",
              "names!")
        sys.exit(1)
    else:
        agg = False
        # first check if it fits whatever the user input was (i.e. --pop)
        if params.pop:
            if len(inmat.columns) != len(popmap.keys()):
                print("Found", str(len(inmat.columns)),
                      "columns in provided matrix. This doesn't match number",
                      "of populations from popmap.")
                if (len(inmat.columns)) != len(point_coords):
                    print("Doesn't match number of individuals either! Please",
                          "check your matrix format.")
                    sys.exit(1)
                else:
                    print("Assuming input matrix has individual distances...",
                          "Aggregating using the following (--pop_agg):",
                          str(params.pop_agg))
                    # NOTE(review): agg is set here but 'gen' is never
                    # assigned from inmat in this branch, so the aggregation
                    # below receives gen=None — confirm whether inmat should
                    # be reordered and stored in 'gen' first.
                    agg = True
            else:
                # matrix is population-level: re-order using pop orders
                inmat = inmat.reindex(list(popmap.keys()))
                inmat = inmat[list(popmap.keys())]
                pop_gen = inmat.to_numpy()
                del inmat
        elif params.geopop or params.clusterpop:
            if (len(inmat.columns)) != len(point_coords):
                print("Found", str(len(inmat.columns)),
                      "columns in provided matrix. This doesn't match number",
                      "of individuals.")
                print("When using --geopop or --clusterpop, the provided",
                      "matrix must represent individual-level distances.")
                sys.exit(1)
            else:
                # re-order using individual sample order
                inmat = inmat.reindex(list(point_coords.keys()))
                inmat = inmat[list(point_coords.keys())]
                gen = inmat.to_numpy()
                agg = True
                del inmat
        else:
            if (len(inmat.columns)) != len(point_coords):
                print("Found", str(len(inmat.columns)),
                      "columns in provided matrix. This doesn't match number",
                      "of individuals.")
                sys.exit(1)
            else:
                # re-order using individual sample order
                inmat = inmat.reindex(list(point_coords.keys()))
                inmat = inmat[list(point_coords.keys())]
                gen = inmat.to_numpy()
                del inmat
        # if --geopop or --clusterpop, it should be an ind matrix
        # if so, need to aggregate according to --pop_agg
        if agg:
            print("Aggregating user-provided individual-level distance matrix",
                  "using:", params.pop_agg)
            pop_gen = gendist.get_pop_genmat("PDIST", gen, popmap,
                                             point_coords, seqs,
                                             pop_agg=params.pop_agg,
                                             loc_agg=params.loc_agg,
                                             ploidy=params.ploidy,
                                             global_het=params.global_het)
    return (gen, pop_gen)
def read_popmap(popmap):
    """
    Read a whitespace-delimited population map file.

    Each non-empty line is "<individual> <population>"; blank lines are
    skipped.

    Args:
        popmap (str): Path to the population map file.
    Returns:
        dict: Individual identifier -> population identifier.
    """
    popdict = {}
    with open(popmap, "r") as fh:
        for raw_line in fh:
            stripped = raw_line.strip()
            if not stripped:
                continue
            fields = stripped.split()
            popdict[fields[0]] = fields[1]
    return popdict
def process_samples(params, points, G):
    """
    Processes input sample data by snapping points to a graph, calculating
    coordinates, and processing populations if required.

    Args:
        params: Input parameters provided by the user (pop, geopop,
            clusterpop, run, out, epsilon, min_samples are read here).
        points (pd.DataFrame): DataFrame of samples with 'sample', 'lat'
            and 'long' columns.
        G (networkx.Graph): Graph representing the stream network.
    Returns:
        tuple: (point_coords, pop_coords, popmap) — per-individual
        coordinates, per-population snapped coordinates, and the
        population map.

    Side effects: writes "<out>.popmap.txt" and
    "<out>.snapDistances.txt", plus cluster/histogram plots.
    """
    popmap = SortedDict()
    point_coords = SortedDict()
    pop_coords = SortedDict()
    snapDists = dict()
    if params.pop:
        popmap_temp = read_popmap(params.pop)
        # drop samples that are not assigned to any population
        mask = points[points.columns[0]].isin(popmap_temp)
        points = points[mask]
    for _, row in points.iterrows():
        name = None
        data = None
        row["lat"] = float(row["lat"])
        row["long"] = float(row["long"])
        if params.run == "GENDIST":
            # genetic distances only: no snapping required
            name = row["sample"]
            data = tuple([row["long"], row["lat"]])
        else:
            if not params.pop and not params.clusterpop:
                # --geopop and individual-level snap coordinates to nodes here
                node = snap_to_node(G, tuple([row["long"], row["lat"]]))
                # record how far (km) the sample moved when snapped
                snapDists[row["sample"]] = great_circle(node[0], node[1],
                                                        row["long"],
                                                        row["lat"])
            else:
                # if pop or clusterpop, extract centroid later
                node = tuple([row["long"], row["lat"]])
            data = node
            name = row["sample"]
        point_coords[name] = data
        # Process population-level analyses
        if params.geopop:
            # --geopop: samples sharing a snapped node form a population
            if point_coords[name] not in popmap:
                names = [name]
                popmap[point_coords[name]] = names
            else:
                popmap[point_coords[name]].append(row["sample"])
        elif params.pop:
            # --pop: group samples by their popmap assignment
            if popmap_temp[row.iloc[0]] not in popmap:
                names = [name]
                popmap[popmap_temp[row.iloc[0]]] = names
            else:
                popmap[popmap_temp[row.iloc[0]]].append(name)
    print("Read", str(len(point_coords.keys())), "individuals.")
    print()
    if params.pop or params.geopop:
        print("Read", str(len(popmap.keys())), "populations.")
        print()
    # For population-level analyses, generate population maps and centroids
    # here according to user-input options: --pop, --geopop, --clusterpop
    # get population centroid
    if params.pop or params.geopop or params.clusterpop:
        if params.clusterpop:
            # create population clusters using DBSCAN
            print("Running DBSCAN clustering with min_samples=",
                  params.min_samples, "and epsilon=", params.epsilon)
            popmap = clust.dbscan_cluster(point_coords, params.epsilon,
                                          params.min_samples)
            num_clusters = len(popmap.keys())
            print("Found", str(num_clusters), "clusters!")
            # calculate centroids for clusters
            pop_temp = clust.get_cluster_centroid(point_coords, popmap,
                                                  params.out)
        elif params.pop or params.geopop:
            # popmap generated earlier when parsing input file!
            # still need to calculate centroids:
            print("Calculating population centroids...")
            pop_temp = clust.get_cluster_centroid(point_coords, popmap,
                                                  params.out)
            # note in the case of --geopop the centroid is the joint
            # snapped-to location
        # now, snap pop_coords to nodes
        pop_coords = SortedDict()
        for p in pop_temp:
            node = snap_to_node(G, pop_temp[p])
            snapDists[p] = great_circle(node[0], node[1], pop_temp[p][0],
                                        pop_temp[p][1])
            pop_coords[p] = node
        # write popmap to file
        flat = clust.flatten_popmap(popmap)
        temp = pd.DataFrame(list(flat.items()), columns=['IND_ID', 'POP_ID'])
        temp.to_csv((str(params.out) + ".popmap.txt"), sep="\t", index=False)
        del flat
        del temp
        # plot grouped samples
        clust.plot_clustered_points(point_coords, popmap, params.out,
                                    pop_coords)
    # Plot histogram of snap distances and write snap distances to a file
    clust.plot_histogram(list(snapDists.values()), params.out)
    dtemp = pd.DataFrame(list(snapDists.items()), columns=['name', 'km'])
    dtout = str(params.out) + ".snapDistances.txt"
    dtemp.to_csv(dtout, sep="\t", index=False)
    del dtemp
    del dtout
    del snapDists
    # return everything
    return point_coords, pop_coords, popmap
def get_gendist_mats(params, point_coords, popmap, seqs):
    """Compute individual and population genetic distance matrices.

    Args:
        params (object): Analysis parameters (dist, ploidy, het,
            aggregation options and the pop/geopop/clusterpop flags).
        point_coords: Coordinates for the sampled points.
        popmap (dict): Mapping of populations to their samples.
        seqs: Sequence data for the sampled points.
    Returns:
        tuple: (gen, pop_gen) — individual pairwise distance matrix (or
        None) and population distance matrix (or None).
    Raises:
        SystemExit: If the chosen metric requires population data and none
            was provided.
    """
    gen = None
    pop_gen = None
    pop_mode = params.pop or params.geopop or params.clusterpop
    individual_metrics = ("PDIST", "TN84", "TN93", "K2P", "JC69")
    if params.dist in individual_metrics:
        # Individual-level metric: compute the pairwise matrix directly
        gen = gendist.get_genmat(params.dist, point_coords, seqs,
                                 ploidy=params.ploidy, het=params.het,
                                 loc_agg=params.loc_agg)
        if pop_mode:
            print("Aggregating pairwise population genetic distances from",
                  "individual distances using:", params.pop_agg)
    elif not params.pop and not params.geopop:
        # Population-only metric without population data: cannot proceed
        print("ERROR: Distance metric", params.dist,
              "not possible without population data.")
        sys.exit(1)
    if pop_mode:
        # Aggregate (or directly compute) the population-level matrix
        pop_gen = gendist.get_pop_genmat(params.dist, gen, popmap,
                                         point_coords, seqs,
                                         pop_agg=params.pop_agg,
                                         loc_agg=params.loc_agg,
                                         ploidy=params.ploidy,
                                         global_het=params.global_het)
    return gen, pop_gen
def get_point_table(points):
    """Build a DataFrame of sample coordinates from a point dictionary.

    Args:
        points (dict): Mapping of sample name -> (long, lat) coordinate
            pair (longitude first, matching the rest of this module).
    Returns:
        pandas.DataFrame: Columns 'sample', 'lat', 'long'.
    """
    # Note the swap: coords are stored (long, lat) but written lat-first
    rows = [[name, coords[1], coords[0]] for name, coords in points.items()]
    return pd.DataFrame(rows, columns=['sample', 'lat', 'long'])
def r2(x, y):
    """Return the squared Pearson correlation coefficient of two arrays.

    Args:
        x (array): An array of values.
        y (array): An array of values.
    Returns:
        float: The squared Pearson correlation coefficient.
    """
    corr = stats.pearsonr(x, y)[0]
    return corr ** 2
def get_fitted_d(points, genmat, inc, r):
    """Calculate predicted genetic distances from fitted streamtree
    distances.

    Args:
        points (dict): Point dictionary; keys provide the pair labels.
        genmat (ndarray): Pairwise genetic distance matrix.
        inc (ndarray): Incidence matrix, one row per point pair, one
            column per stream segment (1 = segment lies on the pair's path).
        r (ndarray): Fitted per-segment streamtree distances.
    Returns:
        pandas.DataFrame: Columns 'from', 'to', 'observed_D',
        'predicted_D', and 'abs_diff'.
    """
    names = list(points.keys())
    rows = []
    # Pairs enumerate in the same order as the incidence-matrix rows
    pairs = itertools.combinations(range(len(points)), 2)
    for row_idx, (ia, ib) in enumerate(pairs):
        observed = genmat[ia, ib]
        # Predicted distance = sum of fitted lengths on this pair's path
        predicted = np.sum(r[inc[row_idx] == 1])
        rows.append([names[ia], names[ib], observed, predicted,
                     np.abs(observed - predicted)])
    return pd.DataFrame(rows, columns=['from', 'to', 'observed_D',
                                       'predicted_D', 'abs_diff'])
def plot_gen_by_geo(gen, sdist, out, log=False):
    """
    Plot genetic against geographic distance (isolation by distance) and
    save the figure as "<out>.isolationByDistance.pdf".

    Args:
        gen (numpy.ndarray): Genetic distance matrix.
        sdist (numpy.ndarray): Spatial distance matrix.
        out (str): Output file prefix for the generated plot.
        log (bool, optional): If True, plot against log-transformed
            geographic distance (zeroes replaced first). Defaults to False.
    """
    gd = get_lower_tri(gen)
    sd = get_lower_tri(sdist)
    data = pd.DataFrame({'Geographic Distance': sd,
                         'Genetic Distance': gd})
    if log:
        # replace zeroes so the log transform is defined everywhere
        sd = replace_zeroes(sd)
        data['Log Geographic Distance'] = np.log(sd)
        x_axis = 'Log Geographic Distance'
    else:
        x_axis = 'Geographic Distance'
    sns.jointplot(data=data, x=x_axis, y='Genetic Distance', kind="reg")
    plt.savefig(str(out) + ".isolationByDistance.pdf")
def great_circle(lon1, lat1, lon2, lat2, thresh=0.0000001):
    """
    Great-circle distance between two lon/lat points on a sphere of radius
    6371 km.

    Args:
        lon1 (float): Longitude of the first point.
        lat1 (float): Latitude of the first point.
        lon2 (float): Longitude of the second point.
        lat2 (float): Latitude of the second point.
        thresh (float, optional): Coordinates closer than this are treated
            as identical. Defaults to 0.0000001.
    Returns:
        float: Great circle distance in kilometers.
    """
    # Short-circuit near-identical points: avoids acos() domain errors
    # from floating-point rounding.
    if abs(lon1 - lon2) < thresh and abs(lat1 - lat2) < thresh:
        return 0.0
    lon1, lat1, lon2, lat2 = (radians(v) for v in (lon1, lat1, lon2, lat2))
    cos_angle = (sin(lat1) * sin(lat2)
                 + cos(lat1) * cos(lat2) * cos(lon1 - lon2))
    return 6371 * acos(cos_angle)
def get_lower_tri(mat):
    """
    Flatten the strictly-lower-triangular part of a square matrix.

    Args:
        mat (numpy.ndarray): Input square matrix.
    Returns:
        numpy.ndarray: 1D array of the below-diagonal elements, row-major.
    """
    idx = np.tril_indices(mat.shape[0], -1)
    return mat[idx]
def replace_zeroes(data):
    """
    Replace zero entries with the smallest non-zero value (in place).

    Args:
        data (numpy.ndarray): Input array; modified in place.
    Returns:
        numpy.ndarray: The same array, with zeroes replaced.
    """
    smallest = data[np.nonzero(data)].min()
    data[data == 0] = smallest
    return data
# function computes Mantel test using various transformations
def test_ibd(gen, geo, out, perms, log=False):
    """
    Run Mantel tests (Pearson and Spearman) between genetic and geographic
    distance matrices, print the results and write them to
    "<out>.isolationByDistance.txt".

    Args:
        gen (numpy.ndarray): Genetic distance matrix.
        geo (numpy.ndarray): Geographic distance matrix.
        out (str): Output file prefix.
        perms: Number of Mantel permutations.
        log (bool, optional): If True, log-transform geographic distances
            (zeroes are first replaced with the smallest non-zero value).
    """
    # get flattened lower triangle of each matrix
    gen = get_lower_tri(gen)
    geo = get_lower_tri(geo)
    if log is True:
        geo = replace_zeroes(geo)
        geo = np.log(geo)
    # Pearson correlation
    res = mantel.test(geo, gen, perms=int(perms), method='pearson')
    rows = list()
    rows.append(['genXgeo', 'pearson', str(perms), res.r, res.p, res.z])
    # Spearman correlation
    res = mantel.test(geo, gen, perms=int(perms), method='spearman')
    rows.append(['genXgeo', 'spearman', str(perms), res.r, res.p, res.z])
    ibd = pd.DataFrame(rows, columns=['test', 'method', 'perms', 'r', 'p',
                                      'z'])
    print("Mantel test results:")
    print(ibd)
    ibd.to_csv((str(out) + ".isolationByDistance.txt"), sep="\t", index=False)
    print()
def output_fitted_d(pred, out):
    """
    Write the observed-vs-fitted distance table and a regression plot.

    Args:
        pred (pandas.DataFrame): Table with 'observed_D' and 'predicted_D'
            columns (as produced by get_fitted_d).
        out (str): Output file prefix; writes "<out>.obsVersusFittedD.txt"
            and "<out>.obsVersusFittedD.pdf".
    """
    base = str(out) + ".obsVersusFittedD"
    pred.to_csv(base + ".txt", sep="\t", index=False)
    sns.jointplot(x="observed_D", y="predicted_D", data=pred, kind="reg")
    plt.savefig(base + ".pdf")
def fit_least_squares_distances(D, X, iterative, out, weight="CSE67"):
    """
    Computes least-squares branch lengths from a vector of genetic distances D
    and incidence matrix X. When iterative=True, negative distances are
    constrained to 0 and then recomputed.

    Args:
        D (numpy.ndarray): Pairwise genetic distance matrix (vectorized
            internally via vectorize_mat).
        X (numpy.ndarray): Incidence matrix (pairs x segments).
        iterative (bool): Whether to use an iterative approach to constrain
            negative distances.
        out (str): Output file prefix.
        weight: Weight type (see generate_weights_matrix). Defaults to
            CSE67.
    Returns:
        numpy.ndarray: Least-squared optimized distances.
    """
    num_segments = (np.size(X, 1))
    ls = np.zeros(num_segments)
    # vectorize the pairwise matrix so it aligns with the rows of X
    d = vectorize_mat(D)
    # calculate weights matrix and write to file
    W = generate_weights_matrix(d, weight)
    print("Weights matrix:")
    print(W)
    # ofh=out+".weightsMatrix.txt"
    # np.savetxt(ofh, W, delimiter="\t")
    # weighted least-squares optimization: ls = (X'WX)^-1 X'Wd
    ls = np.matmul(np.linalg.inv(
        np.matmul(np.matmul(X.transpose(), W), X)),
        np.matmul(np.matmul(X.transpose(), W), d))
    if iterative:
        # keep the unconstrained solution for the log file
        ls_old = ls
        if (np.count_nonzero(ls < 0.0) > 0):
            print("\nLS-optimized distances contain negative values: Using",
                  "iterative approach to re-calculate...")
            constrains = list()  # save indices of all constrained values
            # if negative distances, use iterative procedure to re-calculate
            while (np.count_nonzero(ls < 0.0) > 0):
                bad_ind = np.argmin(ls)
                constrains.append(bad_ind)
                # constrain to 0 by removing from incidence matrix
                X = np.delete(X, bad_ind, 1)
                # re-compute values
                ls = np.matmul(np.linalg.inv(
                    np.matmul(np.matmul(X.transpose(), W), X)),
                    np.matmul(np.matmul(X.transpose(), W), d))
            # re-insert zeros for the constrained segments
            # NOTE(review): each saved index is relative to the
            # already-reduced matrix of its iteration; re-inserting in
            # reversed order assumes those indices map back to original
            # positions — confirm for cases with multiple constraints.
            for i in reversed(constrains):
                ls = np.insert(ls, i, 0.0)
            # write original and constrained results to log file
            ofh = out+".leastSquaresConstrained.txt"
            df = pd.DataFrame({'LS.original': ls_old, 'LS.constrained': ls})
            df.to_csv(ofh, sep="\t", index=False)
        return ls
    else:
        return ls
def generate_weights_matrix(d, weight):
    """
    Build the diagonal weights matrix for the least-squares fit.

    Args:
        d (numpy.ndarray): Vector of genetic distances.
        weight (str): 'CSE67' (unit weights), 'BEYER74' (1/d), or
            'FM67' (1/d^2). Unrecognized options fall back to unit
            weights with a printed error.
    Returns:
        numpy.ndarray: Square matrix with the weights on the diagonal.
    """
    n = len(d)
    W = np.zeros((n, n), dtype=float)
    diag = np.diag_indices(n)
    method = weight.upper()
    if method == "CSE67":
        W[diag] = np.ones(n)
    elif method in ("BEYER74", "FM67"):
        if np.count_nonzero(d == 0) > 0:
            print(
                "WARNING: Divide-by-zero in weighted least-squares."
            )
        # BEYER74 weights by 1/d, FM67 by 1/d^2; zero distances get weight 0
        denom = d if method == "BEYER74" else np.square(d)
        W[diag] = np.divide(1.0, denom, out=np.zeros_like(d, dtype=float),
                            where=d != 0)
    else:
        print(f"ERROR: Weight option {weight} not recognized. Using ordinary",
              "least-squares instead.")
        W[diag] = np.ones(n)
    return W
def vectorize_mat(mat):
    """
    Converts a pairwise matrix to a 1D vector of its upper-triangle
    elements.

    Args:
        mat (numpy.ndarray): Pairwise (square) matrix.
    Returns:
        numpy.ndarray: Float vector of the n*(n-1)/2 above-diagonal
        elements, in the same row-major order that
        itertools.combinations(range(n), 2) enumerates pairs.
    """
    n = np.size(mat, 0)
    # np.triu_indices(n, 1) walks the upper triangle row-major, which is
    # exactly the combinations order the original Python loop used — this
    # replaces an O(n^2) interpreted loop with one vectorized gather.
    return mat[np.triu_indices(n, 1)].astype(float)
def get_stream_mats(points, graph, len_col):
    """
    Computes pairwise stream distances and 0/1 incidence matrix for StreamTree
    calculations.

    Args:
        points: Mapping of point labels to their node IDs in the graph;
            its values() view must support positional indexing (e.g.
            sortedcontainers.SortedDict).
        graph (networkx.Graph): NetworkX graph object representing the stream
            network.
        len_col (str): Attribute name for the length of the edges in the
            graph.
    Returns:
        tuple: (dist, inc) — the symmetric pairwise stream-distance matrix
        and the (pairs x edges) incidence matrix.
    """
    dist = np.zeros((len(points), len(points)))
    inc = np.zeros((nCr(len(points), 2), len(graph.edges())), dtype=int)
    # Establish as NaN
    dist[:] = np.nan

    def dijkstra_weight(left, right, attributes):
        # Inverse-length edge weight with an epsilon guard against
        # zero-length segments.
        # NOTE(review): this makes Dijkstra minimize the sum of 1/length,
        # not total length — confirm that is the intended path criterion.
        epsilon = 1e-9
        return 1 / (attributes[len_col] + epsilon)

    index = 0
    for ia, ib in itertools.combinations(range(0, len(points)), 2):
        path = nx.bidirectional_dijkstra(graph, points.values()[ia],
                                         points.values()[ib],
                                         weight=dijkstra_weight)
        if path:
            # total stream distance = sum of segment lengths along the path
            dist[ia, ib] = float(sum(path_edge_attributes(graph, path[1],
                                                          len_col)))
            dist[ib, ia] = dist[ia, ib]
            # Incidence matrix: assign 1 if edge is in the path, 0 otherwise
            for ie, edge in enumerate(graph.edges()):
                if find_pair(path[1], edge[0], edge[1]):
                    inc[index, ie] = 1
                else:
                    inc[index, ie] = 0
        index += 1
    np.fill_diagonal(dist, 0.0)
    return dist, inc
def find_pair(lst, x, y):
    """
    Check whether two elements appear adjacently in a list, in either
    order.

    Args:
        lst (list): The list to search for the pair.
        x (Any): The first element of the pair.
        y (Any): The second element of the pair.
    Returns:
        bool: True if both elements are present and adjacent, else False.
    """
    if x not in lst or y not in lst:
        return False
    # Adjacent means the (first-occurrence) indices differ by exactly one
    return abs(lst.index(x) - lst.index(y)) == 1
def nCr(n, k):
    """
    Calculate the binomial coefficient, n choose k.

    Args:
        n (int): The number of elements.
        k (int): The number of elements to choose.
    Returns:
        int: The number of possible combinations.
    """
    fact = math.factorial
    return fact(n) // (fact(k) * fact(n - k))
def path_edge_attributes(graph, path, attribute):
    """
    Collect an edge attribute along a node path.

    Args:
        graph (NetworkX Graph): The graph containing the edges.
        path (list): The list of nodes forming the path.
        attribute (str): The edge attribute to get the values for.
    Returns:
        list: Attribute values for each consecutive edge in the path.
    """
    values = []
    for u, v in zip(path, path[1:]):
        values.append(graph[u][v][attribute])
    return values
def path_subgraph(graph, nodes, method, id_col, len_col):
    """
    Find and extract paths between points from a graph.

    Args:
        graph (NetworkX Graph): The input graph.
        nodes (dict): A dictionary of nodes to extract paths between.
        method (callable): Builder callback (e.g. extract_full_subgraph or
            extract_minimal_subgraph) that copies each found path into the
            output graph.
        id_col (str): The column name for node ID.
        len_col (str): The column name for edge length.
    Returns:
        NetworkX Graph: A subgraph containing the extracted paths.
    """
    k = nx.Graph()

    def dijkstra_weight(left, right, attributes):
        # Inverse-length edge weight with an epsilon guard against
        # zero-length segments.
        # NOTE(review): this makes Dijkstra minimize the sum of 1/length,
        # not total length — confirm that is the intended path criterion.
        epsilon = 1e-9
        return 1 / (attributes[len_col] + epsilon)

    # Route every other point to the first point; the union of these paths
    # forms the subgraph.
    p1 = list(nodes.values())[0]
    for p2 in list(nodes.values())[1:]:
        try:
            # Find the shortest path between the two points
            path = nx.bidirectional_dijkstra(graph, p1, p2,
                                             weight=dijkstra_weight)
            # Traverse the nodes in the path to build a minimal set of edges
            method(k, graph, nodes.values(), id_col, len_col, path[1])
            # Make sure both endpoints exist even if the path added no edges
            if p1 not in k:
                k.add_node(p1)
            if p2 not in k:
                k.add_node(p2)
        except NodeNotFound as e:
            # A missing endpoint is reported but does not abort the run
            print("Node not found:", e)
        except Exception as e:
            traceback.print_exc()
            print("Something unexpected happened:", e)
            sys.exit(1)
    return k
def extract_full_subgraph(subgraph, graph, nodelist, id_col, len_col, path):
    """
    Extracts the full subgraph from the given nodes.

    Args:
        subgraph (NetworkX Graph): The subgraph to be modified.
        graph (NetworkX Graph): The input graph.
        nodelist (list): The list of nodes.
        id_col (str): The column name for node ID.
        len_col (str): The column name for edge length.
        path (list): The path between nodes.
    """
    # Copy every edge along the path into the subgraph, untouched
    for u, v in zip(path, path[1:]):
        # Make sure both endpoints are present before wiring the edge
        for node in (u, v):
            if node not in subgraph:
                subgraph.add_node(node)
        # Carry across the full attribute dict from the source graph
        edge_attrs = graph.get_edge_data(u, v)
        subgraph.add_edge(u, v, **edge_attrs)
def extract_minimal_subgraph(subgraph, graph, nodelist, id_col, len_col, path):
    """
    Extracts a simplified subgraph from paths, keeping only terminal and
    junction nodes.

    Args:
        subgraph (NetworkX Graph): The subgraph to be modified.
        graph (NetworkX Graph): The input graph.
        nodelist (list): The list of nodes.
        id_col (str): The column name for edge ID.
        len_col (str): The column name for edge length.
        path (list): The path between nodes.
    """
    def _is_stop(node):
        # A node ends a collapsed segment when it is a point of interest
        # or a junction (more than two neighbours in the full graph).
        return node in nodelist or len(graph[node]) > 2

    pending = {id_col: [], len_col: 0.0}
    segment_start = None
    for first, second in zip(path, path[1:]):
        if not segment_start:
            segment_start = first
        if _is_stop(first):
            subgraph.add_node(first)
        # Fold this edge's attributes into the pending collapsed segment
        attrs = graph.get_edge_data(first, second)
        reach_ids = attrs[id_col]
        pending[id_col].extend(reach_ids if isinstance(reach_ids, list)
                               else [reach_ids])
        pending[len_col] = float(pending[len_col]) + float(attrs[len_col])
        if _is_stop(second):
            # Flush: emit one edge spanning the whole collapsed segment,
            # then start accumulating a fresh segment from here.
            subgraph.add_node(second)
            subgraph.add_edge(segment_start, second, **pending)
            pending = {id_col: [], len_col: 0.0}
            segment_start = second
def snap_to_node(graph, pos):
    """
    Finds the closest node to the given [x, y] coordinates in the graph.

    Args:
        graph (NetworkX Graph): The input graph.
        pos (tuple): A tuple of [x, y] coordinates.

    Returns:
        tuple: The closest node to the input coordinates.
    """
    coords = np.array(graph.nodes())
    # Squared Euclidean distance from each node to the query position;
    # no sqrt needed since we only want the argmin.
    sq_dists = ((coords - pos) ** 2).sum(axis=1)
    return tuple(coords[sq_dists.argmin()])
def write_geodataframe(gdf, output_prefix, output_driver):
    """
    Writes a GeoDataFrame to file using the pyogrio engine.

    Args:
        gdf (geopandas.GeoDataFrame): The GeoDataFrame to write.
        output_prefix (str): Path prefix for the output file; the file
            extension is appended based on the driver.
        output_driver (str): Output format, one of "SHP", "GPKG" or "GDB"
            (case-insensitive). Unrecognized values get a ".gpkg" extension
            and are passed through upper-cased, as before.
    """
    gpd.options.io_engine = "pyogrio"
    driver_key = output_driver.upper()
    extension = {
        "SHP": ".shp",
        "GPKG": ".gpkg",
        "GDB": ".gdb"
    }.get(driver_key, ".gpkg")  # Default to .gpkg
    # GDAL driver names are case-sensitive: "SHP" must map to
    # "ESRI Shapefile" and must NOT be upper-cased afterwards (the previous
    # code passed "ESRI SHAPEFILE", which GDAL does not recognize).
    driver = "ESRI Shapefile" if driver_key == "SHP" else driver_key
    output_path = f"{output_prefix}{extension}"
    # File geodatabases are directories; create the container if missing.
    # Compare against the normalized key so "gdb" works like "GDB".
    if driver_key == "GDB" and not os.path.exists(output_path):
        os.makedirs(output_path)
    gdf.to_file(output_path, driver=driver)
Functions
def block_print()
-
Disables standard output by redirecting it to a null device, effectively blocking any print statements.
Expand source code
def block_print(): """ Disables standard output by redirecting it to a null device, effectively blocking any print statements. """ sys.stdout = open(os.devnull, 'w')
def custom_warn_handler(message, category, filename, lineno, file=None, line=None)
-
Expand source code
def custom_warn_handler(message, category, filename, lineno, file=None, line=None): if "momepy/utils.py" in filename and issubclass(category, UserWarning): return return original_showwarning(message, category, filename, lineno, file, line)
def enable_print()
-
Restores standard output to its original state, allowing print statements to be displayed again.
Expand source code
def enable_print(): """ Restores standard output to its original state, allowing print statements to be displayed again. """ sys.stdout = sys.__stdout__
def extract_full_subgraph(subgraph, graph, nodelist, id_col, len_col, path)
-
Extracts the full subgraph from the given nodes.
Args
subgraph
:NetworkX Graph
- The subgraph to be modified.
graph
:NetworkX Graph
- The input graph.
nodelist
:list
- The list of nodes.
id_col
:str
- The column name for node ID.
len_col
:str
- The column name for edge length.
path
:list
- The path between nodes.
Expand source code
def extract_full_subgraph(subgraph, graph, nodelist, id_col, len_col, path): """ Extracts the full subgraph from the given nodes. Args: subgraph (NetworkX Graph): The subgraph to be modified. graph (NetworkX Graph): The input graph. nodelist (list): The list of nodes. id_col (str): The column name for node ID. len_col (str): The column name for edge length. path (list): The path between nodes. """ # Iterate through the nodes in the path for first, second in zip(path, path[1:]): # Add nodes to the subgraph if they are not already present if first not in subgraph: subgraph.add_node(first) if second not in subgraph: subgraph.add_node(second) # Add the edge between the nodes with the corresponding edge data dat = graph.get_edge_data(first, second) subgraph.add_edge(first, second, **dat)
def extract_minimal_subgraph(subgraph, graph, nodelist, id_col, len_col, path)
-
Extracts a simplified subgraph from paths, keeping only terminal and junction nodes.
Args
subgraph
:NetworkX Graph
- The subgraph to be modified.
graph
:NetworkX Graph
- The input graph.
nodelist
:list
- The list of nodes.
id_col
:str
- The column name for edge ID.
len_col
:str
- The column name for edge length.
path
:list
- The path between nodes.
Expand source code
def extract_minimal_subgraph(subgraph, graph, nodelist, id_col, len_col, path): """ Extracts a simplified subgraph from paths, keeping only terminal and junction nodes. Args: subgraph (NetworkX Graph): The subgraph to be modified. graph (NetworkX Graph): The input graph. nodelist (list): The list of nodes. id_col (str): The column name for edge ID. len_col (str): The column name for edge length. path (list): The path between nodes. """ curr_edge = {id_col: list(), len_col: 0.0} curr_start = None # Iterate through each pair of nodes in the path for first, second in zip(path, path[1:]): if not curr_start: curr_start = first if first in nodelist or len(graph[first]) > 2: subgraph.add_node(first) # Add path attributes to current edge dat = graph.get_edge_data(first, second) curr_edge[id_col].extend([dat[id_col]] if not isinstance(dat[id_col], list) else dat[id_col]) curr_edge[len_col] = float(curr_edge[len_col]) + float(dat[len_col]) # If the second node is a STOP node (in nodelist or is a junction) if second in nodelist or len(graph[second]) > 2: # Add node to subgraph subgraph.add_node(second) # Link current attribute data subgraph.add_edge(curr_start, second, **curr_edge) # Empty edge attributes and set current second to curr_start curr_edge = {id_col: list(), len_col: 0} curr_start = second else: # Otherwise, continue building the current edge continue
def find_pair(lst, x, y)
-
Check if two elements are consecutive in a list, irrespective of their order.
Args
lst
:list
- The list to search for the pair.
x
:Any
- The first element of the pair.
y
:Any
- The second element of the pair.
Returns
bool
- True if the elements are consecutive in the list, False otherwise
Expand source code
def find_pair(lst, x, y): """ Check if two elements are consecutive in a list, irrespective of their order. Args: lst (list): The list to search for the pair. x (Any): The first element of the pair. y (Any): The second element of the pair. Returns: bool: True if the elements are consecutive in the list, False otherwise """ if x not in lst or y not in lst: return False elif abs(lst.index(x) - lst.index(y)) == 1: return True else: return False
def fit_least_squares_distances(D, X, iterative, out, weight='CSE67')
-
Computes least-squares branch lengths from a vector of genetic distances D and incidence matrix X. When iterative=True, negative distances are constrained to 0 and then recomputed.
Args
D
:numpy.ndarray
- Vector of genetic distances.
X
:numpy.ndarray
- Incidence matrix.
iterative
:bool
- Whether to use an iterative approach to constrain negative distances.
out
:str
- Output file prefix.
weight
- Weight type. Defaults to CSE67.
Returns
numpy.ndarray
- Least-squared optimized distances.
Expand source code
def fit_least_squares_distances(D, X, iterative, out, weight="CSE67"): """ Computes least-squares branch lengths from a vector of genetic distances D and incidence matrix X. When iterative=True, negative distances are constrained to 0 and then recomputed. Args: D (numpy.ndarray): Vector of genetic distances. X (numpy.ndarray): Incidence matrix. iterative (bool): Whether to use an iterative approach to constrain negative distances. out (str): Output file prefix. weight: Weight type. Defaults to CSE67. Returns: numpy.ndarray: Least-squared optimized distances. """ num_segments = (np.size(X, 1)) ls = np.zeros(num_segments) d = vectorize_mat(D) # calculate weights matrix and write to file W = generate_weights_matrix(d, weight) print("Weights matrix:") print(W) # ofh=out+".weightsMatrix.txt" # np.savetxt(ofh, W, delimiter="\t") # weighted least-squares optimization ls = np.matmul(np.linalg.inv( np.matmul(np.matmul(X.transpose(), W), X)), np.matmul(np.matmul(X.transpose(), W), d)) if iterative: ls_old = ls if (np.count_nonzero(ls < 0.0) > 0): print("\nLS-optimized distances contain negative values: Using", "iterative approach to re-calculate...") constrains = list() # save indices of all constrained values # if negative distances, use iterative procedure to re-calculate while (np.count_nonzero(ls < 0.0) > 0): bad_ind = np.argmin(ls) constrains.append(bad_ind) # constrain to 0 by removing from incidence matrix X = np.delete(X, bad_ind, 1) # re-compute values ls = np.matmul(np.linalg.inv( np.matmul(np.matmul(X.transpose(), W), X)), np.matmul(np.matmul(X.transpose(), W), d)) for i in reversed(constrains): ls = np.insert(ls, i, 0.0) # print(ls) # write original and constrained results to log file ofh = out+".leastSquaresConstrained.txt" df = pd.DataFrame({'LS.original': ls_old, 'LS.constrained': ls}) df.to_csv(ofh, sep="\t", index=False) return ls else: return ls
def generate_weights_matrix(d, weight)
-
Generates a weights matrix for the least-squares method, where weights are on the diagonals.
Args
d
:numpy.ndarray
- Vector of genetic distances.
weight
:str
- Weighting method to use, options: 'CSE67', 'BEYER74', 'FM67'.
Returns
numpy.ndarray
- Weights matrix.
Expand source code
def generate_weights_matrix(d, weight): """ Generates a weights matrix for the least-squares method, where weights are on the diagonals. Args: d (numpy.ndarray): Vector of genetic distances. weight (str): Weighting method to use, options: 'CSE67', 'BEYER74', 'FM67'. Returns: numpy.ndarray: Weights matrix. """ W = np.zeros((len(d), len(d)), dtype=float) row, col = np.diag_indices(W.shape[0]) if weight.upper() == "CSE67": W[row, col] = np.ones(len(d)) elif weight.upper() == "BEYER74": if np.count_nonzero(d == 0) > 0: print( "WARNING: Divide-by-zero in weighted least-squares." ) W[row, col] = np.divide(1.0, d, out=np.zeros_like(d, dtype=float), where=d != 0) elif weight.upper() == "FM67": if np.count_nonzero(d == 0) > 0: print( "WARNING: Divide-by-zero in weighted least-squares." ) W[row, col] = np.divide(1.0, np.square(d), out=np.zeros_like(d, dtype=float), where=d != 0) else: print(f"ERROR: Weight option {weight} not recognized. Using ordinary", "least-squares instead.") W[row, col] = np.ones(len(d)) return W
def get_fitted_d(points, genmat, inc, r)
-
Calculates predicted genetic distances based on fitted streamtree distances.
Args
points
:dict
- A dictionary of points with their latitude and longitude values.
genmat
:ndarray
- A pairwise genetic distance matrix.
inc
:ndarray
- An incidence matrix representing the presence or absence of streams for each point.
r
:ndarray
- A fitted streamtree distance matrix.
Returns
pandas.DataFrame
- A dataframe with columns for 'from', 'to', 'observed_D', 'predicted_D', and 'abs_diff'.
Expand source code
def get_fitted_d(points, genmat, inc, r): """Calculates predicted genetic distances based on fitted streamtree distances. Args: points (dict): A dictionary of points with their latitude and longitude values. genmat (ndarray): A pairwise genetic distance matrix. inc (ndarray): An incidence matrix representing the presence or absence of streams for each point. r (ndarray): A fitted streamtree distance matrix. Returns: pandas.DataFrame: A dataframe with columns for 'from', 'to', 'observed_D', 'predicted_D', and 'abs_diff'. """ rows = [] # Initialize list to hold temporary values names = list(points.keys()) # Get names of points # Iterate over all pairwise combinations of points inc_row = 0 # tracks what ROW (pair) we are in the incidence matrix for ia, ib in itertools.combinations(range(0, len(points)), 2): obs = genmat[ia, ib] inc_streams = inc[inc_row,] pred_dist = np.sum(r[inc_streams == 1]) inc_row += 1 rows.append([names[ia], names[ib], obs, pred_dist, np.abs(obs-pred_dist)]) # Create dataframe from temporary list and return it D = pd.DataFrame(rows, columns=['from', 'to', 'observed_D', 'predicted_D', 'abs_diff']) return D
def get_gendist_mats(params, point_coords, popmap, seqs)
-
Returns population genetic distance matrices.
Args
params
:object
- An object that contains parameters for the analysis.
point_coords
:list
- A list of coordinates for the sampled points.
popmap
:dict
- A dictionary that maps each sample to its corresponding population.
seqs
:list
- A list of DNA sequences for the sampled points.
Returns
tuple
- A tuple containing two matrices. The first is a pairwise distance matrix for all samples. The second is a matrix of pairwise genetic distances between populations.
Raises
SystemExit
- If distance metric is not possible without population data.
Expand source code
def get_gendist_mats(params, point_coords, popmap, seqs): """Returns population genetic distance matrices. Args: params (object): An object that contains parameters for the analysis. point_coords (list): A list of coordinates for the sampled points. popmap (dict): A dictionary that maps each sample to its corresponding population. seqs (list): A list of DNA sequences for the sampled points. Returns: tuple: A tuple containing two matrices. The first is a pairwise distance matrix for all samples. The second is a matrix of pairwise genetic distances between populations. Raises: SystemExit: If distance metric is not possible without population data. """ gen = None # Initialize variable to hold pairwise dist matrix pop_gen = None # Initialize variable to hold population dist matrix if params.dist in ["PDIST", "TN84", "TN93", "K2P", "JC69"]: # Calculate pairwise distance matrix using selected method gen = gendist.get_genmat(params.dist, point_coords, seqs, ploidy=params.ploidy, het=params.het, loc_agg=params.loc_agg) if params.pop or params.geopop or params.clusterpop: print("Aggregating pairwise population genetic distances from", "individual distances using:", params.pop_agg) else: # If distance metric requires population data, but none is provided if not params.pop and not params.geopop: print("ERROR: Distance metric", params.dist, "not possible without population data.") sys.exit(1) # Calculate population genetic distance matrix if params.pop or params.geopop or params.clusterpop: pop_gen = gendist.get_pop_genmat(params.dist, gen, popmap, point_coords, seqs, pop_agg=params.pop_agg, loc_agg=params.loc_agg, ploidy=params.ploidy, global_het=params.global_het) # Return pairwise distance matrix and population genetic distance matrix return gen, pop_gen
def get_loc_data(seqs)
-
Generator function that yields a dictionary of individual loci data for each locus in the input sequences.
Args
seqs
- A dictionary containing sequences as values and individual
identifiers as keys.
Returns
A generator that yields a dictionary with individual identifiers as
keys and a list containing the
- corresponding locus as the value.
Expand source code
def get_loc_data(seqs): """ Generator function that yields a dictionary of individual loci data for each locus in the input sequences. Args: seqs: A dictionary containing sequences as values and individual identifiers as keys. Returns: A generator that yields a dictionary with individual identifiers as keys and a list containing the corresponding locus as the value. """ # Iterate through the loci in the sequences for loc in range(0, len(seqs[list(seqs.keys())[0]])): d = dict() # Iterate through the individuals in the sequences for ind in seqs.keys(): # Add the locus data for the individual to the dictionary d[ind] = [seqs[ind][loc]] # Yield the dictionary containing the locus data for the current locus yield d
def get_lower_tri(mat)
-
Extracts the lower triangular elements from a square matrix.
Args
mat
:numpy.ndarray
- Input square matrix.
Returns
numpy.ndarray
- 1D array containing the lower triangular elements of the input matrix.
Expand source code
def get_lower_tri(mat): """ Extracts the lower triangular elements from a square matrix. Args: mat (numpy.ndarray): Input square matrix. Returns: numpy.ndarray: 1D array containing the lower triangular elements of the input matrix. """ n = mat.shape[0] i = np.tril_indices(n, -1) return mat[i]
def get_point_table(points)
-
Returns a pandas DataFrame from a dictionary of points.
Args
points
:dict
- A dictionary of points with their latitude and longitude values.
Returns
pandas.DataFrame
- A dataframe with columns for 'sample', 'lat', and 'long'.
Expand source code
def get_point_table(points): """Returns a pandas DataFrame from a dictionary of points. Args: points (dict): A dictionary of points with their latitude and longitude values. Returns: pandas.DataFrame: A dataframe with columns for 'sample', 'lat', and 'long'. """ temp = [] # Initialize list to hold temporary values for p in points: # Append values for each point to temporary list temp.append([p, points[p][1], points[p][0]]) p = pd.DataFrame(temp, columns=['sample', 'lat', 'long']) return p
def get_stream_mats(points, graph, len_col)
-
Computes pairwise stream distances and 0/1 incidence matrix for StreamTree calculations.
Args
points
:dict
- Dictionary of point indices and their corresponding node IDs in the graph.
graph
:networkx.Graph
- NetworkX graph object representing the stream network.
len_col
:str
- Attribute name for the length of the edges in the graph.
Returns
tuple
- Pair of numpy.ndarray representing the pairwise stream distance matrix and incidence matrix.
Expand source code
def get_stream_mats(points, graph, len_col): """ Computes pairwise stream distances and 0/1 incidence matrix for StreamTree calculations. Args: points (dict): Dictionary of point indices and their corresponding node IDs in the graph. graph (networkx.Graph): NetworkX graph object representing the stream network. len_col (str): Attribute name for the length of the edges in the graph. Returns: tuple: Pair of numpy.ndarray representing the pairwise stream distance matrix and incidence matrix. """ dist = np.zeros((len(points), len(points))) inc = np.zeros((nCr(len(points), 2), len(graph.edges())), dtype=int) # Establish as NaN dist[:] = np.nan def dijkstra_weight(left, right, attributes): # Calculates weights for Dijkstra's shortest path algorithm by # inverting the edge length with a small constant to avoid division by # zero. epsilon = 1e-9 return 1 / (attributes[len_col] + epsilon) index = 0 for ia, ib in itertools.combinations(range(0, len(points)), 2): path = nx.bidirectional_dijkstra(graph, points.values()[ia], points.values()[ib], weight=dijkstra_weight) if path: dist[ia, ib] = float(sum(path_edge_attributes(graph, path[1], len_col))) dist[ib, ia] = dist[ia, ib] # Incidence matrix: assign 1 if edge is in the path, 0 otherwise for ie, edge in enumerate(graph.edges()): if find_pair(path[1], edge[0], edge[1]): inc[index, ie] = 1 else: inc[index, ie] = 0 index += 1 np.fill_diagonal(dist, 0.0) return dist, inc
def great_circle(lon1, lat1, lon2, lat2, thresh=1e-07)
-
Calculates the great circle distance between two points on a sphere, using their longitudes and latitudes.
Args
lon1
:float
- Longitude of the first point.
lat1
:float
- Latitude of the first point.
lon2
:float
- Longitude of the second point.
lat2
:float
- Latitude of the second point.
thresh
:float
, optional- Threshold for determining if the points are the same. Defaults to 0.0000001.
Returns
float
- Great circle distance in kilometers.
Expand source code
def great_circle(lon1, lat1, lon2, lat2, thresh=0.0000001): """ Calculates the great circle distance between two points on a sphere, using their longitudes and latitudes. Args: lon1 (float): Longitude of the first point. lat1 (float): Latitude of the first point. lon2 (float): Longitude of the second point. lat2 (float): Latitude of the second point. thresh (float, optional): Threshold for determining if the points are the same. Defaults to 0.0000001. Returns: float: Great circle distance in kilometers. """ if (abs(lon1 - lon2)) < thresh and (abs(lat1 - lat2)) < thresh: return 0.0 else: lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) return 6371 * ( acos(sin(lat1) * sin(lat2) + cos(lat1) * cos(lat2) * cos(lon1 - lon2)) )
def nCr(n, k)
-
Calculate the number of combinations, n choose k.
Args
n
:int
- The number of elements.
k
:int
- The number of elements to choose.
Returns
int
- The number of possible combinations.
Expand source code
def nCr(n, k): """ Calculate the number of combinations, n choose k. Args: n (int): The number of elements. k (int): The number of elements to choose. Returns: int: The number of possible combinations. """ f = math.factorial return f(n) // f(k) // f(n - k)
def output_fitted_d(pred, out)
-
Expand source code
def output_fitted_d(pred, out): pred.to_csv((str(out)+".obsVersusFittedD.txt"), sep="\t", index=False) sns.jointplot(x="observed_D", y="predicted_D", data=pred, kind="reg") plt.savefig((str(out)+".obsVersusFittedD.pdf")) del pred
def parse_input_genmat(params, inmat, point_coords, popmap, seqs=None)
-
Parses an input genetic distance matrix and verifies if it matches the user input parameters. Aggregates individual distances if required by the user input.
Args
params
- Input parameters provided by the user.
inmat
:pd.DataFrame
- The input genetic distance matrix.
point_coords
:dict
- Dictionary containing point coordinates.
popmap
:dict
- Dictionary containing the population map.
Returns
tuple
- A tuple containing the genetic distance matrix (gen) and
population genetic distance matrix (pop_gen).
Expand source code
def parse_input_genmat(params, inmat, point_coords, popmap, seqs=None): """ Parses an input genetic distance matrix and verifies if it matches the user input parameters. Aggregates individual distances if required by the user input. Args: params: Input parameters provided by the user. inmat (pd.DataFrame): The input genetic distance matrix. point_coords (dict): Dictionary containing point coordinates. popmap (dict): Dictionary containing the population map. Returns: tuple: A tuple containing the genetic distance matrix (gen) and population genetic distance matrix (pop_gen). """ gen = None pop_gen = None if params.coercemat: inmat[inmat < 0.0] = 0.0 if set(list(inmat.columns.values)) != set(list(inmat.index.values)): print(inmat.columns.values) print(inmat.index.values) print("Input matrix columns and/ or rows don't appear to be", "labelled. Please provide an input matrix with column and row", "names!") sys.exit(1) else: agg = False # first check if it fits whatever the user input was (i.e. --pop) if params.pop: if len(inmat.columns) != len(popmap.keys()): print("Found", str(len(inmat.columns)), "columns in provided matrix. This doesn't match number", "of populations from popmap.") if (len(inmat.columns)) != len(point_coords): print("Doesn't match number of individuals either! Please", "check your matrix format.") sys.exit(1) else: print("Assuming input matrix has individual distances...", "Aggregating using the following (--pop_agg):", str(params.pop_agg)) agg = True else: # re-order using pop orders inmat = inmat.reindex(list(popmap.keys())) inmat = inmat[list(popmap.keys())] pop_gen = inmat.to_numpy() del inmat elif params.geopop or params.clusterpop: if (len(inmat.columns)) != len(point_coords): print("Found", str(len(inmat.columns)), "columns in provided matrix. 
This doesn't match number", "of individuals.") print("When using --geopop or --clusterpop, the provided", "matrix must represent individual-level distances.") sys.exit(1) else: # re-order using pop orders inmat = inmat.reindex(list(point_coords.keys())) inmat = inmat[list(point_coords.keys())] gen = inmat.to_numpy() agg = True del inmat else: if (len(inmat.columns)) != len(point_coords): print("Found", str(len(inmat.columns)), "columns in provided matrix. This doesn't match number", "of individuals.") sys.exit(1) else: # re-order using pop orders inmat = inmat.reindex(list(point_coords.keys())) inmat = inmat[list(point_coords.keys())] gen = inmat.to_numpy() del inmat # if --geopop or --clusterpop, it should be an ind matrix # if so, need to aggregate according to --pop_agg # print(pop_gen) if agg: print("Aggregating user-provided individual-level distance matrix", "using:", params.pop_agg) pop_gen = gendist.get_pop_genmat("PDIST", gen, popmap, point_coords, seqs, pop_agg=params.pop_agg, loc_agg=params.loc_agg, ploidy=params.ploidy, global_het=params.global_het) return (gen, pop_gen)
def parse_subgraph_from_points(params, point_coords, pop_coords, G)
-
Extracts a subgraph from a given graph based on input points.
Args
params
- A custom object containing various input parameters.
point_coords
- A list of point coordinates to use for extracting the subgraph.
pop_coords
- A list of population coordinates to use for extracting the subgraph.
G
- A NetworkX Graph object representing the input graph.
Returns
A NetworkX Graph object representing the extracted subgraph.
Expand source code
def parse_subgraph_from_points(params, point_coords, pop_coords, G): """ Extracts a subgraph from a given graph based on input points. Args: params: A custom object containing various input parameters. point_coords: A list of point coordinates to use for extracting the subgraph. pop_coords: A list of population coordinates to use for extracting the subgraph. G: A NetworkX Graph object representing the input graph. Returns: A NetworkX Graph object representing the extracted subgraph. """ # Choose the appropriate set of points based on input parameters if params.pop or params.geopop or params.clusterpop: points = pop_coords else: points = point_coords # Output points to a table p = get_point_table(points) p.to_csv((str(params.out) + ".pointCoords.txt"), sep="\t", index=False) del p # First pass extracts a subgraph from the master shapefile graph print("\nExtracting full subgraph...") ktemp = path_subgraph(G, points, extract_full_subgraph, params.reachid_col, params.length_col) del G # Second pass to simplify subgraph and collapse redundant nodes print("\nMerging redundant paths...\n") K = path_subgraph(ktemp, points, extract_minimal_subgraph, params.reachid_col, params.length_col) # Grab real coordinates as node positions for plotting pos = {n: n for n in K.nodes} # Make a color map to color sample points and junctions differently color_map = ["blue" if node in points.values() else "black" for node in K] # Draw networkx nx.draw_networkx(K, pos, with_labels=False, node_color=color_map, node_size=50) # Get LENGTH_KM attributes for labelling edges edge_labels = nx.get_edge_attributes(K, params.length_col) for e in edge_labels: edge_labels[e] = "{:.2f}".format(edge_labels[e]) nx.draw_networkx_edge_labels(K, pos, edge_labels=edge_labels, font_size=6) # Save minimized network to file (unless we already read from one) if not params.network or params.overwrite: net_out = str(params.out) + ".network" with open(net_out, 'wb') as f: pickle.dump(K, f, pickle.HIGHEST_PROTOCOL) 
net_full_out = str(params.out) + ".full.network" with open(net_full_out, 'wb') as f: pickle.dump(ktemp, f, pickle.HIGHEST_PROTOCOL) else: print( "NOTE: Not over-writing existing network. To change this, use", "--overwrite" ) network_plot = str(params.out) + ".subGraph.pdf" plt.savefig(network_plot) del ktemp return K
def path_edge_attributes(graph, path, attribute)
-
Get the attribute values for edges in a given path.
Args
graph
:NetworkX Graph
- The graph containing the edges.
path
:list
- The list of nodes forming the path.
attribute
:str
- The edge attribute to get the values for.
Returns
list
- A list of attribute values for the edges in the path.
Expand source code
def path_edge_attributes(graph, path, attribute): """ Get the attribute values for edges in a given path. Args: graph (NetworkX Graph): The graph containing the edges. path (list): The list of nodes forming the path. attribute (str): The edge attribute to get the values for. Returns: list: A list of attribute values for the edges in the path. """ return [graph[u][v][attribute] for (u, v) in zip(path, path[1:])]
def path_subgraph(graph, nodes, method, id_col, len_col)
-
Find and extract paths between points from a graph.
Args
graph
:NetworkX Graph
- The input graph.
nodes
:dict
- A dictionary of nodes to extract paths between.
method
:callable
- The method to build the subgraph.
id_col
:str
- The column name for node ID.
len_col
:str
- The column name for edge length.
Returns
NetworkX Graph
- A subgraph containing the extracted paths.
Expand source code
def path_subgraph(graph, nodes, method, id_col, len_col): """ Find and extract paths between points from a graph. Args: graph (NetworkX Graph): The input graph. nodes (dict): A dictionary of nodes to extract paths between. method (callable): The method to build the subgraph. id_col (str): The column name for node ID. len_col (str): The column name for edge length. Returns: NetworkX Graph: A subgraph containing the extracted paths. """ k = nx.Graph() def dijkstra_weight(left, right, attributes): # Calculates weights for Dijkstra's shortest path algorithm by # inverting the edge length with a small constant to avoid division by # zero. epsilon = 1e-9 return 1 / (attributes[len_col] + epsilon) p1 = list(nodes.values())[0] for p2 in list(nodes.values())[1:]: try: # Find the shortest path between the two points path = nx.bidirectional_dijkstra(graph, p1, p2, weight=dijkstra_weight) # Traverse the nodes in the path to build a minimal set of edges method(k, graph, nodes.values(), id_col, len_col, path[1]) if p1 not in k: k.add_node(p1) if p2 not in k: k.add_node(p2) except NodeNotFound as e: print("Node not found:", e) except Exception as e: traceback.print_exc() print("Something unexpected happened:", e) sys.exit(1) return k
def plot_gen_by_geo(gen, sdist, out, log=False)
-
Plots genetic distance against geographic distance to visualize isolation by distance.
Args
gen
:numpy.ndarray
- Genetic distance matrix.
sdist
:numpy.ndarray
- Spatial distance matrix.
out
:str
- Output file prefix for the generated plot.
log
:bool
, optional- If True, the geographic distance axis will be log-transformed. Defaults to False.
Expand source code
def plot_gen_by_geo(gen, sdist, out, log=False):
    """
    Plots genetic distance against geographic distance to visualize
    isolation by distance.

    Args:
        gen (numpy.ndarray): Genetic distance matrix.
        sdist (numpy.ndarray): Spatial distance matrix.
        out (str): Output file prefix for the generated plot.
        log (bool, optional): If True, the geographic distance axis will be
            log-transformed. Defaults to False.
    """
    # Work on the flattened lower triangles of the pairwise matrices
    gen_vec = get_lower_tri(gen)
    geo_vec = get_lower_tri(sdist)
    frame = pd.DataFrame({'Geographic Distance': geo_vec,
                          'Genetic Distance': gen_vec})
    if log:
        # Zeroes cannot be log-transformed; substitute the smallest
        # non-zero distance first.
        geo_vec = replace_zeroes(geo_vec)
        frame['Log Geographic Distance'] = np.log(geo_vec)
        x_column = 'Log Geographic Distance'
    else:
        x_column = 'Geographic Distance'
    sns.jointplot(data=frame, x=x_column, y='Genetic Distance', kind="reg")
    plt.savefig(str(out) + ".isolationByDistance.pdf")
def process_samples(params, points, G)
-
Processes input sample data by snapping points to a graph, calculating coordinates, and processing populations if required.
Args
params
- Input parameters provided by the user.
points
:pd.DataFrame
- DataFrame containing sample points.
G
:networkx.Graph
- Graph object representing the stream network (built from the river shapefile).
Returns
tuple
- A tuple containing point coordinates, population coordinates,
and the population map.
Expand source code
def process_samples(params, points, G):
    """
    Processes input sample data by snapping points to a graph, calculating
    coordinates, and processing populations if required.

    Args:
        params: Input parameters provided by the user.
        points (pd.DataFrame): DataFrame containing sample points.
        G (networkx.Graph): Graph object representing the stream network
            (built from the river shapefile).

    Returns:
        tuple: A tuple containing point coordinates, population coordinates,
               and the population map.
    """
    # popmap: population key -> list of member sample names
    popmap = SortedDict()
    # point_coords: sample name -> coordinates (snapped or raw)
    point_coords = SortedDict()
    # pop_coords: population key -> snapped centroid node
    pop_coords = SortedDict()
    # snapDists: sample/population -> great-circle distance (original point
    # to snapped node)
    snapDists = dict()
    if params.pop:
        # Keep only samples that appear in the user-supplied popmap file
        popmap_temp = read_popmap(params.pop)
        mask = points[points.columns[0]].isin(popmap_temp)
        points = points[mask]
    for _, row in points.iterrows():
        name = None
        data = None
        # Coordinates may be read as strings; coerce to float
        row["lat"] = float(row["lat"])
        row["long"] = float(row["long"])
        if params.run == "GENDIST":
            # GENDIST runs need no network snapping; keep raw coordinates
            name = row["sample"]
            data = tuple([row["long"], row["lat"]])
        else:
            if not params.pop and not params.clusterpop:
                # --geopop and individual-level snap coordinates to nodes
                # here
                node = snap_to_node(G, tuple([row["long"], row["lat"]]))
                snapDists[row["sample"]] = great_circle(node[0], node[1],
                                                        row["long"],
                                                        row["lat"])
            else:
                # if pop or clusterpop, extract centroid later
                node = tuple([row["long"], row["lat"]])
            data = node
            name = row["sample"]
        point_coords[name] = data
        # Process population-level analyses
        if params.geopop:
            # --geopop: samples snapped to the same node share a population
            if point_coords[name] not in popmap:
                names = [name]
                popmap[point_coords[name]] = names
            else:
                popmap[point_coords[name]].append(row["sample"])
        elif params.pop:
            # --pop: group samples by the population given in the popmap
            if popmap_temp[row.iloc[0]] not in popmap:
                names = [name]
                popmap[popmap_temp[row.iloc[0]]] = names
            else:
                popmap[popmap_temp[row.iloc[0]]].append(name)
    print("Read", str(len(point_coords.keys())), "individuals.")
    print()
    if params.pop or params.geopop:
        print("Read", str(len(popmap.keys())), "populations.")
        print()
    # For population-level analyses, generate population maps and centroids
    # here according to user-input options: --pop, --geopop, --clusterpop
    # get population centroid
    if params.pop or params.geopop or params.clusterpop:
        if params.clusterpop:
            # create population clusters using DBSCAN
            print("Running DBSCAN clustering with min_samples=",
                  params.min_samples, "and epsilon=", params.epsilon)
            popmap = clust.dbscan_cluster(point_coords, params.epsilon,
                                          params.min_samples)
            num_clusters = len(popmap.keys())
            print("Found", str(num_clusters), "clusters!")
            # calculate centroids for clusters
            pop_temp = clust.get_cluster_centroid(point_coords, popmap,
                                                  params.out)
        elif params.pop or params.geopop:
            # popmap generated earlier when parsing input file!
            # still need to calculate centroids:
            print("Calculating population centroids...")
            pop_temp = clust.get_cluster_centroid(point_coords, popmap,
                                                  params.out)
            # note in the case of --geopop the centroid is the joint
            # snapped-to location
        # now, snap pop_coords to nodes
        pop_coords = SortedDict()
        for p in pop_temp:
            node = snap_to_node(G, pop_temp[p])
            snapDists[p] = great_circle(node[0], node[1], pop_temp[p][0],
                                        pop_temp[p][1])
            pop_coords[p] = node
        # write popmap to file
        flat = clust.flatten_popmap(popmap)
        temp = pd.DataFrame(list(flat.items()),
                            columns=['IND_ID', 'POP_ID'])
        temp.to_csv((str(params.out) + ".popmap.txt"), sep="\t",
                    index=False)
        del flat
        del temp
        # plot grouped samples
        clust.plot_clustered_points(point_coords, popmap, params.out,
                                    pop_coords)
    # Plot histogram of snap distances and write snap distances to a file
    clust.plot_histogram(list(snapDists.values()), params.out)
    dtemp = pd.DataFrame(list(snapDists.items()), columns=['name', 'km'])
    dtout = str(params.out) + ".snapDistances.txt"
    dtemp.to_csv(dtout, sep="\t", index=False)
    del dtemp
    del dtout
    del snapDists
    # return everything
    return point_coords, pop_coords, popmap
def prune_graph(G, edge_list, reachid_col)
-
Prunes a graph to only retain edges whose 'reachid_col' matches values in 'edge_list'.
Args
G
- The input NetworkX Graph.
edge_list
- A list of values to filter edges.
reachid_col
- The edge attribute to be checked against 'edge_list'.
Returns
A pruned NetworkX Graph.
Expand source code
def prune_graph(G, edge_list, reachid_col):
    """
    Prunes a graph to only retain edges whose 'reachid_col' matches values
    in 'edge_list'.

    Args:
        G: The input NetworkX Graph.
        edge_list: A list of values to filter edges.
        reachid_col: The edge attribute to be checked against 'edge_list'.

    Returns:
        A pruned NetworkX Graph.

    Raises:
        ValueError: If no edge carries a matching attribute value.
    """
    # Set membership is O(1); edge_list may be long and is tested once per
    # edge. (The original also ran a leftover debug loop touching every
    # edge's data before filtering; that dead code is removed.)
    wanted = set(edge_list)

    # Get edges to be retained using get_edge_data method
    edges_to_keep = [(u, v) for u, v in G.edges()
                     if G.get_edge_data(u, v).get(reachid_col) in wanted]

    # Check if there are no edges to keep
    if not edges_to_keep:
        # One message string: the original passed two arguments to
        # ValueError, which rendered as a tuple.
        raise ValueError(
            "There are no edges to retain based on the given edge "
            "list and attribute."
        )

    # Create a new graph with only the edges to be retained
    pruned_G = G.edge_subgraph(edges_to_keep).copy()

    # Remove isolated nodes
    pruned_G.remove_nodes_from(list(nx.isolates(pruned_G)))

    return pruned_G
def r2(x, y)
-
Returns the Pearson correlation coefficient squared between two arrays.
Args
x
:array
- An array of values.
y
:array
- An array of values.
Returns
float
- The squared Pearson correlation coefficient between the two arrays.
Expand source code
def r2(x, y):
    """Returns the Pearson correlation coefficient squared between two
    arrays.

    Args:
        x (array): An array of values.
        y (array): An array of values.

    Returns:
        float: The squared Pearson correlation coefficient between the two
        arrays.
    """
    correlation, _ = stats.pearsonr(x, y)
    return correlation ** 2
def read_network(network, shapefile)
-
Reads a network from a saved file or builds a network from a shapefile.
Args
network
- Path to the saved network file (pickle format). If provided, the function will read the network from this file.
shapefile
- Path to the shapefile to build the network from. This is
used if the
network
parameter is not provided.
Returns
A NetworkX Graph object representing the network.
Expand source code
def read_network(network, shapefile):
    """
    Reads a network from a saved file or builds a network from a shapefile.

    Args:
        network: Path to the saved network file (pickle format). If
            provided, the function will read the network from this file.
        shapefile: Path to the shapefile to build the network from. This is
            used if the `network` parameter is not provided.

    Returns:
        A NetworkX Graph object representing the network.
    """
    # A saved pickle takes precedence over rebuilding from the shapefile
    if network:
        print("Reading network from saved file: ", network)
        # Load the pickled graph and force it to be undirected
        with open(network, 'rb') as fh:
            return nx.Graph(pickle.load(fh)).to_undirected()

    print("Building network from shapefile:", shapefile)
    print("WARNING: This can take a while with very large files!")
    # pyogrio reads the shapefile into a GeoDataFrame
    rivers = pyogrio.read_dataframe(shapefile)
    # Convert the GeoDataFrame to a NetworkX Graph object
    return momepy.gdf_to_nx(rivers, approach="primal", directed=False,
                            multigraph=False)
def read_popmap(popmap)
-
Reads a population map file and returns a dictionary with individuals as keys and populations as values.
Args
popmap
:str
- Path to the population map file.
Returns
dict
- A dictionary with individuals as keys and populations as values.
Expand source code
def read_popmap(popmap):
    """
    Reads a population map file and returns a dictionary with individuals
    as keys and populations as values.

    Args:
        popmap (str): Path to the population map file.

    Returns:
        dict: A dictionary with individuals as keys and populations as
        values.
    """
    popdict = dict()
    with open(popmap, "r") as fh:
        for raw_line in fh:
            entry = raw_line.strip()
            # Skip blank lines
            if not entry:
                continue
            # Whitespace-delimited: first field is the individual, second
            # is its population
            fields = entry.split()
            popdict[fields[0]] = fields[1]
    return popdict
def read_vcf(vcf, concat='none', popmap=None)
-
Reads a VCF file and returns a dictionary of sample genotypes.
Args
vcf
- Path to the input VCF file.
concat
- Specifies the concatenation mode for genotypes. Options are "all", "loc", and "none". "all": Concatenate genotypes of all loci for each sample. "loc": Concatenate genotypes within the same chromosome for each sample. "none": Do not concatenate genotypes.
popmap
- Optional dictionary that maps populations to a list of samples. If provided, only samples in the popmap will be retained in the output dictionary.
Returns
A dictionary with sample names as keys and lists of genotypes as values
Expand source code
def read_vcf(vcf, concat="none", popmap=None):
    """
    Reads a VCF file and returns a dictionary of sample genotypes.

    Args:
        vcf: Path to the input VCF file.
        concat: Specifies the concatenation mode for genotypes. Options are
            "all", "loc", and "none".
            "all": Concatenate genotypes of all loci for each sample.
            "loc": Concatenate genotypes within the same chromosome for
                each sample.
            "none": Do not concatenate genotypes.
        popmap: Optional dictionary that maps populations to a list of
            samples. If provided, only samples in the popmap will be
            retained in the output dictionary.

    Returns:
        A dictionary with sample names as keys and lists of genotypes as
        values.
    """
    bcf_in = VariantFile(vcf)
    # get all samples in the VCF
    vcf_samples = list(bcf_in.header.samples)
    # set up data dict
    dat = dict()
    samples = list((bcf_in.header.samples))
    for s in samples:
        if concat == "all":
            # Seed with one empty diploid pair that loci get appended onto
            dat[s] = list()
            dat[s].append(["", ""])
        else:
            dat[s] = list()
    # if popmap, make list of samples to drop that aren't in a pop
    if popmap:
        keep = list()
        for pop in popmap:
            keep.extend(popmap[pop])
        # Only subset to samples that actually exist in the VCF header
        keep = [s for s in keep if s in vcf_samples]
        bcf_in.subset_samples(keep)
    # Sentinel so the first record always starts a new chromosome block
    chrom = "FIRST"
    for record in bcf_in.fetch():
        for i, sample in enumerate(record.samples):
            if concat == "all":
                # Append this locus to the single running pair of haplotype
                # strings
                loc = seq.decode(record.samples[i]['GT'], record.ref,
                                 record.alts, as_list=True)
                dat[sample][-1][0] = dat[sample][-1][0]+loc[0]
                dat[sample][-1][1] = dat[sample][-1][1]+loc[1]
            elif concat == "loc":
                # Start a fresh pair whenever the chromosome changes
                if record.chrom != chrom:
                    dat[sample].append(["", ""])
                loc = seq.decode(record.samples[i]['GT'], record.ref,
                                 record.alts, as_list=True)
                dat[sample][-1][0] = dat[sample][-1][0]+loc[0]
                dat[sample][-1][1] = dat[sample][-1][1]+loc[1]
            else:
                # No concatenation: one decoded genotype string per locus
                loc = seq.decode(record.samples[i]['GT'], record.ref,
                                 record.alts)
                dat[sample].append(loc)
        # Track the chromosome of the record just processed
        chrom = record.chrom
    if concat != "none":
        # Collapse each [hap1, hap2] pair into a "hap1/hap2" string
        for sample in dat:
            dat[sample] = ["/".join(x) for x in dat[sample]]
    # Drop samples that ended up with no genotype data; iterate over a copy
    # of the keys since entries may be deleted
    for sample in list(dat.keys()):
        if len(dat[sample]) < 1:
            del dat[sample]
        elif len(dat[sample]) == 1 and dat[sample][0][0] == "":
            del dat[sample]
    return dat
def replace_zeroes(data)
-
Replaces zeroes in the input array with the smallest non-zero value.
Args
data
:numpy.ndarray
- Input array.
Returns
numpy.ndarray
- Array with zeroes replaced by the smallest non-zero value.
Expand source code
def replace_zeroes(data):
    """
    Replaces zeroes in the input array with the smallest non-zero value.

    Note: the input array is modified in place and also returned.

    Args:
        data (numpy.ndarray): Input array.

    Returns:
        numpy.ndarray: Array with zeroes replaced by the smallest non-zero
        value.
    """
    smallest_nonzero = data[np.nonzero(data)].min()
    data[data == 0] = smallest_nonzero
    return data
def report_genmats(params, gen, pop_gen, point_coords, pop_coords)
-
Prints genetic distance matrices and writes them to files.
Args
params
- A custom object containing various input parameters.
gen
- A NumPy array representing the individual genetic distance matrix.
pop_gen
- A NumPy array representing the population genetic distance matrix.
point_coords
- A dictionary containing individual point coordinates.
pop_coords
- A dictionary containing population point coordinates.
Expand source code
def report_genmats(params, gen, pop_gen, point_coords, pop_coords):
    """
    Prints genetic distance matrices and writes them to files.

    Args:
        params: A custom object containing various input parameters.
        gen: A NumPy array representing the individual genetic distance
            matrix, or None.
        pop_gen: A NumPy array representing the population genetic distance
            matrix, or None.
        point_coords: A dictionary containing individual point coordinates.
        pop_coords: A dictionary containing population point coordinates.
    """
    # Individual-level matrix: print and write if present
    if gen is not None:
        print("Genetic distances:")
        np.set_printoptions(precision=3)
        print(gen, "\n")
        labels = list(point_coords.keys())
        pd.DataFrame(gen, columns=labels, index=labels).to_csv(
            (str(params.out) + ".indGenDistMat.txt"), sep="\t", index=True)
    # Population-level matrix: print and write if present
    if pop_gen is not None:
        print("Population genetic distances:")
        np.set_printoptions(precision=3)
        print(pop_gen, "\n")
        labels = list(pop_coords.keys())
        pd.DataFrame(pop_gen, columns=labels, index=labels).to_csv(
            (str(params.out) + ".popGenDistMat.txt"), sep="\t", index=True)
def report_genmats_list(params, genlist, popgenlist, point_coords, pop_coords)
-
Writes individual and population genetic distance matrices to files for each locus in genlist and popgenlist.
Args
params
- A namespace object containing parameters, including the output directory.
genlist
- A list of individual genetic distance matrices for each locus.
popgenlist
- A list of population genetic distance matrices for each locus.
point_coords
- A dictionary containing individual point coordinates.
pop_coords
- A dictionary containing population point coordinates.
Expand source code
def report_genmats_list(params, genlist, popgenlist, point_coords,
                        pop_coords):
    """
    Writes individual and population genetic distance matrices to files for
    each locus in genlist and popgenlist.

    Args:
        params: A namespace object containing parameters, including the
            output prefix `out`.
        genlist: A list of individual genetic distance matrices, one per
            locus (entries may be None).
        popgenlist: A list of population genetic distance matrices, one per
            locus (entries may be None).
        point_coords: A dictionary containing individual point coordinates.
        pop_coords: A dictionary containing population point coordinates.
    """
    # Create an output directory for the genetic distance matrices.
    # Renamed from `dir`, which shadowed the builtin.
    out_dir = str(params.out) + "_locmats"
    os.makedirs(out_dir, exist_ok=True)

    # enumerate keeps the file index aligned with the locus position in the
    # input list even when some entries are None
    ind_labels = list(point_coords.keys())
    for i, gen in enumerate(genlist):
        if gen is not None:
            # Write individual genetic distances to a file
            pd.DataFrame(gen, columns=ind_labels, index=ind_labels).to_csv(
                (out_dir + "/loc_" + str(i) + ".indGenDistMat.txt"),
                sep="\t", index=True)

    pop_labels = list(pop_coords.keys())
    for j, pop_gen in enumerate(popgenlist):
        if pop_gen is not None:
            # Write population genetic distances to a file
            pd.DataFrame(pop_gen, columns=pop_labels,
                         index=pop_labels).to_csv(
                (out_dir + "/loc_" + str(j) + ".popGenDistMat.txt"),
                sep="\t", index=True)
def snap_to_node(graph, pos)
-
Finds the closest node to the given [x, y] coordinates in the graph.
Args
graph
:NetworkX Graph
- The input graph.
pos
:tuple
- A tuple of [x, y] coordinates.
Returns
tuple
- The closest node to the input coordinates.
Expand source code
def snap_to_node(graph, pos):
    """
    Finds the closest node to the given [x, y] coordinates in the graph.

    Args:
        graph (NetworkX Graph): The input graph.
        pos (tuple): A tuple of [x, y] coordinates.

    Returns:
        tuple: The closest node to the input coordinates.
    """
    coords = np.array(graph.nodes())
    # Squared Euclidean distance is sufficient for picking the minimum
    squared_dists = ((coords - pos) ** 2).sum(axis=1)
    return tuple(coords[np.argmin(squared_dists)])
def test_ibd(gen, geo, out, perms, log=False)
-
Expand source code
def test_ibd(gen, geo, out, perms, log=False):
    """
    Tests for isolation by distance with Mantel tests between genetic and
    geographic distance matrices, writing the results to a file.

    Args:
        gen: Genetic distance matrix.
        geo: Geographic distance matrix.
        out (str): Output file prefix.
        perms: Number of Mantel permutations.
        log (bool, optional): If True, log-transform geographic distances
            (zeroes are first replaced by the smallest non-zero value).
            Defaults to False.
    """
    # get flattened lower triangle of each matrix
    gen = get_lower_tri(gen)
    geo = get_lower_tri(geo)
    if log is True:
        geo = replace_zeroes(geo)
        geo = np.log(geo)
    # Run both correlation methods with the same permutation count
    n_perms = int(perms)
    rows = list()
    for method in ('pearson', 'spearman'):
        res = mantel.test(geo, gen, perms=n_perms, method=method)
        rows.append(['genXgeo', method, str(perms), res.r, res.p, res.z])
    ibd = pd.DataFrame(rows,
                       columns=['test', 'method', 'perms', 'r', 'p', 'z'])
    print("Mantel test results:")
    print(ibd)
    ibd.to_csv((str(out) + ".isolationByDistance.txt"), sep="\t",
               index=False)
    print()
def vectorize_mat(mat)
-
Converts a pairwise matrix to a 1D vector.
Args
mat
:numpy.ndarray
- Pairwise matrix.
Returns
numpy.ndarray
- 1D vector of matrix elements.
Expand source code
def vectorize_mat(mat):
    """
    Converts a pairwise matrix to a 1D vector.

    The vector holds the strict upper-triangle elements in row-major order.

    Args:
        mat (numpy.ndarray): Pairwise (square) matrix.

    Returns:
        numpy.ndarray: 1D vector of matrix elements.
    """
    n = np.size(mat, 0)
    # math.comb(n, 2) gives the number of unordered pairs; replaces the
    # hand-rolled nCr helper
    vec = np.zeros(math.comb(n, 2))
    for index, (ia, ib) in enumerate(itertools.combinations(range(n), 2)):
        vec[index] = mat[ia, ib]
    return vec
def write_geodataframe(gdf, output_prefix, output_driver)
-
Expand source code
def write_geodataframe(gdf, output_prefix, output_driver):
    """
    Writes a GeoDataFrame to disk in the requested vector format.

    Args:
        gdf (geopandas.GeoDataFrame): The data to write.
        output_prefix (str): Output path prefix; the driver's extension is
            appended.
        output_driver (str): "SHP", "GPKG" or "GDB" (case-insensitive).
            Unrecognized values fall back to the GPKG extension.
    """
    gpd.options.io_engine = "pyogrio"
    driver_key = output_driver.upper()
    extension = {
        "SHP": ".shp",
        "GPKG": ".gpkg",
        "GDB": ".gdb"
    }.get(driver_key, ".gpkg")  # Default to .gpkg
    # GDAL driver names are case-sensitive: the original upper-cased the
    # final name, turning "ESRI Shapefile" into the invalid
    # "ESRI SHAPEFILE".
    driver = "ESRI Shapefile" if driver_key == "SHP" else driver_key
    output_path = f"{output_prefix}{extension}"
    # A file GDB is a directory; create it before writing. Compare against
    # the normalized key so lowercase "gdb" input also works.
    if driver_key == 'GDB' and not os.path.exists(output_path):
        os.makedirs(output_path)
    gdf.to_file(output_path, driver=driver)