Module autostreamtree.genetic_distances

Expand source code
import sys
import itertools
import numpy as np

import autostreamtree.aggregators as agg
import autostreamtree.sequence as seq


"""
Note several distance calculations not currently in use, I plan to expand this
further in the future, but for now only the following are supported:
- p distances
- Weir and Cockerham's Fst (linearised or non-linearised)
- Jost's D
- Chord distance
"""


def get_pop_genmat(dist, indmat, popmap, dat, seqs, pop_agg="ARITH",
                   loc_agg="ARITH", ploidy=2, global_het=False):
    # make matrix
    genmat = np.zeros((len(popmap), len(popmap)))
    # establish as nan
    genmat[:] = np.nan
    # for each combination, either average ind distances or calc freq dist
    for ia, ib in itertools.combinations(range(0, len(popmap)), 2):
        if dist in ["PDIST"]:
            inds1 = [dat.index(x) for x in popmap.values()[ia]]
            inds2 = [dat.index(x) for x in popmap.values()[ib]]
            genmat[ia, ib] = (agg.aggregate_dist(
                pop_agg, ([indmat[i, j] for i in inds1 for j in inds2]))
            )
            genmat[ib, ia] = genmat[ia, ib]
        elif dist == "JOST" or dist == "HARMD":
            # Arrays to store Hs, Ht, and D for each locus
            hs_vals, ht_vals, d_vals = [], [], []
            gn = len(popmap)

            for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
                seqs1 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ia]])
                seqs2 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ib]])
                if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
                        not clean_list(set(seqs2), ["n", "N", "-", "?"])):
                    continue
                hs, ht, d = two_pop_jost_d(seqs1, seqs2, global_het)
                hs_vals.append(hs)
                ht_vals.append(ht)
                d_vals.append(d)

            if loc_agg not in ("ARITH", "HARM", "ADJHARM"):
                raise ValueError("Invalid aggregation method for loci.")

            # Aggregate the results for Hs and Ht
            global_hs = agg.aggregate_dist(loc_agg, hs_vals)
            global_ht = agg.aggregate_dist(loc_agg, ht_vals)
            hs_vals.append(global_hs)

            # Calculate global D
            if dist == "JOST":
                if global_ht == 0:
                    global_d = 0.0
                else:
                    global_d = (global_ht - global_hs) / (1 - global_hs) * \
                        (gn / (gn - 1))
            else:
                if loc_agg in ("HARM", "ADJHARM"):
                    global_d = agg.aggregate_dist(loc_agg, d_vals)
                else:
                    global_d = agg.aggregate_dist("HARM", d_vals)
            genmat[ia, ib] = genmat[ib, ia] = global_d

        # elif dist == "GST" or dist == "GSTPRIME":
        #     HT = list()
        #     HS = list()
        #     for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
        #         seqs1 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ia]]
        #         )
        #         seqs2 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ib]]
        #         )
        #         if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
        #                 not clean_list(set(seqs2), ["n", "N", "-", "?"])):
        #             continue
        #         if dist == "GST" or "GSTPRIME":
        #             (ht, hs) = two_pop_ht_hs(seqs1, seqs2, ploidy,global_het)
        #             HT.append(ht)
        #             HS.append(hs)
        #     Ht_global = np.mean(HT)
        #     Hs_global = np.mean(HS)
        #     if dist == "GST":
        #         if Ht_global <= 0.0:
        #             genmat[ia, ib] = genmat[ib, ia] = 0.0
        #         Gst = ((Ht_global - Hs_global) / Ht_global)
        #         GprimeST = ((Gst * (1.0 + Hs_global)) / (1.0 - Hs_global))
        #         genmat[ia, ib] = genmat[ib, ia] = GprimeST
        #     elif dist == "GSTPRIME":
        #         Ghedrick = ((2.0*(Ht_global - Hs_global)) /
        #                     (((2.0*Ht_global) - Hs_global) *
        #                      (1.0 - Hs_global)))
        #         genmat[ia, ib] = genmat[ib, ia] = Ghedrick
        elif dist == "FST" or dist == "LINFST":
            num = list()  # numerator; a
            denom = list()  # denominator; a*b*c
            for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
                seqs1 = clean_inds([seqs[x][loc] for x in popmap.values()[ia]])
                seqs2 = clean_inds([seqs[x][loc] for x in popmap.values()[ib]])
                if (not all("/" in x for x in seqs1) or
                        not all("/" in x for x in seqs2)):
                    print("ERROR: FST estimates require phased data.")
                    sys.exit(1)
                if len(seqs1) == 0 or len(seqs2) == 0:
                    continue
                (n, d) = two_pop_weir_cockerham_fst(seqs1, seqs2)
                num.append(n)
                denom.append(d)

            # if either population lacking data, set value to nan
            if len(num) <= 0 or len(denom) <= 0:
                np.nan
            # if denominator is 0, set Fst to 0
            elif np.sum(denom) == 0.0:
                theta = 0.0
            # otherwise, calculate as normal
            else:
                theta = np.sum(num) / np.sum(denom)

            if dist == "FST":
                genmat[ia, ib] = genmat[ib, ia] = theta
            elif dist == "LINFST":
                if theta != 1.0:
                    genmat[ia, ib] = genmat[ib, ia] = (theta / (1-theta))
                else:
                    genmat[ia, ib] = genmat[ib, ia] = theta
        # elif dist == "NEI83":
        #     results = list()
        #     loci = 0.0
        #     for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
        #         seqs1 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ia]]
        #         )
        #         seqs2 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ib]]
        #         )
        #         if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
        #                 not clean_list(set(seqs2), ["n", "N", "-", "?"])):
        #             continue
        #         loci += 1.0
        #         results.append(two_pop_nei_da(seqs1, seqs2))
        #     Da = (1.0 - (np.sum(results) / loci))
        #     genmat[ia, ib] = genmat[ib, ia] = Da
        elif dist == "CHORD":
            sum_squares = 0.0
            for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
                seqs1 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ia]])
                seqs2 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ib]])

                if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
                        not clean_list(set(seqs2), ["n", "N", "-", "?"])):
                    continue

                sq_diff, _ = two_pop_chord_dist(seqs1, seqs2)
                sum_squares += sq_diff
            Dch = np.sqrt(sum_squares)
            genmat[ia, ib] = genmat[ib, ia] = Dch
        else:
            raise ValueError(f"Unsupported distance option {dist}")
    np.fill_diagonal(genmat, 0.0)
    if 0.0 in genmat:
        print("WARNING: Coercing negative distances to 0.0")
        genmat[genmat < 0.0] = 0.0
    return genmat


