Source code for hypercluster.utilities

from sklearn.cluster import *
from sklearn.metrics import *
from .additional_clusterers import *
from .additional_metrics import *
from pandas import DataFrame
import pandas as pd
import numpy as np
import logging
from typing import Optional, Iterable, Dict
from .constants import *
from hypercluster.constants import param_delim, val_delim


def calculate_row_weights(
    row: Iterable, param_weights: dict, vars_to_optimize: dict
) -> float:
    """Used to select random rows of parameter combinations using individual parameter weights.

    Args:
        row (Iterable): Series of parameters, with parameter names as index.
        param_weights (dict): Dictionary of str: dictionaries. Ex format - {'parameter_name': {
            'param_option_1': 0.5, 'param_option_2': 0.5}}.
        vars_to_optimize (dict): Dictionary with possibilities for different parameters. Ex
            format - {'parameter_name': [1, 2, 3, 4, 5]}.

    Returns (float):
        Float representing the probability of seeing that combination of parameters, given their
        individual weights.
    """
    # Fill in weights for any unweighted options so each parameter's options sum to 1.
    param_weights.update({
        param: {
            val: param_weights.get(param, {}).get(
                val,
                (1 - sum(param_weights.get(param, {}).values())) / len([
                    notweighted
                    for notweighted in vars_to_optimize.get(param, {})
                    if notweighted not in param_weights.get(param, {}).keys()
                ]),
            )
            for val in vals
        }
        for param, vals in vars_to_optimize.items()
    })
    # The row's probability is the product of its individual parameter probabilities.
    return np.prod([param_weights[param][val] for param, val in row.to_dict().items()])
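
# Usage sketch (hypothetical grid and weights, not part of the module): with a weight of 0.5
# on n_clusters=3, the two unweighted n_clusters options split the remaining 0.5 evenly and
# the unweighted n_init options each get 0.5, so the row below has probability 0.5 * 0.5.
#
#   vars_to_optimize = {"n_clusters": [2, 3, 4], "n_init": [10, 20]}
#   param_weights = {"n_clusters": {3: 0.5}}
#   row = pd.Series({"n_clusters": 3, "n_init": 10})
#   calculate_row_weights(row, param_weights, vars_to_optimize)  # -> 0.25
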
def cluster(clusterer_name: str, data: DataFrame, params: dict = {}):
    """Runs a given clusterer with a given set of parameters.

    Args:
        clusterer_name (str): String name of clusterer.
        data (DataFrame): DataFrame with elements to cluster as index and features as columns.
        params (dict): Dictionary of parameter names and values to feed into clusterer.
            Default {}.

    Returns:
        Instance of the clusterer fit with the data provided.
    """
    clusterer = eval(clusterer_name)(**params)
    return clusterer.fit(data)
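
# Usage sketch with a hypothetical numeric DataFrame; "KMeans" is resolved by eval to the
# class imported from sklearn.cluster above.
#
#   df = pd.DataFrame(np.random.rand(20, 3))               # hypothetical data, samples x features
#   fit_kmeans = cluster("KMeans", df, {"n_clusters": 3})
#   fit_kmeans.labels_                                      # one label per row of df
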
def evaluate_one(
    labels: Iterable,
    method: str = "silhouette_score",
    data: Optional[DataFrame] = None,
    gold_standard: Optional[Iterable] = None,
    metric_kwargs: Optional[dict] = None,
) -> float:
    """Uses a given metric to evaluate clustering results.

    Args:
        labels (Iterable): Series of labels.
        method (str): Name of the evaluation metric to use. Default is "silhouette_score".
        data (DataFrame): If using an inherent metric, must provide DataFrame with which to
            calculate the metric.
        gold_standard (Iterable): If using a metric that compares to ground truth, must provide a
            set of gold standard labels.
        metric_kwargs (dict): Additional kwargs to use in evaluation.

    Returns (float):
        Metric value
    """
    if not isinstance(labels, pd.Series):
        labels = pd.Series(labels)
    # Fewer than two real clusters (label -1 means unclustered) cannot be scored.
    if len(labels[labels != -1].unique()) < 2:
        return np.nan
    if metric_kwargs is None:
        metric_kwargs = {}

    if method in need_ground_truth:
        if gold_standard is None:
            raise ValueError(
                "Chosen evaluation metric %s requires gold standard set." % method
            )
        clustered = (gold_standard != -1) & (labels != -1)
        compare_to = gold_standard[clustered]
    elif method in inherent_metrics:
        if data is None:
            raise ValueError(
                "Chosen evaluation metric %s requires data input." % method
            )
        clustered = labels != -1
        compare_to = data.loc[clustered]
    else:
        compare_to = None
        clustered = labels.index

    return eval(method)(compare_to, labels[clustered], **metric_kwargs)
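
# Usage sketch continuing the hypothetical example above; silhouette_score is an inherent
# metric (listed in hypercluster.constants), so the underlying data must be supplied.
# Labels of -1 are treated as unclustered and excluded from scoring.
#
#   labels = pd.Series(fit_kmeans.labels_, index=df.index)
#   evaluate_one(labels, method="silhouette_score", data=df)
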
def generate_flattened_df(df_dict: Dict[str, DataFrame]) -> DataFrame:
    """Takes a dictionary of results from many clusterers and makes one DataFrame. Opposite of
    convert_to_multiind.

    Args:
        df_dict (Dict[str, DataFrame]): Dictionary of DataFrames to flatten. Can be .labels_ or
            .evaluations_ from MultiAutoClusterer.

    Returns:
        Flattened DataFrame with all data.
    """
    merged_df = pd.DataFrame()
    for clus_name, df in df_dict.items():
        df = df.transpose()
        cols_for_labels = df.index.to_frame()
        # Collapse each parameter combination into a single delimited string column name.
        inds = cols_for_labels.apply(
            lambda row: param_delim.join(
                [clus_name]
                + ["%s%s%s" % (k, val_delim, v) for k, v in row.to_dict().items()]
            ),
            axis=1,
        )
        df.index = inds
        df = df.transpose()
        merged_df = pd.concat([merged_df, df], join="outer", axis=1)
    return merged_df
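
# Usage sketch with hypothetical per-clusterer label DataFrames (e.g. the .labels_ dict of a
# MultiAutoClusterer, with parameter combinations as columns); each column of the result is
# the clusterer name and its parameter/value pairs collapsed into one string using param_delim
# and val_delim.
#
#   flat_labels = generate_flattened_df({"kmeans": kmeans_labels, "optics": optics_labels})
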
def convert_to_multiind(key: str, df: DataFrame) -> DataFrame:
    """Takes the columns for a single clusterer from Clusterer.labels_df or .evaluation_df and
    converts them to a parameter MultiIndex rather than the collapsed string form. Equivalent to
    grabbing Clusterer.labels[clusterer] or .evaluations[clusterer]. Opposite of
    generate_flattened_df.

    Args:
        key (str): Name of clusterer, must match beginning of columns to convert.
        df (DataFrame): DataFrame from which to grab the chunk.

    Returns:
        Subset DataFrame with multiindex.
    """
    clus_cols = [col for col in df.columns if col.split(param_delim, 1)[0] == key]
    temp = df[clus_cols].transpose()
    # Rebuild parameter names and values from the delimited column strings.
    temp.index = pd.MultiIndex.from_frame(
        pd.DataFrame([
            {
                s.split(val_delim, 1)[0]: s.split(val_delim, 1)[1]
                for s in i.split(param_delim)[1:]
            }
            for i in temp.index
        ]).astype(float, errors='ignore')
    )
    return temp.sort_index().transpose()
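
# Usage sketch, the inverse of the hypothetical example above: recover one clusterer's block
# from the flattened frame with parameters as a column MultiIndex.
#
#   kmeans_labels_multi = convert_to_multiind("kmeans", flat_labels)
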
def pick_best_labels(
    evaluation_results_df: DataFrame,
    clustering_labels_df: DataFrame,
    method: Optional[str] = None,
    min_or_max: Optional[str] = None,
) -> Iterable:
    """From evaluations and a metric to minimize or maximize, return all labels with the top pick.

    Args:
        evaluation_results_df (DataFrame): Evaluations DataFrame from optimize_clustering.
        clustering_labels_df (DataFrame): Labels DataFrame from optimize_clustering.
        method (str): Metric with which to choose the best labels. Default "silhouette_score".
        min_or_max (str): Whether to minimize or maximize the metric. Must be 'min' or 'max'.
            Default 'max'.

    Returns (DataFrame):
        DataFrame of all top labels.
    """
    if method is None:
        method = "silhouette_score"
    if min_or_max is None:
        min_or_max = 'max'

    best_labels = evaluation_results_df.loc[method, :]
    if min_or_max == 'min':
        best_labels = best_labels.index[best_labels == best_labels.min()]
        return clustering_labels_df[best_labels]
    elif min_or_max == 'max':
        best_labels = best_labels.index[best_labels == best_labels.max()]
        return clustering_labels_df[best_labels]
    logging.error('min_or_max must be either min or max, %s is an invalid choice' % min_or_max)
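
# Usage sketch with hypothetical outputs of optimize_clustering: keep every set of labels
# tied for the highest silhouette score.
#
#   best = pick_best_labels(evaluations_df, labels_df, method="silhouette_score", min_or_max="max")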