import anndata
import numpy as np
from sklearn.linear_model import LinearRegression
import pandas as pd
def resample(
data: anndata.AnnData, condition_name: str, random_seed: int = None
) -> anndata.AnnData:
"""Perform stratified bootstrap sampling by resampling cells within each sample.
This maintains the same number of cells per sample in the resampled dataset.
"""
# Set random seed for reproducibility if provided
if random_seed is not None:
np.random.seed(random_seed)
resampled_indices = []
for sample in data.obs[condition_name].unique():
# Get indices of cells in this sample
sample_cell_indices = np.where(data.obs[condition_name] == sample)[0]
n_cells = len(sample_cell_indices)
# Sample with replacement within this sample's cells
random_local_indices = np.random.randint(0, n_cells, size=n_cells)
resampled_indices.extend(sample_cell_indices[random_local_indices])
# Create new AnnData from resampled indices
resampled_data = data[resampled_indices].copy()
resampled_data.obs_names_make_unique()
return resampled_data
def correct_conditions(X: anndata.AnnData) -> np.ndarray:
"""Correct the conditions factors by overall read depth."""
sgIndex = X.obs["condition_unique_idxs"]
counts = np.zeros((np.amax(sgIndex.to_numpy()) + 1, 1))
cond_mean = np.linalg.norm(X.uns["A"], axis=1)
x_count = X.X.sum(axis=1)
for ii in range(counts.size):
counts[ii] = np.sum(x_count[X.obs["condition_unique_idxs"] == ii])
lr = LinearRegression()
lr.fit(counts, cond_mean.reshape(-1, 1))
counts_correct = lr.predict(counts)
return X.uns["A"] / counts_correct
def pseudobulk_X(
X: anndata, condition_name: str, groupby: str, type: str
) -> list[pd.DataFrame]:
"""
Calculate average gene expression for each groupby in each sample
"""
# Get unique samples and cell types
samples = np.unique(X.obs[condition_name].unique())
groupby_names = np.unique(X.obs[groupby].unique())
gene_names = X.var_names
total_df = []
for sample in samples:
# Subset to current sample
sample_mask = X.obs[condition_name] == sample
X_sample = X[sample_mask, :]
results = []
for group_name in groupby_names:
# Subset to current groupby
group_mask = X_sample.obs[groupby] == group_name
adata_subset = X_sample[group_mask, :]
result_dict = {}
if adata_subset.n_obs > 0 and type == "mean":
mean_expression = np.mean(adata_subset.X.toarray(), axis=0)
for i, gene in enumerate(gene_names):
result_dict[gene] = mean_expression[i]
elif adata_subset.n_obs > 0 and type == "fraction":
ct_df = adata_subset.X.toarray()
cell_fraction = (ct_df > 0).sum(axis=0) / ct_df.shape[0]
for i, gene in enumerate(gene_names):
result_dict[gene] = cell_fraction[i]
else:
# Set zero expression for missing combinations
for gene in gene_names:
result_dict[gene] = 0.0
results.append(result_dict)
# Dataframe with genes as rows and groupby as columns
results_df = pd.DataFrame(results, columns=gene_names, index=groupby_names)
total_df.append(results_df.T)
return total_df
def add_obs_cmp_unique_one(X: anndata.AnnData, cmp: str):
"""Creates AnnData observation column for a single component."""
label_col = f"Cmp{cmp}"
X.obs["Label"] = np.where(X.obs[label_col], label_col, "NoLabel")
return X
def add_obs_cmp_label(
X: anndata.AnnData,
cmp: int,
pos: bool = True,
top_perc: float = 1,
type: str = "receiver",
):
"""Adds a boolean label to X.obs for cells in the top or bottom percentage of a single component."""
if type == "sender":
factor_type = X.obsm["sc_B"]
elif type == "receiver":
factor_type = X.obsm["rc_C"]
if pos:
thres_value = 100 - top_perc
threshold = np.percentile(factor_type, thres_value, axis=0)
idx = factor_type[:, cmp - 1] > threshold[cmp - 1]
else:
thres_value = top_perc
threshold = np.percentile(factor_type, thres_value, axis=0)
idx = factor_type[:, cmp - 1] < threshold[cmp - 1]
X.obs[f"Cmp{cmp}"] = idx
return X
[docs]
def expression_product_matrix(
X1: anndata.AnnData, X2: anndata.AnnData, ligand: str, receptor: str
):
"""
For each cell in X1 and each cell in X2, compute the product:
X1[cell_i, ligand] * X2[cell_j, receptor]
Returns a DataFrame with X1 cells as rows and X2 cells as columns.
"""
# Ensure gene names are present
assert ligand in X1.var_names, f"{ligand} not in X1"
assert receptor in X2.var_names, f"{receptor} not in X2"
# Get expression vectors
# Ensure 1D dense arrays
# Convert to dense 1D arrays, even if sparse
expr1 = X1[:, ligand].X
if hasattr(expr1, "toarray"):
expr1 = expr1.toarray().flatten()
else:
expr1 = np.ravel(np.array(expr1))
expr2 = X2[:, receptor].X
if hasattr(expr2, "toarray"):
expr2 = expr2.toarray().flatten()
else:
expr2 = np.ravel(np.array(expr2))
# Compute outer product
product_matrix = np.outer(expr1, expr2)
# Build DataFrame
df = pd.DataFrame(product_matrix, index=X1.obs_names, columns=X2.obs_names)
return df
def add_obs_cmp_both_label(
X: anndata.AnnData,
cmp1: int,
cmp2: int,
pos1=True,
pos2=True,
top_perc=1,
type="sender",
):
"""Adds if cells in top/bot percentage"""
if type == "sender":
factor_type = X.obsm["sc_B"]
elif type == "receiver":
factor_type = X.obsm["rc_C"]
pos_neg = [pos1, pos2]
for i, cmp in enumerate([cmp1, cmp2]):
if i == 0:
if pos_neg[i] is True:
thres_value = 100 - top_perc
threshold1 = np.percentile(factor_type, thres_value, axis=0)
idx = factor_type[:, cmp - 1] > threshold1[cmp - 1]
else:
thres_value = top_perc
threshold1 = np.percentile(factor_type, thres_value, axis=0)
idx = factor_type[:, cmp - 1] < threshold1[cmp - 1]
if i == 1:
if pos_neg[i] is True:
thres_value = 100 - top_perc
threshold2 = np.percentile(factor_type, thres_value, axis=0)
idx = factor_type[:, cmp - 1] > threshold2[cmp - 1]
else:
thres_value = top_perc
threshold2 = np.percentile(factor_type, thres_value, axis=0)
idx = factor_type[:, cmp - 1] < threshold2[cmp - 1]
X.obs[f"Cmp{cmp}"] = idx
if pos1 is True and pos2 is True:
idx = (factor_type[:, cmp1 - 1] > threshold1[cmp1 - 1]) & (
factor_type[:, cmp2 - 1] > threshold2[cmp2 - 1]
)
elif pos1 is False and pos2 is False:
idx = (factor_type[:, cmp1 - 1] < threshold1[cmp1 - 1]) & (
factor_type[:, cmp2 - 1] < threshold2[cmp2 - 1]
)
elif pos1 is True and pos2 is False:
idx = (factor_type[:, cmp1 - 1] > threshold1[cmp1 - 1]) & (
factor_type[:, cmp2 - 1] < threshold2[cmp2 - 1]
)
elif pos1 is False and pos2 is True:
idx = (factor_type[:, cmp1 - 1] < threshold1[cmp1 - 1]) & (
factor_type[:, cmp2 - 1] > threshold2[cmp2 - 1]
)
X.obs["Both"] = idx
return X
def add_obs_cmp_unique_two(X: anndata.AnnData, cmp1: str, cmp2: str):
"""Creates AnnData observation column"""
X.obs.loc[
((X.obs[f"Cmp{cmp1}"] == True) & (X.obs[f"Cmp{cmp2}"] == False), "Label")
] = f"Cmp{cmp1}"
X.obs.loc[
(X.obs[f"Cmp{cmp1}"] == False) & (X.obs[f"Cmp{cmp2}"] == True), "Label"
] = f"Cmp{cmp2}"
X.obs.loc[
(X.obs[f"Cmp{cmp1}"] == True) & (X.obs[f"Cmp{cmp2}"] == True), "Label"
] = "Both"
X.obs.loc[
(X.obs[f"Cmp{cmp1}"] == False) & (X.obs[f"Cmp{cmp2}"] == False), "Label"
] = "NoLabel"
return X
[docs]
def average_product_matrix_ccc(df):
"""
Groups a DataFrame into a 10x10 matrix by binning rows and columns and averaging within bins.
Prints shape information for debugging.
"""
n_rows = len(df)
n_cols = len(df.columns)
row_group_size = n_rows // 10
col_group_size = n_cols // 10
row_groups = np.arange(n_rows) // row_group_size
col_groups = np.arange(n_cols) // col_group_size
row_groups = np.clip(row_groups, 0, 9)
col_groups = np.clip(col_groups, 0, 9)
# Group rows
df_grouped = df.groupby(row_groups).mean()
# Group columns by transposing, grouping, then transposing back
df_grouped = df_grouped.T.groupby(col_groups).mean().T
return df_grouped