# function computes pairwise p-distance or JC69-corrected distances
def get_genmat(dist, points, seqs, ploidy, het, loc_agg):
    # make matrix
    genmat = np.zeros((len(points), len(points)))
    # establish as nan
    genmat[:] = np.nan

    # NOT USED CURRENTLY
    # for models which relax equal nuc frequencies, get global frequencies
    # for each locus
    # freqs will be a list of loci, with each locus as a dist of freqs
    # if dist in ["TN84", "TN93"]:
    #     freqs = seq.get_nuc_freqs(seqs, ploidy)
    #     index = 1
    #     for f in freqs:
    #         print("Empirical base frequencies for locus",index, end=": [ ")
    #         for n in f:
    #             print(f'{n}={f[n]:.3f} ', end="")
    #         print("]")
    #         index = index + 1

    # for each combination, calc jukes-cantor corrected distance
    for ia, ib in itertools.combinations(range(0, len(points)), 2):
        results = list()
        for loc in range(0, len(seqs[points.keys()[ia]])):
            seq1 = seqs[points.keys()[ia]][loc]
            seq2 = seqs[points.keys()[ib]][loc]
            if "/" in seq1:
                seq1 = seq.dna_consensus(seq1)
            if "/" in seq2:
                seq2 = seq.dna_consensus(seq2)
            if dist == "PDIST":
                if het:
                    results.append(p_distance(seq1, seq2))
                else:
                    results.append(hamming_distance(seq1, seq2))
            else:
                raise ValueError(f"{dist} not implemented")
        # aggregate results across loci
        genmat[ia, ib] = agg.aggregate_dist(loc_agg, results)
        genmat[ib, ia] = genmat[ia, ib]
    # fill diagonals
    np.fill_diagonal(genmat, 0.0)
    return genmat


# NOT IN USE
# def jukes_cantor_distance(seq1: str, seq2: str, het: bool = False) -> float:
#     """
#     Description: This function calculates the JC69-corrected p-distance
#                  between two DNA sequences.

#     Args:
#     - seq1 (str): The first DNA sequence.
#     - seq2 (str): The second DNA sequence.
#     - het (bool): Whether to use the Jukes-Cantor correction for heterozygous
#                   sites (True) or for all sites (False). Default is False.

#     Returns:
#     - dist (float): The JC69-corrected p-distance between the two sequences.
#     """
#     obs = 0.0
#     # calculate the observed distance
#     if het:
#         obs = p_distance(seq1, seq2)
#     else:
#         obs = hamming_distance(seq1, seq2)

#     # apply the JC69 correction
#     if obs >= 0.74999:
#         obs = 0.749
#     dist = -0.75 * np.log(1.0 - ((4.0/3.0) * obs))

#     # ensure distance is not negative and return
#     if not dist > 0.0:
#         return 0.0
#     return dist


# NOT IN USE
# # function to return Kimura 2-parameter distances
# def k2p_distance(seq1, seq2, het=False):
#     """
#     Computes the Kimura 2-parameter distance between two DNA sequences.

#     Args:
#     - seq1 (str): The first DNA sequence to compare.
#     - seq2 (str): The second DNA sequence to compare.
#     - het (bool): Whether to use the p-distance or Jukes-Cantor distance.
#                   Defaults to False (Jukes-Cantor distance).

#     Returns:
#     - dist (float): The Kimura 2-parameter distance between the two sequences
#     """
#     P = 0.0
#     Q = 0.0
#     if het:
#         (P, Q) = p_distance(seq1, seq2, trans=True)
#     else:
#         (P,Q)=hamming_distance(seq1, seq2, trans=True)

#     dist=-0.5*(np.log((1.0-(2.0*P)-Q) * math.sqrt(1.0-(2.0*Q))))
#     #print(dist)
#     if dist <= 0.0:
#         return(0.0)
#     return(dist)


# NOT IN USE
# def tn84_distance(seq1: str, seq2: str, freqs: Dict[str, float], het:
#                   bool = False) -> float:
#     """
#     Compute the TN84 distance between two DNA sequences.

#     Args:
#     seq1: str
#         A DNA sequence.
#     seq2: str
#         Another DNA sequence.
#     freqs: Dict[str, float]
#         A dictionary containing the frequency of each nucleotide.
#         For example, {'A': 0.3, 'C': 0.2, 'G': 0.2, 'T': 0.3}.
#     het: bool, optional (default=False)
#         Indicates whether heterozygosity is used.

#     Returns:
#     dist: float
#         TN84 distance between seq1 and seq2.

#     """
#     D=0.0
#     if het:
#         D=p_distance(seq1, seq2, trans=False)
#     else:
#         D=hamming_distance(seq1, seq2, trans=False)

#     ss=0.0
#     for n in freqs:
#         ss = ss + np.square(freqs[n])
#     b=float(1.0-ss)
#     dist=-b*np.log(1.0-((1.0/b)*D))
#     #print(dist)
#     if dist <= 0.0:
#         return(0.0)
#     return(dist)


# NOT IN USE
# def tn93_distance(seq1: str, seq2: str, freqs: Dict[str, float],
#                   het: bool = False) -> float:
#     """
#     Compute TN93 distances between two sequences.

