Source code for pipeline.diversity_plots

"""Plotting routines for visualizing chemical diversity of datasets"""

import os
import pandas as pd
import numpy as np
import seaborn as sns
import umap
from scipy.stats.kde import gaussian_kde
from scipy.cluster.hierarchy import linkage
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import logging
from rdkit import Chem
from rdkit.Chem import AllChem, Draw

from atomsci.ddm.utils import struct_utils
from atomsci.ddm.pipeline import dist_metrics as dm
from atomsci.ddm.pipeline import  chem_diversity as cd
from atomsci.ddm.utils import datastore_functions as dsf

# Module-wide matplotlib defaults, applied at import time: 12-point tick and axis labels.
#matplotlib.style.use('ggplot')
matplotlib.rc('xtick', labelsize=12)
matplotlib.rc('ytick', labelsize=12)
matplotlib.rc('axes', labelsize=12)

# Configure the root logger so each message is prefixed with a timestamp.
logging.basicConfig(format='%(asctime)-15s %(message)s')

# Maximum number of distances used to fit the KDE in plot_dataset_dist_distr; when more
# distances than this are computed, a random sample of this size is used instead.
ndist_max = 1000000

#------------------------------------------------------------------------------------------------------------------
def plot_dataset_dist_distr(dataset, feat_type, dist_metric, task_name, **metric_kwargs):
    """Generate a density plot showing the distribution of distances between dataset feature vectors, using the
    specified feature type and distance metric.

    Args:
        dataset (deepchem.Dataset): A dataset object. At minimum, it should contain a 2D numpy array 'X' of
            feature vectors.

        feat_type (str): Type of features ('ECFP' or 'descriptors').

        dist_metric (str): Name of metric to be used to compute distances; can be anything supported by
            scipy.spatial.distance.pdist.

        task_name (str): Abbreviated name to describe dataset in plot title.

        metric_kwargs: Additional arguments to pass to metric.

    Returns:
        np.ndarray: The distances computed by `cd.calc_dist_diskdataset` with `calc_type='all'`. Returns None
        (after logging a warning, and without plotting) if the dataset has more than 50000 compounds.

    """
    log = logging.getLogger('ATOM')
    num_cmpds = dataset.X.shape[0]
    if num_cmpds > 50000:
        log.warning("Dataset has %d compounds, too big to calculate distance matrix", num_cmpds)
        return None

    log.warning("Starting distance matrix calculation for %d compounds", num_cmpds)
    dists = cd.calc_dist_diskdataset(feat_type, dist_metric, dataset, calc_type='all', **metric_kwargs)
    log.warning("Finished calculation of %d distances", len(dists))

    if len(dists) > ndist_max:
        # Sample a subset of the distances so KDE doesn't take so long
        dist_sample = np.random.choice(dists, size=ndist_max)
    else:
        dist_sample = dists

    dist_pdf = gaussian_kde(dist_sample)
    # Use the numpy reductions rather than the Python builtins: the sample may hold up to
    # ndist_max values, and builtin min()/max() iterate over the array element by element.
    x_plt = np.linspace(np.min(dist_sample), np.max(dist_sample), 500)
    y_plt = dist_pdf(x_plt)

    fig, ax = plt.subplots(figsize=(8.0, 8.0))
    ax.plot(x_plt, y_plt, color='forestgreen')
    ax.set_xlabel('%s distance' % dist_metric)
    ax.set_ylabel('Density')
    ax.set_title("%s dataset\nDistribution of %s distances between %s feature vectors" % (
        task_name, dist_metric, feat_type))
    return dists
