Source code for singler._get_classic_markers

from typing import Any, Optional, Sequence, Union

import delayedarray
import mattress
import numpy
import biocutils

from . import _lib_singler as lib
from ._utils import (
    _clean_matrix,
    _create_map,
    _stable_intersect,
    _stable_union,
)


def _get_classic_markers_raw(ref_ptrs: list, ref_labels: list, ref_features: list, num_de=None, num_threads=1):
    nrefs = len(ref_ptrs)

    # We assume that ref_ptrs and ref_features contains the outputs of
    # _clean_matrix, so there's no need to re-check their consistency.
    for i, x in enumerate(ref_ptrs):
        nc = x.ncol()
        if nc != len(ref_labels[i]):
            raise ValueError(
                "number of columns of 'ref' should be equal to the length of the corresponding 'labels'"
            )

    # Defining the intersection of features.
    common_features = _stable_intersect(*ref_features)
    if len(common_features) == 0:
        for r in ref_ptrs:
            if r.nrow():
                raise ValueError("no common feature names across 'features'")

    common_features_map = _create_map(common_features)

    # Computing medians for each group within each median.
    ref2 = []
    ref2_ptrs = []
    tmp_labels = []

    for i, x in enumerate(ref_ptrs):
        survivors = []
        remap = [None] * len(common_features)
        for j, f in enumerate(ref_features[i]):
            if f is not None and f in common_features_map:
                survivors.append(j)
                remap[common_features_map[f]] = len(survivors) - 1

        da = delayedarray.DelayedArray(x)[survivors, :]
        ptr = mattress.initialize(da)
        med, lev = ptr.row_medians_by_group(ref_labels[i], num_threads=num_threads)
        tmp_labels.append(lev)

        finalptr = mattress.initialize(med[remap, :])
        ref2.append(finalptr)
        ref2_ptrs.append(finalptr.ptr)

    ref_labels = tmp_labels

    # Defining the union of labels across all references.
    common_labels = _stable_union(*ref_labels)
    common_labels_map = _create_map(common_labels)

    labels2 = []
    for i, lab in enumerate(ref_labels):
        converted = numpy.ndarray(len(lab), dtype=numpy.uint32)
        for j, x in enumerate(lab):
            converted[j] = common_labels_map[x]
        labels2.append(converted)

    # Finally getting around to calling markers.
    if num_de is None:
        num_de = -1
    elif num_de <= 0:
        raise ValueError("'num_de' should be positive")

    raw_markers = lib.find_classic_markers(
        len(common_labels),
        len(common_features),
        ref2_ptrs,
        labels2,
        num_de,
        num_threads
    )

    return raw_markers, common_labels, common_features


[docs] def get_classic_markers( ref_data: Union[Any, list[Any]], ref_labels: Union[Sequence, list[Sequence]], ref_features: Union[Sequence, list[Sequence]], assay_type: Union[str, int] = "logcounts", check_missing: bool = True, num_de: Optional[int] = None, num_threads: int = 1, ) -> dict[Any, dict[Any, list]]: """ Compute markers from a reference using the classic SingleR algorithm. This is typically done for reference datasets derived from replicated bulk transcriptomic experiments. Args: ref_data: A matrix-like object containing the log-normalized expression values of a reference dataset. Each column is a sample and each row is a feature. Alternatively, this can be a :py:class:`~summarizedexperiment.SummarizedExperiment.SummarizedExperiment` containing a matrix-like object in one of its assays. Alternatively, a list of such matrices or ``SummarizedExperiment`` objects, typically for multiple batches of the same reference. It is assumed that different batches exhibit at least some overlap in their ``ref_features`` and ``ref_labels``. ref_labels: If ``ref_data`` is not a list, ``ref_labels`` should be a sequence of length equal to the number of columns of ``ref_data``. Each entry should be a label (usually a string) for each column of ``ref_data``. If ``ref_data`` is a list, ``ref_labels`` should also be a list of the same length. Each entry should be a sequence of length equal to the number of columns of the corresponding entry of ``ref_data`` and should contain the column labels. ref_features: If ``ref_data`` is not a list, ``ref_features`` should be a sequence of length equal to the number of rows of ``ref_data``. Each entry should be a feature name (usually a string) for each row. If ``ref_data`` is a list, ``ref_features`` should also be a list of the same length. Each entry should be a sequence of length equal to the number of rows of the corresponding entry of ``ref`` and should containg the feature names. assay_type: Name or index of the assay of interest, if ``ref`` is or contains :py:class:`~summarizedexperiment.SummarizedExperiment.SummarizedExperiment` objects. check_missing: Whether to check for and remove rows with missing (NaN) values in the reference matrices. This can be set to ``False`` if it is known that no NaN values exist. num_de: Number of differentially expressed genes to use as markers for each pairwise comparison between labels. If ``None``, an appropriate number of genes is automatically determined. num_threads: Number of threads to use for the calculations. Returns: A dictionary of dictionary of lists containing the markers for each pairwise comparison between labels, i.e., ``markers[a][b]`` contains the upregulated markers for label ``a`` over label ``b``. Examples: >>> import singler >>> ref = singler.mock_reference_data() >>> import scranpy >>> ref = scranpy.normalize_rna_counts_se(ref) >>> classical = singler.get_classic_markers(ref, ref.get_column_data()["label"], ref.get_row_names()) >>> classical["A"]["B"] """ if not isinstance(ref_data, list): ref_data = [ref_data] ref_labels = [ref_labels] ref_features = [ref_features] nrefs = len(ref_data) if nrefs != len(ref_labels): raise ValueError("length of 'ref' and 'labels' should be the same") if nrefs != len(ref_features): raise ValueError("length of 'ref' and 'features' should be the same") ref_ptrs = [] cleaned_features = [] for i in range(nrefs): r, f = _clean_matrix( ref_data[i], ref_features[i], assay_type=assay_type, check_missing=check_missing, num_threads=num_threads, ) ref_ptrs.append(mattress.initialize(r)) cleaned_features.append(f) raw_markers, common_labels, common_features = _get_classic_markers_raw( ref_ptrs=ref_ptrs, ref_labels=ref_labels, ref_features=cleaned_features, num_de=num_de, num_threads=num_threads, ) markers = {} for i, x in enumerate(common_labels): current = {} for j, y in enumerate(common_labels): current[y] = biocutils.StringList(common_features[k] for k in raw_markers[i][j]) markers[x] = current return markers
[docs] def number_of_classic_markers(num_labels: int) -> int: """Compute the number of markers to detect for a given number of labels, using the classic SingleR marker detection algorithm. Args: num_labels: Number of labels. Returns: Number of markers. Examples: >>> import singler >>> singler.number_of_classic_markers(15) """ return lib.number_of_classic_markers(num_labels)