#     Args:
#         seq1: A DNA sequence.
#         seq2: A DNA sequence.
#         freqs: A dictionary of nucleotide frequencies, where the keys are
#                nucleotides (A, T, C, G)
#                and the values are the frequency of the corresponding nucleoti
#                in the alignment.
#         het: Whether the nucleotide differences are considered heterozygous.
#              Default is False.

#     Returns:
#         The TN93 distance between the two input sequences.

#     """

#     P1 = 0.0
#     P2 = 0.0
#     Q = 0.0
#     if het:
#         P1, P2, Q = p_distance(seq1, seq2, trans=False, transSplit=True)
#     else:
#         P1, P2, Q = hamming_distance(seq1, seq2, trans=False,transSplit=True)

#     # Calculate nucleotide frequencies
#     gR = freqs['g'] + freqs['a']
#     gY = freqs['c'] + freqs['t']

#     # Calculate k values
#     k1 = (2.0 * (freqs['a'] * freqs['g'])) / gR
#     k2 = (2.0 * (freqs['t'] * freqs['c'])) / gY
#     k3 = 2.0 * ((gR * gY) - ((freqs['a'] * freqs['g'] * gY) / gR) -
#                 ((freqs['t'] * freqs['c'] * gR) / gY))

#     # Calculate weight values
#     w1 = 1.0 - (P1 / k1) - (Q / (2.0 * gR))
#     w2 = 1.0 - (P2 / k3) - (Q / (2.0 * gY))
#     w3 = 1.0 - (Q / (2.0 * gR * gY))

#     # Calculate the distance
#     dist = -(k1 * np.log(w1)) - (k2 * np.log(w2)) - (k2 * np.log(w3))

#     # Check for negative or NaN distances
#     if dist <= 0.0 or np.isnan(dist):
#         return 0.0

#     return dist


# p distance = D / L (differences / length)
# L is the UNGAPPED distance
# ambigs are expanded
# when trans = true, returns two values:
# P (transitions/L) and Q (transversions/L)
def p_distance(seq1, seq2):
    """
    Calculate p-distance, accounting for IUPAC ambiguity codes and partial
    matches.

    Args:
        seq1 (str): First sequence.
        seq2 (str): Second sequence.

    Returns:
        float: p-distance.
    """
    L = min(len(seq1), len(seq2))
    D = 0.0

    for n1, n2 in zip(seq1.lower(), seq2.lower()):
        if n1 in ["?", "-", "n"] or n2 in ["?", "-", "n"]:
            L -= 1
            continue
        else:
            ex1 = set(seq.get_iupac_caseless(n1))
            ex2 = set(seq.get_iupac_caseless(n2))

            if ex1 == ex2:
                # Exact match
                continue
            elif ex1.isdisjoint(ex2):
                # No match
                D += 1.0
            else:
                # Partial match
                D += 0.5

    if L <= 0:
        return 0.0
    return D / L


# p distance = D / L (differences / length)
# gaps ignored
# ambigs are treated as alleles
def hamming_distance(seq1, seq2):
    """
    Calculate the Hamming distance between two sequences.

    Args:
        seq1 (str): First sequence.
        seq2 (str): Second sequence.

    Returns:
        float: Hamming distance.
    """
    # Ensure sequences are of equal length
    L = min(len(seq1), len(seq2))
    D = 0.0

    for n1, n2 in zip(seq1.lower(), seq2.lower()):
        if n1 in ["?", "-", "n"] or n2 in ["?", "-", "n"]:
            L -= 1  # Ignore positions with gaps or ambiguous nucleotides
        elif n1 != n2:
            D += 1.0  # Count mismatches

    if L <= 0:
        return 0.0  # Avoid division by zero
    return D / L


# NOT IN USE
# def two_pop_nei_da(s1: List[str], s2: List[str]) -> float:
#     """
#     Computes Nei's 1983 Da estimator for two populations.

#     Args:
#         s1 (List[str]): A list of phased genotypes from population 1,
# e.g. ['A/A', 'A/B', 'B/B', ...].
#         s2 (List[str]): A list of phased genotypes from population 2,
# e.g. ['A/A', 'A/C', 'C/C', ...].
#     Returns:
#         float: The Nei's 1983 Da estimator.
#     """
#     # Clean the input lists by removing individuals with unknown or gap
# alleles.
#     s1 = clean_list(s1, ["n", "?", "-", "N"])
#     s2 = clean_list(s2, ["n", "?", "-", "N"])
#     # Get the list of unique alleles from both populations.
#     uniques = uniq_alleles(s1+s2)
#     # Compute the sum of squared roots of frequencies for each allele.
#     sumSqRt = 0.0
#     for allele in uniques:
#         if allele in ["-", "?", "n", "N"]:
#             continue
#         else:
#             Xu = float(s1.count(allele) / len(s1))
#             Yu = float(s2.count(allele) / len(s2))
#             sumSqRt += np.sqrt(Xu*Yu)
#     return sumSqRt


# NOT IN USE
# def two_pop_euclid_dist(s1: List[str], s2: List[str]) -> float:
#     """
#     Computes Euclidean distance between two populations for a single locus.

#     Args:
#     - s1 (List[str]): List of allele calls for population 1 at a given locus
#     - s2 (List[str]): List of allele calls for population 2 at a given locus

#     Returns:
#     - Euclidean distance between the two populations for the given locus

#     """
#     # Get unique alleles in the two populations
#     uniques = seq.uniq_alleles(s1+s2)
#     # Clean the sequences by removing any unknown or gap alleles
#     s1 = clean_list(s1, ["n", "?", "-", "N"])
#     s2 = clean_list(s2, ["n", "?", "-", "N"])
#     # Calculate Euclidean distance
#     sumSq = 0.0
#     for allele in uniques:
#         if allele in ["-", "?", "n", "N"]:
#             continue
#         else:
#             Xu = float(s1.count(allele) / len(s1))
#             Yu = float(s2.count(allele) / len(s2))
#             sumSq += np.square(Xu - Yu)
#     return sumSq


