Source code for hypercluster.additional_clusterers

"""
Additional clustering classes can be added here, as long as they have a 'fit' method.


Attributes:
    HDBSCAN (clustering class): See `hdbscan`_

.. _hdbscan:
    https://hdbscan.readthedocs.io/en/latest/basic_hdbscan.html#the-simple-case/
"""
from typing import Optional, Iterable
import logging
import numpy as np
import pandas as pd
from scipy.spatial.distance import pdist, squareform
from sklearn.decomposition import NMF
from sklearn.neighbors import NearestNeighbors
from hdbscan import HDBSCAN
from .constants import pdist_adjacency_methods, valid_partition_types
import igraph as ig
import louvain
import leidenalg


class NMFCluster:
    """Uses non-negative matrix factorization from sklearn to assign clusters to samples, based
    on the maximum membership score of the sample per component.

    Args:
        n_clusters: The number of clusters to find. Used as n_components when fitting.
        **nmf_kwargs: Additional keyword arguments passed to sklearn.decomposition.NMF.
    """

    def __init__(self, n_clusters: int = 8, **nmf_kwargs):
        nmf_kwargs['n_components'] = n_clusters
        self.NMF = NMF(**nmf_kwargs)
        self.n_clusters = n_clusters

    def fit(self, data):
        """If negative numbers are present, creates one data matrix with all negative numbers
        zeroed and another data matrix with all positive numbers zeroed and the signs of all
        negative numbers reversed. Concatenates both matrices, resulting in a data matrix twice as
        wide as the original but containing only positive values and zeros, and hence appropriate
        for NMF. Uses the decomposed matrix W, which is n-by-k (with n = number of samples and
        k = number of components), to assign cluster membership. Each sample is assigned to the
        cluster for which it has the highest membership score. See `sklearn.decomposition.NMF`_

        Args:
            data (DataFrame): Data to fit with samples as rows and features as columns.

        Returns:
            self with labels\_ attribute.

        .. _sklearn.decomposition.NMF:
            https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.NMF.html
        """
        if np.any(data < 0):
            positive = data.copy()
            positive[positive < 0] = 0
            negative = data.copy()
            negative[negative > 0] = 0
            negative = -negative
            data = pd.concat([positive, negative], axis=1, join='outer')
        self.labels_ = pd.DataFrame(self.NMF.fit_transform(data)).idxmax(axis=1).values
        return self
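

# Illustrative usage sketch (not part of the original module): fits NMFCluster on a small random
# DataFrame that contains negative values, so the sign-splitting path in fit() is exercised. The
# data, n_clusters, and NMF keyword arguments below are assumptions chosen only for demonstration.
def _example_nmf_cluster():
    rng = np.random.default_rng(0)
    data = pd.DataFrame(rng.normal(size=(30, 8)))  # negatives present; fit() splits signs
    clusterer = NMFCluster(n_clusters=3, init='random', max_iter=500, random_state=0)
    return clusterer.fit(data).labels_  # array of length 30 with cluster indices in {0, 1, 2}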


class LouvainCluster:
    """Louvain clustering on a graph derived from an adjacency matrix.

    Args:
        adjacency_method: Method to use to construct the adjacency matrix, which is used to \
        construct the graph that will be clustered. Valid methods are any metric valid in \
        scipy.spatial.distance.pdist, or MNN for mutual nearest neighbors and CNN for common \
        nearest neighbors. Both use sklearn.neighbors.NearestNeighbors at a given k to calculate \
        NNs. MNN then uses whether points i and j are each other's NNs as edge weights. CNN uses \
        the count of how many NNs i and j have in common as the edge weight.
        k: If using CNN or MNN, the k to use to construct the NearestNeighbors matrix.
        resolution: If using 'RBConfigurationVertexPartition' or 'CPMVertexPartition', which \
        resolution to use. If using other partitioners, this is ignored, but any other kwargs \
        for those partitioners can be passed too.
        adjacency_kwargs: Additional keyword arguments to pass to \
        sklearn.neighbors.NearestNeighbors or scipy.spatial.distance.pdist to construct the \
        adjacency matrix.
        partition_type: Which partition type to use for louvain clustering; see \
        `louvain-igraph`_ for more info.
        **louvain_kwargs: Additional kwargs to be passed to `find_partition`_

    .. _louvain-igraph:
        https://louvain-igraph.readthedocs.io/en/latest/reference.html
    .. _find_partition:
        https://louvain-igraph.readthedocs.io/en/latest/reference.html#louvain.find_partition
    """

    def __init__(
            self,
            adjacency_method: str = 'MNN',
            k: int = 20,
            resolution: float = 0.8,
            adjacency_kwargs: Optional[dict] = None,
            partition_type: str = 'RBConfigurationVertexPartition',
            **louvain_kwargs
    ):
        if adjacency_method not in ['MNN', 'CNN'] + pdist_adjacency_methods:
            raise ValueError(
                'Adjacency method %s invalid. Must be "MNN", "CNN" or a valid metric for '
                'scipy.spatial.distance.pdist.' % adjacency_method
            )
        if partition_type not in valid_partition_types:
            raise ValueError(
                'Partition type %s not valid, must be in constants.valid_partition_types'
                % partition_type
            )
        self.adjacency_method = adjacency_method
        self.k = int(k)
        self.resolution = resolution
        self.adjacency_kwargs = adjacency_kwargs
        self.partition_type = partition_type
        self.louvain_kwargs = louvain_kwargs

    def fit(
            self,
            data: pd.DataFrame,
    ):
        adjacency_method = self.adjacency_method
        k = self.k
        resolution = self.resolution
        adjacency_kwargs = self.adjacency_kwargs
        louvain_kwargs = self.louvain_kwargs
        partition_type = self.partition_type

        if k >= len(data):
            logging.warning(
                'k was set to %s, but there are only %s samples. Changing k to %s.'
                % (k, len(data), len(data) - 1)
            )
            k = len(data) - 1

        if adjacency_kwargs is None:
            adjacency_kwargs = {}
        if adjacency_method in ('MNN', 'CNN'):
            adjacency_kwargs['n_neighbors'] = adjacency_kwargs.get('n_neighbors', k)
            nns = NearestNeighbors(**adjacency_kwargs)
            nns.fit(data)
            adjacency_mat = nns.kneighbors_graph(data)
            if adjacency_method == 'MNN':
                adjacency_mat = adjacency_mat.multiply(adjacency_mat.transpose())
            if adjacency_method == 'CNN':
                adjacency_mat = adjacency_mat * adjacency_mat.transpose()
            adjacency_mat = adjacency_mat.toarray()
        elif adjacency_method in pdist_adjacency_methods:
            # pdist returns a condensed distance vector; squareform expands it to a full matrix.
            adjacency_mat = squareform(pdist(data, metric=adjacency_method, **adjacency_kwargs))

        if louvain_kwargs is None:
            louvain_kwargs = {}
        g = ig.Graph.Weighted_Adjacency(adjacency_mat.tolist())
        if partition_type in ['RBConfigurationVertexPartition', 'CPMVertexPartition']:
            louvain_kwargs['resolution_parameter'] = resolution
        partition = louvain.find_partition(g, getattr(louvain, partition_type), **louvain_kwargs)

        labels = pd.Series(
            {v: i for i, cluster in enumerate(partition) for v in cluster}
        ).sort_index()
        # Treat degenerate results (every sample alone, or all samples together) as unclustered.
        if labels.is_unique or (labels.nunique() == 1):
            labels = pd.Series([-1 for _ in range(len(labels))])
        self.labels_ = labels.values
        return self
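

# Illustrative usage sketch (not part of the original module): clusters a small random DataFrame
# with LouvainCluster using the mutual-nearest-neighbors adjacency. The sample size, k, and
# resolution below are assumptions chosen only for demonstration.
def _example_louvain_cluster():
    rng = np.random.default_rng(0)
    data = pd.DataFrame(rng.random((40, 5)))
    clusterer = LouvainCluster(adjacency_method='MNN', k=10, resolution=0.8)
    return clusterer.fit(data).labels_  # one integer label per row, or all -1 if degenerate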


class LeidenCluster:
    """Leiden clustering on a graph derived from an adjacency matrix. See `reference`_ for more
    info.

    Args:
        adjacency_method: Method to use to construct the adjacency matrix, which is used to \
        construct the graph that will be clustered. Valid methods are any metric valid in \
        scipy.spatial.distance.pdist, or MNN for mutual nearest neighbors and CNN for common \
        nearest neighbors. Both use sklearn.neighbors.NearestNeighbors at a given k to calculate \
        NNs. MNN then uses whether points i and j are each other's NNs as edge weights. CNN uses \
        the count of how many NNs i and j have in common as the edge weight.
        k: If using CNN or MNN, the k to use to construct the NearestNeighbors matrix.
        resolution: If using 'RBConfigurationVertexPartition' or 'CPMVertexPartition', which \
        resolution to use. If using other partitioners, this is ignored, but any other kwargs \
        for those partitioners can be passed too.
        adjacency_kwargs: Additional keyword arguments to pass to \
        sklearn.neighbors.NearestNeighbors or scipy.spatial.distance.pdist to construct the \
        adjacency matrix.
        partition_type: Which partition type to use for leiden clustering; see `leidenalg`_ for \
        more info.
        **leiden_kwargs: Additional kwargs to be passed to `find_partition`_

    .. _reference:
        https://www.nature.com/articles/s41598-019-41695-z
    .. _leidenalg:
        https://leidenalg.readthedocs.io/en/latest/reference.html
    .. _find_partition:
        https://leidenalg.readthedocs.io/en/latest/reference.html#leidenalg.find_partition
    """

    def __init__(
            self,
            adjacency_method: str = 'MNN',
            k: int = 20,
            resolution: float = 0.8,
            adjacency_kwargs: Optional[dict] = None,
            partition_type: str = 'RBConfigurationVertexPartition',
            **leiden_kwargs
    ):
        self.adjacency_method = adjacency_method
        self.k = int(k)
        self.resolution = resolution
        self.adjacency_kwargs = adjacency_kwargs
        self.partition_type = partition_type
        self.leiden_kwargs = leiden_kwargs

    def fit(
            self,
            data: pd.DataFrame,
    ):
        adjacency_method = self.adjacency_method
        k = self.k
        resolution = self.resolution
        adjacency_kwargs = self.adjacency_kwargs
        leiden_kwargs = self.leiden_kwargs
        partition_type = self.partition_type

        if k >= len(data):
            logging.warning(
                'k was set to %s, but there are only %s samples. Changing k to %s.'
                % (k, len(data), len(data) - 1)
            )
            k = len(data) - 1

        if adjacency_kwargs is None:
            adjacency_kwargs = {}
        if adjacency_method in ('MNN', 'CNN'):
            adjacency_kwargs['n_neighbors'] = adjacency_kwargs.get('n_neighbors', k)
            nns = NearestNeighbors(**adjacency_kwargs)
            nns.fit(data)
            adjacency_mat = nns.kneighbors_graph(data)
            if adjacency_method == 'MNN':
                adjacency_mat = adjacency_mat.multiply(adjacency_mat.transpose())
            if adjacency_method == 'CNN':
                adjacency_mat = adjacency_mat * adjacency_mat.transpose()
            adjacency_mat = adjacency_mat.toarray()
        elif adjacency_method in pdist_adjacency_methods:
            # pdist returns a condensed distance vector; squareform expands it to a full matrix.
            adjacency_mat = squareform(pdist(data, metric=adjacency_method, **adjacency_kwargs))

        if leiden_kwargs is None:
            leiden_kwargs = {}
        g = ig.Graph.Weighted_Adjacency(adjacency_mat.tolist())
        if partition_type in ['RBConfigurationVertexPartition', 'CPMVertexPartition']:
            leiden_kwargs['resolution_parameter'] = resolution
        partition = leidenalg.find_partition(g, getattr(leidenalg, partition_type), **leiden_kwargs)

        labels = pd.Series(
            {v: i for i, cluster in enumerate(partition) for v in cluster}
        ).sort_index()
        # Treat degenerate results (every sample alone, or all samples together) as unclustered.
        if labels.is_unique or (labels.nunique() == 1):
            labels = pd.Series([-1 for _ in range(len(labels))])
        self.labels_ = labels.values
        return self
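

# Illustrative usage sketch (not part of the original module): LeidenCluster with a
# scipy.spatial.distance.pdist metric as the adjacency method and a CPM partition. It assumes
# 'euclidean' is listed in constants.pdist_adjacency_methods; the sample size, partition type,
# and resolution below are demonstration values only, not a meaningful clustering setup.
def _example_leiden_cluster():
    rng = np.random.default_rng(0)
    data = pd.DataFrame(rng.random((40, 5)))
    clusterer = LeidenCluster(
        adjacency_method='euclidean', partition_type='CPMVertexPartition', resolution=0.05
    )
    return clusterer.fit(data).labels_  # one integer label per row, or all -1 if degenerate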