Source code for phippery.escprof

r"""
=================
Escape Profile
=================

Use 
`optimal transport method <https://en.wikipedia.org/wiki/Transportation_theory_(mathematics)>`_
to compare 
`phage-dms <https://www.sciencedirect.com/science/article/pii/S2589004220308142>`_
escape profiles.
See the escape profile :ref:`description and examples <sec_escape_profile_comparisons>`.
"""

# dependencies
import pandas as pd
import numpy as np
import xarray as xr
import ot
from phippery.utils import id_query
from Bio.Align import substitution_matrices


def _get_aa_ordered_list():
    """
    return the ordered list of amino acid.
    This convention is based on the BLOSUM
    matrix in biopython and assumed for the
    binned distribution presenting amino
    acid contribution to differential
    selection at a site
    """
    return list("ARNDCQEGHILKMFPSTWYV")


[docs]def get_cost_matrix(): """ Returns the default 40x40 cost matrix based on BLOSUM62 and assigns maximum cost to transport between opposite signed differential selection contributions. Returns ------- list : 40x40 matrix (as a list of lists) """ substitution_matrix = substitution_matrices.load("BLOSUM62") alphabet_list = _get_aa_ordered_list() Naa = len(alphabet_list) # chosen so that range of costs in the # matrix is within an order of magnitude nthroot = 7.0 # maximum cost assigned by the cost matrix maxMij = np.exp(np.max(-substitution_matrix) / nthroot) cost_matrix = [] # first 20 rows for aa in alphabet_list: row = [-x / nthroot for x in substitution_matrix[aa, :][:Naa]] cost_row = (np.exp(row)).tolist() + [maxMij for i in range(Naa)] cost_matrix.append(cost_row) # last 20 rows for aa in alphabet_list: row = [-x / nthroot for x in substitution_matrix[aa, :][:Naa]] cost_row = [maxMij for i in range(Naa)] + (np.exp(row)).tolist() cost_matrix.append(cost_row) return cost_matrix
[docs]def get_loc_esc_distr(ds, metric, sample_factor, sfact_val, loc): """ Returns the normalized distribution represented as a list for the amino acid pattern of scaled differential selection for a specified individual and amino acid site. Parameters ---------- ds : xarray.DataSet The dataset containing the sample of interest. metric : str The name of the scaled differential selection data in ds. sample_factor : str The sample annotation label to identify the individual sample (e.g. 'sample_ID'). sfact_val : str The sample_factor value to identify the sample of interest. loc : int The location number for the amino acid site of interest. Returns ------- list : The relative contributions to the total absolute scaled differential selection at the site. The first 20 entries are contributions to negative selection (binding loss). The last 20 entries are contributions to positive selection (binding gain). """ # Code assumes peptide annotation for location is called 'Loc', # and for amino acid substitution is called 'aa_sub' my_ds = ds.loc[ dict( peptide_id=id_query(ds, f"Loc == '{loc}'", "peptide"), sample_id=id_query(ds, f"{sample_factor} == '{sfact_val}'") ) ] diff_sel = my_ds[metric].to_pandas().to_numpy().flatten() my_df = my_ds.peptide_table.loc[:, ["aa_sub"]].to_pandas() my_df["diff_sel"] = diff_sel esc_data_neg = [] esc_data_pos = [] for aa in _get_aa_ordered_list(): val = my_df[my_df["aa_sub"] == aa]["diff_sel"].item() if val > 0: esc_data_neg.append(0) esc_data_pos.append(val) else: esc_data_neg.append(-val) esc_data_pos.append(0) esc_data = esc_data_neg + esc_data_pos if np.sum(esc_data) == 0: return esc_data else: return esc_data / np.sum(esc_data)
def _get_weights(ds, metric, sample_factor, sfact_val1, sfact_val2, loc_start, loc_end): """ return the list of weights for computing the weighted sum of similarity scores in region [loc_start, loc_end] """ # Code assumes peptide annotation for location is called 'Loc' loc_sums1 = [] loc_sums2 = [] for loc in range(loc_start, loc_end + 1): ds1 = ds.loc[ dict( peptide_id=id_query(ds, f"Loc == '{loc}'", "peptide"), sample_id=id_query(ds, f"{sample_factor} == '{sfact_val1}'") ) ] diff_sel1 = ds1[metric].to_pandas().to_numpy().flatten() loc_sums1.append(0) for val in diff_sel1: loc_sums1[-1] = loc_sums1[-1] + abs(val) ds2 = ds.loc[ dict( peptide_id=id_query(ds, f"Loc == '{loc}'", "peptide"), sample_id=id_query(ds, f"{sample_factor} == '{sfact_val2}'") ) ] diff_sel2 = ds2[metric].to_pandas().to_numpy().flatten() loc_sums2.append(0) for val in diff_sel2: loc_sums2[-1] = loc_sums2[-1] + abs(val) loc_sums1 = loc_sums1 / np.sum(loc_sums1) loc_sums2 = loc_sums2 / np.sum(loc_sums2) weights = {} total = 0 for i, loc in zip(range(loc_end - loc_start + 1), range(loc_start, loc_end + 1)): val = min(loc_sums1[i], loc_sums2[i]) total = total + val weights[loc] = val weights = {k: v / total for k, v in weights.items()} return weights
[docs]def compute_sim_score(a, b, cost_matrix): """ Returns the similarity score given two distributions and the cost matrix. Parameters ---------- a : list A distribution of relative contribution for each amino acid to scaled differential selection. b : list Another distribution of relative contribution for each amino acid to scaled differential selection. cost_matrix : list The cost matrix to evaluate optimal transport from a to b. Returns ------- float : The similarity score (reciprocal of the optimal transport cost). """ if np.sum(a) == 0 or np.sum(b) == 0: return 0 cost = ot.emd2(a, b, cost_matrix) return 1.0 / cost
[docs]def loc_sim_score(ds, metric, cost_matrix, sample_factor, sfact_val1, sfact_val2, loc): """ Returns the similarity score for comparison at a site between two samples. Parameters ---------- ds : xarray.DataSet The dataset containing the sample of interest. metric : str The name of the scaled differential selection data in ds. cost_matrix : list The cost matrix to evaluate optimal transport between two distributions. sample_factor : str The sample annotation label to identify the samples (e.g. 'sample_ID'). sfact_val1 : str The sample_factor value to identify sample 1. sfact_val2 : str The sample_factor value to identify sample 2. loc : int The location number for the amino acid site of interest. Returns ------- float : The similarity score at the amino acid site. """ a = get_loc_esc_distr(ds, metric, sample_factor, sfact_val1, loc) b = get_loc_esc_distr(ds, metric, sample_factor, sfact_val2, loc) return compute_sim_score(a, b, cost_matrix)
[docs]def region_sim_score( ds, metric, cost_matrix, sample_factor, sfact_val1, sfact_val2, loc_start, loc_end ): """ Returns the similarity score for comparison in the region [loc_start, loc_end]. Parameters ---------- ds : xarray.DataSet The dataset containing the sample of interest metric : str The name of the scaled differential selection data in ds. cost_matrix : list The cost matrix to evaluate optimal transport between two distributions. sample_factor : str The sample annotation label to identify the samples (e.g. 'sample_ID'). sfact_val1 : str The sample_factor value to identify sample 1. sfact_val2 : str The sample_factor value to identify sample 2. loc_start : int The location number for the first amino acid site in the region of interest. loc_end : int The location number for the last amino acid site in the region of interest. Returns ------- float : The similarity score for the region. """ weights = _get_weights( ds, metric, sample_factor, sfact_val1, sfact_val2, loc_start, loc_end ) region_sim = 0 for loc in range(loc_start, loc_end + 1): sim = weights[loc] * loc_sim_score( ds, metric, cost_matrix, sample_factor, sfact_val1, sfact_val2, loc ) region_sim = region_sim + sim return region_sim