# Cavalli-Sforza and Edwards 1967 chord distance
# non-nucleotide alleles are deleted
def two_pop_chord_dist(s1, s2):
    s1 = clean_list(s1, ["n", "?", "-", "N"])
    s2 = clean_list(s2, ["n", "?", "-", "N"])
    uniques = uniq_alleles(s1 + s2)

    sum_squares = 0.0
    for allele in uniques:
        if allele in ["-", "?", "n", "N"]:
            continue
        Xu = float(s1.count(allele) / len(s1))
        Yu = float(s2.count(allele) / len(s2))
        sum_squares += np.square(Xu - Yu)

    return sum_squares, len(uniques)


def two_pop_weir_cockerham_fst(s1, s2):
    """
    Computes Weir and Cockerham's THETAst Fst approximation for two populations

    Args:
        s1 (list): A list of phased genotypes for population 1.
        s2 (list): A list of phased genotypes for population 2.

    Returns:
        tuple: A tuple containing two floats. The first float is the numerator
        of the THETAst estimator for the locus. The second float is the
        denominator of the THETAst estimator for the locus.

    Raises:
        ValueError: If the inputs are not valid.

    """
    # Check inputs
    if not isinstance(s1, list) or not isinstance(s2, list):
        raise ValueError("Inputs must be lists.")
    if not s1 or not s2:
        raise ValueError("Inputs must not be empty.")
    if (not all(isinstance(x, str) for x in s1) or
            not all(isinstance(x, str) for x in s2)):
        raise ValueError("Inputs must only contain strings.")
    if not all("/" in x for x in s1) or not all("/" in x for x in s2):
        raise ValueError("Inputs must be phased genotypes.")

    # Initialize variables
    num = 0.0
    denom = 0.0

    # mean sample size
    alleles1 = get_alleles(s1)  # split alleles s1
    alleles2 = get_alleles(s2)  # split alleles s2
    uniques = uniq_alleles(s1+s2)  # list of unique alleles only
    r = 2.0  # number of pops
    n1 = float(len(s1))  # pop size of pop 1
    n2 = float(len(s2))  # pop size of pop 2
    csd = np.std([n1, n2])
    cm = np.mean([n1, n2])
    nbar = cm
    csquare = (csd*csd) / (cm*cm)
    nC = nbar * (1.0 - (csquare/r))  # coeff of pop size variance
    for allele in uniques:
        ac1 = float(alleles1.count(allele))
        ac2 = float(alleles2.count(allele))
        p1 = ac1 / float(len(alleles1))
        p2 = ac2 / float(len(alleles2))
        h1 = get_het_from_phased(allele, s1, count=True)
        h2 = get_het_from_phased(allele, s2, count=True)
        pbar = (ac1+ac2) / (float(len(alleles1)) + float(len(alleles2)))
        ssquare = ((np.sum([(n1 * (np.square(p1 - pbar))),
                            (n2 * (np.square(p2 - pbar)))])) / ((r-1.0)*nbar))
        hbar = ((h1 + h2) / (r * nbar))
        if nbar != 1.0:
            a = ((nbar/nC) * (ssquare -
                              ((1.0 / (nbar-1.0)) *
                               ((pbar * (1.0 - pbar)) -
                                ((r - 1.0) * ssquare / r) -
                                (hbar / 4.0)))))
            b = ((nbar / (nbar-1.0)) *
                 ((pbar * (1.0 - pbar)) -
                  ((r - 1.0) * ssquare / r) -
                  (((2.0 * nbar) - 1.0) * hbar / (4.0 * nbar))))
            c = hbar/2.0
            d = a+b+c
            num += a
            denom += d
    return num, denom


def clean_inds(inds):
    """
    Removes individuals with unknown or gap alleles.

    Args:
        inds (list): A list of individuals.

    Returns:
        list: A list of individuals without unknown or gap alleles.
    """
    ret = []
    for ind in inds:
        if ("-" not in ind and "?" not in ind and "n" not in ind and
                "N" not in ind):
            ret.append(ind)
    return ret


def two_pop_jost_d(seqs1, seqs2, global_het=False):
    if global_het:
        Ht = get_global_het(seqs1 + seqs2)
    else:
        Ht = get_average_het(seqs1, seqs2)
    Hs = np.mean([get_global_het(seqs1), get_global_het(seqs2)])
    n = len(seqs1 + seqs2)
    D = (Ht - Hs) / (1 - Hs) * (n / (n - 1))
    return Hs, Ht, D


# NOT IN USE
# def two_pop_ht_hs(seqs1, seqs2, ploidy, global_het=False):
#     """
#     Computes Nei's Fst estimator (Gst) using Nei and Chessers Hs and Ht
# estimators
#     also applies Hedrick's (2005) sample size correction, thus returning
# G'st.

#     Args:
#     - seqs1, seqs2 (list): lists of sequences for populations 1 and 2.
#     - ploidy (int): ploidy of the sequences.
#     - global_het (bool, optional): If True, computes global heterozygosity.
# Defaults to False.

#     Returns:
#     - A tuple containing Ht_est and Hs_est.

#     """
#     if global_het:
#         Ht = get_global_het(seqs1 + seqs2)
#     else:
#         Ht = get_average_het(seqs1, seqs2)

#     Hs = np.mean([get_global_het(seqs1), get_global_het(seqs2)])
#     harmN = scipy.stats.hmean([(len(seqs1) / ploidy), (len(seqs1) / ploidy)])

#     # Hs correction based on Hedrick (2005)
#     Hs_est = Hs * ((2.0 * harmN) / ((2.0 * harmN) - 1.0))

#     # Ht correction based on Hedrick (2005)
#     Ht_est = Ht + (Hs / (harmN * 2.0 * 2.0))

#     # Gst = ((Ht_est - Hs_est) / Ht_est )

