Source code for matchclot.preprocessing.preprocess
from typing import Optional
import anndata
import numpy as np
import pandas as pd
import scipy.sparse
import sklearn.decomposition
import sklearn.feature_extraction.text
import sklearn.neighbors
import sklearn.preprocessing
import sklearn.utils.extmath
from harmony import harmonize
[docs]def tfidf(X):
"""
TF-IDF normalization (following the Seurat v3 approach)
Parameters
----------
X
Input matrix
Returns
-------
X_tfidf
TF-IDF normalized matrix
"""
idf = X.shape[0] / X.sum(axis=0)
if scipy.sparse.issparse(X):
tf = X.multiply(1 / X.sum(axis=1))
return tf.multiply(idf)
else:
tf = X / X.sum(axis=1, keepdims=True)
return tf * idf
[docs]class tfidfTransformer:
def __init__(self):
self.idf = None
self.fitted = False
[docs] def transform(self, X):
if not self.fitted:
raise RuntimeError("Transformer was not fitted on any data")
if scipy.sparse.issparse(X):
tf = X.multiply(1 / X.sum(axis=1))
return tf.multiply(self.idf)
else:
tf = X / X.sum(axis=1, keepdims=True)
return tf * self.idf
[docs]class lsiTransformer:
def __init__(
self, n_components: int = 20, drop_first=True, use_highly_variable=None
):
self.drop_first = drop_first
self.n_components = n_components + drop_first
self.use_highly_variable = use_highly_variable
self.tfidfTransformer = tfidfTransformer()
self.normalizer = sklearn.preprocessing.Normalizer(norm="l1")
self.pcaTransformer = sklearn.decomposition.TruncatedSVD(
n_components=self.n_components, random_state=777
)
self.fitted = None
[docs] def fit(self, adata: anndata.AnnData):
if self.use_highly_variable is None:
self.use_highly_variable = "highly_variable" in adata.var
adata_use = (
adata[:, adata.var["highly_variable"]]
if self.use_highly_variable
else adata
)
X = self.tfidfTransformer.fit_transform(adata_use.X)
X_norm = self.normalizer.fit_transform(X)
X_norm = np.log1p(X_norm * 1e4)
self.pcaTransformer.fit(X_norm)
self.fitted = True
[docs] def transform(self, adata):
if not self.fitted:
raise RuntimeError("Transformer was not fitted on any data")
adata_use = (
adata[:, adata.var["highly_variable"]]
if self.use_highly_variable
else adata
)
X = self.tfidfTransformer.transform(adata_use.X)
X_norm = self.normalizer.transform(X)
X_norm = np.log1p(X_norm * 1e4)
X_lsi = self.pcaTransformer.transform(X_norm)
X_lsi -= X_lsi.mean(axis=1, keepdims=True)
X_lsi /= X_lsi.std(axis=1, ddof=1, keepdims=True)
lsi_df = pd.DataFrame(X_lsi, index=adata_use.obs_names).iloc[
:, int(self.drop_first) :
]
return lsi_df
[docs]def lsi(
adata: anndata.AnnData,
n_components: int = 20,
use_highly_variable: Optional[bool] = None,
**kwargs,
) -> None:
"""
LSI analysis (following the Seurat v3 approach)
Parameters
----------
adata
Input dataset
n_components
Number of dimensions to use
use_highly_variable
Whether to use highly variable features only, stored in
``adata.var['highly_variable']``. By default uses them if they
have been determined beforehand.
**kwargs
Additional keyword arguments are passed to
:func:`sklearn.utils.extmath.randomized_svd`
"""
if use_highly_variable is None:
use_highly_variable = "highly_variable" in adata.var
adata_use = adata[:, adata.var["highly_variable"]] if use_highly_variable else adata
X = tfidf(adata_use.X)
X_norm = sklearn.preprocessing.Normalizer(norm="l1").fit_transform(X)
X_norm = np.log1p(X_norm * 1e4)
X_lsi = sklearn.utils.extmath.randomized_svd(
X_norm, n_components, random_state=777, **kwargs
)[0]
X_lsi -= X_lsi.mean(axis=1, keepdims=True)
X_lsi /= X_lsi.std(axis=1, ddof=1, keepdims=True)
adata.obsm["X_lsi"] = X_lsi
[docs]def harmony(df_list, use_gpu=True):
"""
Harmony batch effect correction applied jointly to multiple dataframes
"""
all = pd.concat(df_list)
all_batches = all.pop("batch")
all_batches.columns = ["batch"]
all_batches = all_batches.to_frame()
all_harmony = harmonize(
all.to_numpy(), all_batches, batch_key="batch", use_gpu=use_gpu, verbose=True
)
out_df_list = []
curr_idx = 0
for df in df_list:
out_df_list.append(all_harmony[curr_idx : curr_idx + len(df)])
curr_idx += len(df)
return out_df_list