#------------------------------------------------------------------------------------------------------------------
def plot_tani_dist_distr(df, smiles_col, df_name, radius=2, subset_col='subset', subsets=False,
                         ref_subset='train', plot_width=6, ndist_max=None, **metric_kwargs):
    """Generate a density plot showing the distribution of nearest neighbor distances between ecfp feature
    vectors, using the Tanimoto metric. Optionally split by subset.

    Args:
        df (DataFrame): A data frame containing, at minimum, a column of SMILES strings.

        smiles_col (str): Name of the column containing SMILES strings.

        df_name (str): Name for the dataset, to be used in the plot title.

        radius (int): Radius parameter used to calculate ECFP fingerprints. The default is 2, meaning that
            ECFP4 fingerprints are calculated. NOTE(review): this value is not passed on to the distance
            calculation in this function, so it currently has no effect here.

        subset_col (str): Name of the column containing subset names.

        subsets (bool): If True, distances are only calculated for compounds not in the reference subset,
            and the distances computed are to the nearest neighbors in the reference subset. If `subset_col`
            is not a column of `df`, a warning is logged and the function behaves as if `subsets` were False.

        ref_subset (str): Reference subset for nearest-neighbor distances, if `subsets` is True.

        plot_width (float): Plot width in inches. The plot is square, so this is also its height.

        ndist_max (int): Not used, included only for backward compatibility.

        metric_kwargs: Additional arguments to pass to metric. Not used, included only for backward
            compatibility.

    Returns:
        dist (DataFrame): Table of individual nearest-neighbor Tanimoto distance values, with columns 'dist'
        and 'subset'. If subsets is True, the 'subset' column indicates the subset each compound belongs to;
        otherwise it is 'all' for every row. Returns None (after logging a warning, and without plotting)
        if `df` has more than 50000 rows.

    """
    log = logging.getLogger('ATOM')
    num_cmpds = len(df)
    # TODO: Make max compounds a parameter, rather than hardcoding to 50000. Better yet, calculate a sample
    # of distances of size ndist_max for each non-reference subset, and plot KDEs based on the samples.
    if num_cmpds > 50000:
        log.warning("Dataset has %d compounds, too big to calculate distance matrix", num_cmpds)
        return None

    if subsets and subset_col not in df.columns:
        log.warning("%s column not found. Calculating total tanimoto distances instead.", subset_col)
        subsets = False

    feat_type = 'ecfp'
    dist_metric = 'tanimoto'
    if not subsets:
        all_smiles = df[smiles_col].values
        nn_dists = cd.calc_dist_smiles(feat_type, dist_metric, all_smiles, calc_type='nearest', num_nearest=1)
        nn_dists = nn_dists.flatten()
        dists = pd.DataFrame({'dist': nn_dists, 'subset': ['all'] * len(nn_dists)})
    else:
        # The reference SMILES are the same for every subset, so look them up once.
        ref_smiles = df.loc[df[subset_col] == ref_subset, smiles_col].values
        subset_frames = []
        for subs in df[subset_col].unique():
            if subs == ref_subset:
                continue
            subset_smiles = df.loc[df[subset_col] == subs, smiles_col].values
            nn_dists = cd.calc_dist_smiles(feat_type, dist_metric, subset_smiles, smiles_arr2=ref_smiles,
                                           calc_type='nearest', num_nearest=1)
            nn_dists = nn_dists.flatten()
            subset_frames.append(pd.DataFrame({'dist': nn_dists, 'subset': [subs] * len(nn_dists)}))
        # Concatenate once at the end: growing a frame inside the loop copies it on every iteration, and
        # seeding it with an empty frame can leave the 'dist' column with object dtype.
        if subset_frames:
            dists = pd.concat(subset_frames, ignore_index=True)
        else:
            dists = pd.DataFrame([], columns=['dist', 'subset'])

    fig, ax = plt.subplots(1, figsize=(plot_width, plot_width), dpi=300)
    sns.kdeplot(data=dists[dists.subset != ref_subset], x='dist', hue='subset', legend=True,
                common_norm=False, common_grid=True, fill=False, ax=ax)
    ax.set_xlabel('%s distance' % dist_metric)
    ax.set_ylabel('Density')
    if not subsets:
        ax.set_title("%s dataset\nDistribution of %s nearest neighbor distances between %s feature vectors" % (
            df_name, dist_metric, feat_type))
    else:
        ax.set_title(f"{df_name} dataset: Distribution of {dist_metric} distances\nbetween {feat_type} feature vectors from non-{ref_subset} subsets\nto their nearest neighbors in the {ref_subset} subset")
    return dists
#------------------------------------------------------------------------------------------------------------------
def _save_figure_as_pdf(fig, pdf_path):
    """Write a single matplotlib figure to the PDF file at `pdf_path`, closing the file even on error."""
    with PdfPages(pdf_path) as pdf:
        pdf.savefig(fig)