#     # GprimeST = ((Gst * (1.0 + Hs_est)) / (1.0 - Hs_est))
#     return (Ht_est, Hs_est)


def get_het_from_phased(allele, phasedList, count=False):
    """
    Returns observed heterozygosity of an allele given a list of phased
    genotypes (e.g. allele1/allele2 for each individual). Assumes diploid.

    Args:
    allele (str): The allele for which to compute heterozygosity.
    phasedList (list): A list of phased genotypes, where each element is a
                       string of the form 'allele1/allele2'.
    count (bool): Whether to return the number of heterozygotes instead of the
                  proportion.

    Returns:
    float: The proportion of heterozygotes for the given allele, unless `count`
           is True, in which case the count of heterozygotes is returned.
    """
    hets = 0.0
    twoN = (len(phasedList)) * 2.0
    for genotype in phasedList:
        if "/" not in genotype:
            print(
                "ERROR (get_het_from_phased): Phased genotypes are required."
            )
        gens = genotype.split("/")
        if gens[0] == allele and gens[1] != allele:
            hets += 1.0
            continue
        elif gens[1] == allele and gens[0] != allele:
            hets += 1.0
            continue
        else:
            continue
    if count:
        return hets
    else:
        return hets / twoN


def get_global_het(seqs):
    """
    Computes global expected heterozygosity (Ht) from a set of sequences.

    Args:
    seqs (list): A list of sequences.

    Returns:
    float: The Ht estimate for the given sequences.
    """
    hom = 0.0
    uniq_alleles = clean_list(set(seqs), ["n", "?", "-", "N"])
    freqs = [np.square(float(seqs.count(x) / len(seqs))) for x in uniq_alleles]
    hom = np.sum(freqs)
    return 1.0 - hom


def get_average_het(s1, s2):
    """Computes the mean expected heterozygosity (Ht) from two populations.

    Args:
        s1 (list): A list of alleles from population 1.
        s2 (list): A list of alleles from population 2.

    Returns:
        float: The Ht estimate per locus.
    """
    hom = 0.0
    # Get unique alleles
    uniq_alleles = clean_list(set(s1+s2), ["n", "?", "-", "N"])
    # Compute frequency for each allele
    freqs = [np.square(
        np.mean([float(s1.count(x)/len(s1)),
                 float(s2.count(x)/len(s2))])) for x in uniq_alleles]
    hom = np.sum(freqs)
    return 1.0 - hom


def clean_list(to_clean, bads):
    """Removes bad items from a list.

    Args:
        l (list): A list to clean.
        bads (list): A list of items to remove.

    Returns:
        list: A cleaned list.
    """
    if not any(item not in bads for item in set(to_clean)):
        return False
    for b in bads:
        if b in to_clean:
            to_clean.remove(b)
    return to_clean


def get_alleles(s):
    """Splits a string of alleles separated by "/" into a list.

    Args:
        s (str): A string of alleles separated by "/".

    Returns:
        list: A list of alleles.
    """
    return sum([x.split("/") for x in s], [])


def uniq_alleles(s):
    """Returns the unique alleles in a list of alleles.

    Args:
        s (list): A list of alleles.

    Returns:
        set: A set of unique alleles.
    """
    return set(sum([x.split("/") for x in s], []))

Functions

def clean_inds(inds)

Removes individuals with unknown or gap alleles.

Args

inds : list
A list of individuals.

Returns

list
A list of individuals without unknown or gap alleles.
Expand source code
def clean_inds(inds):
    """
    Removes individuals with unknown or gap alleles.

    Args:
        inds (list): A list of individuals.

    Returns:
        list: A list of individuals without unknown or gap alleles.
    """
    ret = []
    for ind in inds:
        if ("-" not in ind and "?" not in ind and "n" not in ind and
                "N" not in ind):
            ret.append(ind)
    return ret
def clean_list(to_clean, bads)

Removes bad items from a list.

Args

l : list
A list to clean.
bads : list
A list of items to remove.

Returns

list
A cleaned list.
Expand source code
def clean_list(to_clean, bads):
    """Removes bad items from a list.

    Args:
        l (list): A list to clean.
        bads (list): A list of items to remove.

    Returns:
        list: A cleaned list.
    """
    if not any(item not in bads for item in set(to_clean)):
        return False
    for b in bads:
        if b in to_clean:
            to_clean.remove(b)
    return to_clean
def get_alleles(s)

Splits a string of alleles separated by "/" into a list.

Args

s : str
A string of alleles separated by "/".

Returns

list
A list of alleles.
Expand source code
def get_alleles(s):
    """Splits a string of alleles separated by "/" into a list.

    Args:
        s (str): A string of alleles separated by "/".

    Returns:
        list: A list of alleles.
    """
    return sum([x.split("/") for x in s], [])
def get_average_het(s1, s2)

Computes the mean expected heterozygosity (Ht) from two populations.

Args

s1 : list
A list of alleles from population 1.
s2 : list
A list of alleles from population 2.

Returns

float
The Ht estimate per locus.
Expand source code
def get_average_het(s1, s2):
    """Computes the mean expected heterozygosity (Ht) from two populations.

    Args:
        s1 (list): A list of alleles from population 1.
        s2 (list): A list of alleles from population 2.

    Returns:
        float: The Ht estimate per locus.
    """
    hom = 0.0
    # Get unique alleles
    uniq_alleles = clean_list(set(s1+s2), ["n", "?", "-", "N"])
    # Compute frequency for each allele
    freqs = [np.square(
        np.mean([float(s1.count(x)/len(s1)),
                 float(s2.count(x)/len(s2))])) for x in uniq_alleles]
    hom = np.sum(freqs)
    return 1.0 - hom
