Source code for singler.get_classic_markers

from typing import Any, Optional, Sequence, Union

import delayedarray
import mattress
import numpy
import biocutils

from . import lib_singler as lib
from ._utils import (
    _clean_matrix,
    _create_map,
    _stable_intersect,
    _stable_union,
)


def _get_classic_markers_raw(ref_ptrs: list, ref_labels: list, ref_features: list, num_de=None, num_threads=1):
    nrefs = len(ref_ptrs)

    # We assume that ref_ptrs and ref_features contains the outputs of
    # _clean_matrix, so there's no need to re-check their consistency.
    for i, x in enumerate(ref_ptrs):
        nc = x.ncol()
        if nc != len(ref_labels[i]):
            raise ValueError(
                "number of columns of 'ref' should be equal to the length of the corresponding 'labels'"
            )

    # Defining the intersection of features.
    common_features = _stable_intersect(*ref_features)
    if len(common_features) == 0:
        for r in ref_ptrs:
            if r.nrow():
                raise ValueError("no common feature names across 'features'")

    common_features_map = _create_map(common_features)

    # Computing medians for each group within each median.
    ref2 = []
    ref2_ptrs = []
    tmp_labels = []

    for i, x in enumerate(ref_ptrs):
        survivors = []
        remap = [None] * len(common_features)
        for j, f in enumerate(ref_features[i]):
            if f is not None and f in common_features_map:
                survivors.append(j)
                remap[common_features_map[f]] = len(survivors) - 1

        da = delayedarray.DelayedArray(x)[survivors, :]
        ptr = mattress.initialize(da)
        med, lev = ptr.row_medians_by_group(ref_labels[i], num_threads=num_threads)
        tmp_labels.append(lev)

        finalptr = mattress.initialize(med[remap, :])
        ref2.append(finalptr)
        ref2_ptrs.append(finalptr.ptr)

    ref_labels = tmp_labels

    # Defining the union of labels across all references.
    common_labels = _stable_union(*ref_labels)
    common_labels_map = _create_map(common_labels)

    labels2 = []
    for i, lab in enumerate(ref_labels):
        converted = numpy.ndarray(len(lab), dtype=numpy.uint32)
        for j, x in enumerate(lab):
            converted[j] = common_labels_map[x]
        labels2.append(converted)

    # Finally getting around to calling markers.
    if num_de is None:
        num_de = -1
    elif num_de <= 0:
        raise ValueError("'num_de' should be positive")

    raw_markers = lib.find_classic_markers(
        len(common_labels),
        len(common_features),
        ref2_ptrs,
        labels2,
        num_de,
        num_threads
    )

    return raw_markers, common_labels, common_features


[docs] def get_classic_markers( ref_data: Union[Any, list[Any]], ref_labels: Union[Sequence, list[Sequence]], ref_features: Union[Sequence, list[Sequence]], assay_type: Union[str, int] = "logcounts", check_missing: bool = True, num_de: Optional[int] = None, num_threads: int = 1, ) -> dict[Any, dict[Any, list]]: """Compute markers from a reference using the classic SingleR algorithm. This is typically done for reference datasets derived from replicated bulk transcriptomic experiments. Args: ref_data: A matrix-like object containing the log-normalized expression values of a reference dataset. Each column is a sample and each row is a feature. Alternatively, this can be a :py:class:`~summarizedexperiment.SummarizedExperiment.SummarizedExperiment` containing a matrix-like object in one of its assays. Alternatively, a list of such matrices or ``SummarizedExperiment`` objects, typically for multiple batches of the same reference; it is assumed that different batches exhibit at least some overlap in their ``ref_features`` and ``ref_labels``. ref_labels: If ``ref_data`` is not a list, ``ref_labels`` should be a sequence of length equal to the number of columns of ``ref_data``, containing a label (usually a string) for each column. If ``ref_data`` is a list, ``ref_labels`` should also be a list of the same length. Each entry should be a sequence of length equal to the number of columns of the corresponding entry of ``ref_data``. ref_features: If ``ref_data`` is not a list, ``ref_features`` should be a sequence of length equal to the number of rows of ``ref_data``, containing the feature name (usually a string) for each row. If ``ref_data`` is a list, ``ref_features`` should also be a list of the same length. Each entry should be a sequence of length equal to the number of rows of the corresponding entry of ``ref``. assay_type: Name or index of the assay of interest, if ``ref`` is or contains :py:class:`~summarizedexperiment.SummarizedExperiment.SummarizedExperiment` objects. check_missing: Whether to check for and remove rows with missing (NaN) values in the reference matrices. This can be set to False if it is known that no NaN values exist. num_de: Number of differentially expressed genes to use as markers for each pairwise comparison between labels. If ``None``, an appropriate number of genes is automatically determined. num_threads: Number of threads to use for the calculations. Returns: A dictionary of dictionary of lists containing the markers for each pairwise comparison between labels, i.e., ``markers[a][b]`` contains the upregulated markers for label ``a`` over label ``b``. """ if not isinstance(ref_data, list): ref_data = [ref_data] ref_labels = [ref_labels] ref_features = [ref_features] nrefs = len(ref_data) if nrefs != len(ref_labels): raise ValueError("length of 'ref' and 'labels' should be the same") if nrefs != len(ref_features): raise ValueError("length of 'ref' and 'features' should be the same") ref_ptrs = [] cleaned_features = [] for i in range(nrefs): r, f = _clean_matrix( ref_data[i], ref_features[i], assay_type=assay_type, check_missing=check_missing, num_threads=num_threads, ) ref_ptrs.append(mattress.initialize(r)) cleaned_features.append(f) raw_markers, common_labels, common_features = _get_classic_markers_raw( ref_ptrs=ref_ptrs, ref_labels=ref_labels, ref_features=cleaned_features, num_de=num_de, num_threads=num_threads, ) markers = {} for i, x in enumerate(common_labels): current = {} for j, y in enumerate(common_labels): current[y] = biocutils.StringList(common_features[k] for k in raw_markers[i][j]) markers[x] = current return markers
[docs] def number_of_classic_markers(num_labels: int) -> int: """Compute the number of markers to detect for a given number of labels, using the classic SingleR marker detection algorithm. Args: num_labels: Number of labels. Returns: int: Number of markers. """ return lib.number_of_classic_markers(num_labels)