def diversity_plots(dset_key, datastore=True, bucket='public', title_prefix=None, ecfp_radius=4, umap_file=None,
                    out_dir=None, id_col='compound_id', smiles_col='rdkit_smiles', is_base_smiles=False,
                    response_col=None, max_for_mcs=300, colorpal=None):
    """Plot visualizations of diversity for an arbitrary table of compounds. At minimum, the file should contain
    columns for a compound ID and a SMILES string. Produces a clustered heatmap display of Tanimoto distances
    between compounds along with a 2D UMAP projection plot based on ECFP fingerprints, with points colored
    according to the response variable.

    Args:
        dset_key (str): Datastore key or filepath for dataset.

        datastore (bool): Whether to load dataset from datastore or from filesystem.

        bucket (str): Name of datastore bucket containing dataset.

        title_prefix (str): Prefix for plot titles. Defaults to the dataset file name without its extension,
            with underscores replaced by spaces.

        ecfp_radius (int): Radius for ECFP fingerprint calculation.

        umap_file (str, optional): Path to file to write UMAP coordinates to.

        out_dir (str, optional): Output directory for plots and tables. If provided, plots will be output as
            PDF files rather than in the current notebook, and some additional CSV files will be generated.

        id_col (str): Column in dataset containing compound IDs.

        smiles_col (str): Column in dataset containing SMILES strings. Rows with duplicate values in this
            column are dropped before plotting.

        is_base_smiles (bool): True if SMILES strings do not need to be salt-stripped and standardized.

        response_col (str): Column in dataset containing response values.

        max_for_mcs (int): Maximum dataset size for plots based on MCS distance. If the number of compounds is
            less than or equal to this value, an additional cluster heatmap and UMAP projection plot will be
            produced based on maximum common substructure distance.

        colorpal (optional): Palette passed to seaborn's scatterplot for coloring points by response. If None
            and `response_col` is given, one is chosen from the response values: a fixed green/red mapping
            when the values are exactly {0, 1}, a 'husl' palette when there are at most 10 distinct values,
            and a red-green-blue blended colormap otherwise.

    Returns:
        None

    """
    # Imported locally so this function does not depend on changes to the module's import block.
    from scipy.spatial.distance import squareform

    # Load table of compound names, IDs and SMILES strings
    if datastore:
        cmpd_df = dsf.retrieve_dataset_by_datasetkey(dset_key, bucket)
    else:
        cmpd_df = pd.read_csv(dset_key, index_col=False)
    cmpd_df = cmpd_df.drop_duplicates(subset=smiles_col)
    file_prefix = os.path.splitext(os.path.basename(dset_key))[0]
    if title_prefix is None:
        title_prefix = file_prefix.replace('_', ' ')
    compound_ids = cmpd_df[id_col].values
    smiles_strs = cmpd_df[smiles_col].values
    ncmpds = len(smiles_strs)

    # Strip salts, canonicalize SMILES strings and create RDKit Mol objects
    if is_base_smiles:
        base_mols = np.array([Chem.MolFromSmiles(s) for s in smiles_strs])
    else:
        print("Canonicalizing %d molecules..." % ncmpds)
        base_mols = np.array([struct_utils.base_mol_from_smiles(smiles) for smiles in smiles_strs])
        for i, mol in enumerate(base_mols):
            if mol is None:
                print('Unable to get base molecule for compound %d = %s' % (i, compound_ids[i]))
        print("Done")

    # Drop compounds whose SMILES could not be converted, keeping the table and molecules aligned
    has_good_smiles = np.array([mol is not None for mol in base_mols])
    base_mols = base_mols[has_good_smiles]
    cmpd_df = cmpd_df[has_good_smiles]
    ncmpds = cmpd_df.shape[0]
    compound_ids = cmpd_df[id_col].values

    responses = None
    if response_col is not None:
        responses = cmpd_df[response_col].values
        uniq_responses = set(responses)
        if colorpal is None:
            if uniq_responses == set([0, 1]):
                # Binary response
                colorpal = {0: 'forestgreen', 1: 'red'}
            elif len(uniq_responses) <= 10:
                # Categorical response
                colorpal = sns.color_palette('husl', n_colors=len(uniq_responses))
            else:
                # Continuous response
                colorpal = sns.blend_palette(['red', 'green', 'blue'], 12, as_cmap=True)

    # Generate ECFP fingerprints
    print("Computing fingerprints...")
    fps = [AllChem.GetMorganFingerprintAsBitVect(mol, ecfp_radius, 1024) for mol in base_mols]
    print("Done")

    if ncmpds <= max_for_mcs:
        # Get MCS distance matrix and draw a heatmap
        print("Computing MCS distance matrix...")
        mcs_dist = dm.mcs(base_mols)
        print("Done")

        # Tabulate the distance for every unordered pair (i < j), in row-major order
        ind1, ind2 = np.triu_indices(ncmpds, k=1)
        dist_df = pd.DataFrame({'compound_1': compound_ids[ind1], 'compound_2': compound_ids[ind2],
                                'dist': mcs_dist[ind1, ind2], 'i': ind1, 'j': ind2})
        dist_df = dist_df.sort_values(by='dist')
        print(dist_df.head(10))
        if out_dir is not None:
            dist_df.to_csv('%s/%s_mcs_dist_table.csv' % (out_dir, file_prefix), index=False)
            # Draw the closest pairs; there are fewer than 10 pairs when the dataset has fewer than 5 compounds
            for k in range(min(10, len(dist_df))):
                idx_i = dist_df.i.values[k]
                idx_j = dist_df.j.values[k]
                img_file_i = '%s/%d_%s.png' % (out_dir, k, compound_ids[idx_i])
                img_file_j = '%s/%d_%s.png' % (out_dir, k, compound_ids[idx_j])
                Draw.MolToFile(base_mols[idx_i], img_file_i, size=(500, 500), fitImage=False)
                Draw.MolToFile(base_mols[idx_j], img_file_j, size=(500, 500), fitImage=False)

        # linkage() interprets a 2D array as observation vectors, so pass the condensed form of the
        # square distance matrix to cluster on the MCS distances themselves.
        mcs_linkage = linkage(squareform(mcs_dist, checks=False), method='complete')
        mcs_df = pd.DataFrame(mcs_dist, columns=compound_ids, index=compound_ids)
        g = sns.clustermap(mcs_df, row_linkage=mcs_linkage, col_linkage=mcs_linkage, figsize=(12, 12),
                           cmap='plasma')
        if out_dir is not None:
            _save_figure_as_pdf(g.fig, '%s/%s_mcs_clustermap.pdf' % (out_dir, file_prefix))

        # Draw a UMAP projection based on MCS distance
        mapper = umap.UMAP(n_neighbors=20, min_dist=0.1, n_components=2, metric='precomputed', random_state=17)
        reps = mapper.fit_transform(mcs_dist)
        rep_df = pd.DataFrame.from_records(reps, columns=['x', 'y'])
        rep_df['compound_id'] = compound_ids
        fig, ax = plt.subplots(figsize=(12, 12))
        if responses is None:
            sns.scatterplot(x='x', y='y', data=rep_df, ax=ax)
        else:
            rep_df['response'] = responses
            sns.scatterplot(x='x', y='y', hue='response', palette=colorpal, data=rep_df, ax=ax)
        ax.set_title("%s, 2D projection based on MCS distance" % title_prefix)
        if out_dir is not None:
            _save_figure_as_pdf(fig, '%s/%s_mcs_umap_proj.pdf' % (out_dir, file_prefix))
            rep_df.to_csv('%s/%s_mcs_umap_proj.csv' % (out_dir, file_prefix), index=False)

    # Get Tanimoto distance matrix
    print("Computing Tanimoto distance matrix...")
    tani_dist = dm.tanimoto(fps)
    print("Done")

    # Draw a UMAP projection based on Tanimoto distance
    mapper = umap.UMAP(n_neighbors=20, min_dist=0.1, n_components=2, metric='precomputed', random_state=17)
    reps = mapper.fit_transform(tani_dist)
    rep_df = pd.DataFrame.from_records(reps, columns=['x', 'y'])
    rep_df['compound_id'] = compound_ids
    if responses is not None:
        rep_df['response'] = responses
    if umap_file is not None:
        rep_df.to_csv(umap_file, index=False)
        print("Wrote UMAP mapping to %s" % umap_file)
    fig, ax = plt.subplots(figsize=(12, 12))
    if responses is None:
        sns.scatterplot(x='x', y='y', data=rep_df, ax=ax)
    else:
        sns.scatterplot(x='x', y='y', hue='response', palette=colorpal, data=rep_df, ax=ax)
    ax.set_title("%s, 2D projection based on Tanimoto distance" % title_prefix)
    if out_dir is not None:
        _save_figure_as_pdf(fig, '%s/%s_tani_umap_proj.pdf' % (out_dir, file_prefix))

    # Draw a cluster heatmap based on Tanimoto distance. As above, linkage() needs the condensed
    # distance vector rather than the square matrix.
    tani_linkage = linkage(squareform(tani_dist, checks=False), method='complete')
    tani_df = pd.DataFrame(tani_dist, columns=compound_ids, index=compound_ids)
    g = sns.clustermap(tani_df, row_linkage=tani_linkage, col_linkage=tani_linkage, figsize=(12, 12),
                       cmap='plasma')
    if out_dir is not None:
        _save_figure_as_pdf(g.fig, '%s/%s_tanimoto_clustermap.pdf' % (out_dir, file_prefix))