def get_genmat(dist, points, seqs, ploidy, het, loc_agg)
Expand source code
def get_genmat(dist, points, seqs, ploidy, het, loc_agg):
    # make matrix
    genmat = np.zeros((len(points), len(points)))
    # establish as nan
    genmat[:] = np.nan

    # NOT USED CURRENTLY
    # for models which relax equal nuc frequencies, get global frequencies
    # for each locus
    # freqs will be a list of loci, with each locus as a dist of freqs
    # if dist in ["TN84", "TN93"]:
    #     freqs = seq.get_nuc_freqs(seqs, ploidy)
    #     index = 1
    #     for f in freqs:
    #         print("Empirical base frequencies for locus",index, end=": [ ")
    #         for n in f:
    #             print(f'{n}={f[n]:.3f} ', end="")
    #         print("]")
    #         index = index + 1

    # for each combination, calc jukes-cantor corrected distance
    for ia, ib in itertools.combinations(range(0, len(points)), 2):
        results = list()
        for loc in range(0, len(seqs[points.keys()[ia]])):
            seq1 = seqs[points.keys()[ia]][loc]
            seq2 = seqs[points.keys()[ib]][loc]
            if "/" in seq1:
                seq1 = seq.dna_consensus(seq1)
            if "/" in seq2:
                seq2 = seq.dna_consensus(seq2)
            if dist == "PDIST":
                if het:
                    results.append(p_distance(seq1, seq2))
                else:
                    results.append(hamming_distance(seq1, seq2))
            else:
                raise ValueError(f"{dist} not implemented")
        # aggregate results across loci
        genmat[ia, ib] = agg.aggregate_dist(loc_agg, results)
        genmat[ib, ia] = genmat[ia, ib]
    # fill diagonals
    np.fill_diagonal(genmat, 0.0)
    return genmat
def get_global_het(seqs)

Computes global expected heterozygosity (Ht) from a set of sequences.

Args: seqs (list): A list of sequences.

Returns: float: The Ht estimate for the given sequences.

Expand source code
def get_global_het(seqs):
    """
    Computes global expected heterozygosity (Ht) from a set of sequences.

    Args:
    seqs (list): A list of sequences.

    Returns:
    float: The Ht estimate for the given sequences.
    """
    hom = 0.0
    uniq_alleles = clean_list(set(seqs), ["n", "?", "-", "N"])
    freqs = [np.square(float(seqs.count(x) / len(seqs))) for x in uniq_alleles]
    hom = np.sum(freqs)
    return 1.0 - hom
def get_het_from_phased(allele, phasedList, count=False)

Returns observed heterozygosity of an allele given a list of phased genotypes (e.g. allele1/allele2 for each individual). Assumes diploid.

Args: allele (str): The allele for which to compute heterozygosity. phasedList (list): A list of phased genotypes, where each element is a string of the form 'allele1/allele2'. count (bool): Whether to return the number of heterozygotes instead of the proportion.

Returns: float: The proportion of heterozygotes for the given allele, unless count is True, in which case the count of heterozygotes is returned.

Expand source code
def get_het_from_phased(allele, phasedList, count=False):
    """
    Returns observed heterozygosity of an allele given a list of phased
    genotypes (e.g. allele1/allele2 for each individual). Assumes diploid.

    Args:
    allele (str): The allele for which to compute heterozygosity.
    phasedList (list): A list of phased genotypes, where each element is a
                       string of the form 'allele1/allele2'.
    count (bool): Whether to return the number of heterozygotes instead of the
                  proportion.

    Returns:
    float: The proportion of heterozygotes for the given allele, unless `count`
           is True, in which case the count of heterozygotes is returned.
    """
    hets = 0.0
    twoN = (len(phasedList)) * 2.0
    for genotype in phasedList:
        if "/" not in genotype:
            print(
                "ERROR (get_het_from_phased): Phased genotypes are required."
            )
        gens = genotype.split("/")
        if gens[0] == allele and gens[1] != allele:
            hets += 1.0
            continue
        elif gens[1] == allele and gens[0] != allele:
            hets += 1.0
            continue
        else:
            continue
    if count:
        return hets
    else:
        return hets / twoN
def get_pop_genmat(dist, indmat, popmap, dat, seqs, pop_agg='ARITH', loc_agg='ARITH', ploidy=2, global_het=False)
Expand source code
def get_pop_genmat(dist, indmat, popmap, dat, seqs, pop_agg="ARITH",
                   loc_agg="ARITH", ploidy=2, global_het=False):
    # make matrix
    genmat = np.zeros((len(popmap), len(popmap)))
    # establish as nan
    genmat[:] = np.nan
    # for each combination, either average ind distances or calc freq dist
    for ia, ib in itertools.combinations(range(0, len(popmap)), 2):
        if dist in ["PDIST"]:
            inds1 = [dat.index(x) for x in popmap.values()[ia]]
            inds2 = [dat.index(x) for x in popmap.values()[ib]]
            genmat[ia, ib] = (agg.aggregate_dist(
                pop_agg, ([indmat[i, j] for i in inds1 for j in inds2]))
            )
            genmat[ib, ia] = genmat[ia, ib]
        elif dist == "JOST" or dist == "HARMD":
            # Arrays to store Hs, Ht, and D for each locus
            hs_vals, ht_vals, d_vals = [], [], []
            gn = len(popmap)

            for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
                seqs1 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ia]])
                seqs2 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ib]])
                if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
                        not clean_list(set(seqs2), ["n", "N", "-", "?"])):
                    continue
                hs, ht, d = two_pop_jost_d(seqs1, seqs2, global_het)
                hs_vals.append(hs)
                ht_vals.append(ht)
                d_vals.append(d)

            if loc_agg not in ("ARITH", "HARM", "ADJHARM"):
                raise ValueError("Invalid aggregation method for loci.")

            # Aggregate the results for Hs and Ht
            global_hs = agg.aggregate_dist(loc_agg, hs_vals)
            global_ht = agg.aggregate_dist(loc_agg, ht_vals)
            hs_vals.append(global_hs)

            # Calculate global D
            if dist == "JOST":
                if global_ht == 0:
                    global_d = 0.0
                else:
                    global_d = (global_ht - global_hs) / (1 - global_hs) * \
                        (gn / (gn - 1))
            else:
                if loc_agg in ("HARM", "ADJHARM"):
                    global_d = agg.aggregate_dist(loc_agg, d_vals)
                else:
                    global_d = agg.aggregate_dist("HARM", d_vals)
            genmat[ia, ib] = genmat[ib, ia] = global_d

        # elif dist == "GST" or dist == "GSTPRIME":
        #     HT = list()
        #     HS = list()
        #     for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
        #         seqs1 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ia]]
        #         )
        #         seqs2 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ib]]
        #         )
        #         if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
        #                 not clean_list(set(seqs2), ["n", "N", "-", "?"])):
        #             continue
        #         if dist == "GST" or "GSTPRIME":
        #             (ht, hs) = two_pop_ht_hs(seqs1, seqs2, ploidy,global_het)
        #             HT.append(ht)
        #             HS.append(hs)
        #     Ht_global = np.mean(HT)
        #     Hs_global = np.mean(HS)
        #     if dist == "GST":
        #         if Ht_global <= 0.0:
        #             genmat[ia, ib] = genmat[ib, ia] = 0.0
        #         Gst = ((Ht_global - Hs_global) / Ht_global)
        #         GprimeST = ((Gst * (1.0 + Hs_global)) / (1.0 - Hs_global))
        #         genmat[ia, ib] = genmat[ib, ia] = GprimeST
        #     elif dist == "GSTPRIME":
        #         Ghedrick = ((2.0*(Ht_global - Hs_global)) /
        #                     (((2.0*Ht_global) - Hs_global) *
        #                      (1.0 - Hs_global)))
        #         genmat[ia, ib] = genmat[ib, ia] = Ghedrick
        elif dist == "FST" or dist == "LINFST":
            num = list()  # numerator; a
            denom = list()  # denominator; a*b*c
            for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
                seqs1 = clean_inds([seqs[x][loc] for x in popmap.values()[ia]])
                seqs2 = clean_inds([seqs[x][loc] for x in popmap.values()[ib]])
                if (not all("/" in x for x in seqs1) or
                        not all("/" in x for x in seqs2)):
                    print("ERROR: FST estimates require phased data.")
                    sys.exit(1)
                if len(seqs1) == 0 or len(seqs2) == 0:
                    continue
                (n, d) = two_pop_weir_cockerham_fst(seqs1, seqs2)
                num.append(n)
                denom.append(d)

            # if either population lacking data, set value to nan
            if len(num) <= 0 or len(denom) <= 0:
                np.nan
            # if denominator is 0, set Fst to 0
            elif np.sum(denom) == 0.0:
                theta = 0.0
            # otherwise, calculate as normal
            else:
                theta = np.sum(num) / np.sum(denom)

            if dist == "FST":
                genmat[ia, ib] = genmat[ib, ia] = theta
            elif dist == "LINFST":
                if theta != 1.0:
                    genmat[ia, ib] = genmat[ib, ia] = (theta / (1-theta))
                else:
                    genmat[ia, ib] = genmat[ib, ia] = theta
        # elif dist == "NEI83":
        #     results = list()
        #     loci = 0.0
        #     for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
        #         seqs1 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ia]]
        #         )
        #         seqs2 = get_alleles(
        #             [seqs[x][loc] for x in popmap.values()[ib]]
        #         )
        #         if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
        #                 not clean_list(set(seqs2), ["n", "N", "-", "?"])):
        #             continue
        #         loci += 1.0
        #         results.append(two_pop_nei_da(seqs1, seqs2))
        #     Da = (1.0 - (np.sum(results) / loci))
        #     genmat[ia, ib] = genmat[ib, ia] = Da
        elif dist == "CHORD":
            sum_squares = 0.0
            for loc in range(0, len(seqs[popmap.values()[ia][-1]])):
                seqs1 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ia]])
                seqs2 = get_alleles(
                    [seqs[x][loc] for x in popmap.values()[ib]])

                if (not clean_list(set(seqs1), ["n", "N", "-", "?"]) or
                        not clean_list(set(seqs2), ["n", "N", "-", "?"])):
                    continue

                sq_diff, _ = two_pop_chord_dist(seqs1, seqs2)
                sum_squares += sq_diff
            Dch = np.sqrt(sum_squares)
            genmat[ia, ib] = genmat[ib, ia] = Dch
        else:
            raise ValueError(f"Unsupported distance option {dist}")
    np.fill_diagonal(genmat, 0.0)
    if 0.0 in genmat:
        print("WARNING: Coercing negative distances to 0.0")
        genmat[genmat < 0.0] = 0.0
    return genmat
def hamming_distance(seq1, seq2)

Calculate the Hamming distance between two sequences.

Args

seq1 : str
First sequence.
seq2 : str
Second sequence.

Returns

float
Hamming distance.
Expand source code
def hamming_distance(seq1, seq2):
    """
    Calculate the Hamming distance between two sequences.

    Args:
        seq1 (str): First sequence.
        seq2 (str): Second sequence.

    Returns:
        float: Hamming distance.
    """
    # Ensure sequences are of equal length
    L = min(len(seq1), len(seq2))
    D = 0.0

    for n1, n2 in zip(seq1.lower(), seq2.lower()):
        if n1 in ["?", "-", "n"] or n2 in ["?", "-", "n"]:
            L -= 1  # Ignore positions with gaps or ambiguous nucleotides
        elif n1 != n2:
            D += 1.0  # Count mismatches

    if L <= 0:
        return 0.0  # Avoid division by zero
    return D / L
def p_distance(seq1, seq2)

Calculate p-distance, accounting for IUPAC ambiguity codes and partial matches.

Args

seq1 : str
First sequence.
seq2 : str
Second sequence.

Returns

float
p-distance.
Expand source code
def p_distance(seq1, seq2):
    """
    Calculate p-distance, accounting for IUPAC ambiguity codes and partial
    matches.

    Args:
        seq1 (str): First sequence.
        seq2 (str): Second sequence.

    Returns:
        float: p-distance.
    """
    L = min(len(seq1), len(seq2))
    D = 0.0

    for n1, n2 in zip(seq1.lower(), seq2.lower()):
        if n1 in ["?", "-", "n"] or n2 in ["?", "-", "n"]:
            L -= 1
            continue
        else:
            ex1 = set(seq.get_iupac_caseless(n1))
            ex2 = set(seq.get_iupac_caseless(n2))

            if ex1 == ex2:
                # Exact match
                continue
            elif ex1.isdisjoint(ex2):
                # No match
                D += 1.0
            else:
                # Partial match
                D += 0.5

    if L <= 0:
        return 0.0
    return D / L
def two_pop_chord_dist(s1, s2)
Expand source code
def two_pop_chord_dist(s1, s2):
    s1 = clean_list(s1, ["n", "?", "-", "N"])
    s2 = clean_list(s2, ["n", "?", "-", "N"])
    uniques = uniq_alleles(s1 + s2)

    sum_squares = 0.0
    for allele in uniques:
        if allele in ["-", "?", "n", "N"]:
            continue
        Xu = float(s1.count(allele) / len(s1))
        Yu = float(s2.count(allele) / len(s2))
        sum_squares += np.square(Xu - Yu)

    return sum_squares, len(uniques)
def two_pop_jost_d(seqs1, seqs2, global_het=False)
Expand source code
def two_pop_jost_d(seqs1, seqs2, global_het=False):
    if global_het:
        Ht = get_global_het(seqs1 + seqs2)
    else:
        Ht = get_average_het(seqs1, seqs2)
    Hs = np.mean([get_global_het(seqs1), get_global_het(seqs2)])
    n = len(seqs1 + seqs2)
    D = (Ht - Hs) / (1 - Hs) * (n / (n - 1))
    return Hs, Ht, D
def two_pop_weir_cockerham_fst(s1, s2)

Computes Weir and Cockerham's THETAst Fst approximation for two populations

Args

s1 : list
A list of phased genotypes for population 1.
s2 : list
A list of phased genotypes for population 2.

Returns

tuple
A tuple containing two floats. The first float is the numerator
of the THETAst estimator for the locus. The second float is the
 

denominator of the THETAst estimator for the locus.

Raises

ValueError
If the inputs are not valid.
Expand source code
def two_pop_weir_cockerham_fst(s1, s2):
    """
    Computes Weir and Cockerham's THETAst Fst approximation for two populations

    Args:
        s1 (list): A list of phased genotypes for population 1.
        s2 (list): A list of phased genotypes for population 2.

    Returns:
        tuple: A tuple containing two floats. The first float is the numerator
        of the THETAst estimator for the locus. The second float is the
        denominator of the THETAst estimator for the locus.

    Raises:
        ValueError: If the inputs are not valid.

    """
    # Check inputs
    if not isinstance(s1, list) or not isinstance(s2, list):
        raise ValueError("Inputs must be lists.")
    if not s1 or not s2:
        raise ValueError("Inputs must not be empty.")
    if (not all(isinstance(x, str) for x in s1) or
            not all(isinstance(x, str) for x in s2)):
        raise ValueError("Inputs must only contain strings.")
    if not all("/" in x for x in s1) or not all("/" in x for x in s2):
        raise ValueError("Inputs must be phased genotypes.")

    # Initialize variables
    num = 0.0
    denom = 0.0

    # mean sample size
    alleles1 = get_alleles(s1)  # split alleles s1
    alleles2 = get_alleles(s2)  # split alleles s2
    uniques = uniq_alleles(s1+s2)  # list of unique alleles only
    r = 2.0  # number of pops
    n1 = float(len(s1))  # pop size of pop 1
    n2 = float(len(s2))  # pop size of pop 2
    csd = np.std([n1, n2])
    cm = np.mean([n1, n2])
    nbar = cm
    csquare = (csd*csd) / (cm*cm)
    nC = nbar * (1.0 - (csquare/r))  # coeff of pop size variance
    for allele in uniques:
        ac1 = float(alleles1.count(allele))
        ac2 = float(alleles2.count(allele))
        p1 = ac1 / float(len(alleles1))
        p2 = ac2 / float(len(alleles2))
        h1 = get_het_from_phased(allele, s1, count=True)
        h2 = get_het_from_phased(allele, s2, count=True)
        pbar = (ac1+ac2) / (float(len(alleles1)) + float(len(alleles2)))
        ssquare = ((np.sum([(n1 * (np.square(p1 - pbar))),
                            (n2 * (np.square(p2 - pbar)))])) / ((r-1.0)*nbar))
        hbar = ((h1 + h2) / (r * nbar))
        if nbar != 1.0:
            a = ((nbar/nC) * (ssquare -
                              ((1.0 / (nbar-1.0)) *
                               ((pbar * (1.0 - pbar)) -
                                ((r - 1.0) * ssquare / r) -
                                (hbar / 4.0)))))
            b = ((nbar / (nbar-1.0)) *
                 ((pbar * (1.0 - pbar)) -
                  ((r - 1.0) * ssquare / r) -
                  (((2.0 * nbar) - 1.0) * hbar / (4.0 * nbar))))
            c = hbar/2.0
            d = a+b+c
            num += a
            denom += d
    return num, denom
def uniq_alleles(s)

Returns the unique alleles in a list of alleles.

Args

s : list
A list of alleles.

Returns

set
A set of unique alleles.
Expand source code
def uniq_alleles(s):
    """Returns the unique alleles in a list of alleles.

    Args:
        s (list): A list of alleles.

    Returns:
        set: A set of unique alleles.
    """
    return set(sum([x.split("/") for x in s], []))