Source code for hdbscan.branches

# Support branch detection within clusters.
import numpy as np

from sklearn.base import BaseEstimator, ClusterMixin
from scipy.sparse import coo_array
from scipy.sparse.csgraph import minimum_spanning_tree, connected_components
from joblib import Memory
from joblib import Parallel, delayed
from joblib.parallel import cpu_count
from ._hdbscan_linkage import label
from .plots import CondensedTree, SingleLinkageTree, ApproximationGraph
from .prediction import approximate_predict
from ._hdbscan_tree import recurse_leaf_dfs
from .hdbscan_ import _tree_to_labels


[docs] def detect_branches_in_clusters( clusterer, cluster_labels=None, cluster_probabilities=None, branch_detection_method="full", label_sides_as_branches=False, min_cluster_size=None, max_cluster_size=None, allow_single_cluster=None, cluster_selection_method=None, cluster_selection_epsilon=0.0, cluster_selection_persistence=0.0, ): """ Performs a flare-detection post-processing step to detect branches within clusters [1]_. For each cluster, a graph is constructed connecting the data points based on their mutual reachability distances. Each edge is given a centrality value based on how far it lies from the cluster's center. Then, the edges are clustered as if that centrality was a distance, progressively removing the 'center' of each cluster and seeing how many branches remain. Parameters ---------- clusterer : hdbscan.HDBSCAN The clusterer object that has been fit to the data with branch detection data generated. cluster_labels : np.ndarray, shape (n_samples, ), optional (default=None) The cluster labels for each point in the data set. If not provided, the clusterer's labels will be used. cluster_probabilities : np.ndarray, shape (n_samples, ), optional (default=None) The cluster probabilities for each point in the data set. If not provided, the clusterer's probabilities will be used, or all points will be given 1.0 probability if labels are overridden. branch_detection_method : str, optional (default=``full``) Determines which graph is constructed to detect branches with. Valid values are, ordered by increasing computation cost and decreasing sensitivity to noise: - ``core``: Contains the edges that connect each point to all other points within a mutual reachability distance lower than or equal to the point's core distance. This is the cluster's subgraph of the k-NN graph over the entire data set (with k = ``min_samples``). - ``full``: Contains all edges between points in each cluster with a mutual reachability distance lower than or equal to the distance of the most-distance point in each cluster. These graphs represent the 0-dimensional simplicial complex of each cluster at the first point in the filtration where they contain all their points. label_sides_as_branches : bool, optional (default=False), When this flag is False, branches are only labelled for clusters with at least three branches (i.e., at least y-shapes). Clusters with only two branches represent l-shapes. The two branches describe the cluster's outsides growing towards each other. Enabling this flag separates these branches from each other in the produced labelling. min_cluster_size : int, optional (default=None) The minimum number of samples in a group for that group to be considered a branch; groupings smaller than this size will seen as points falling out of a branch. Defaults to the clusterer's min_cluster_size. allow_single_cluster : bool, optional (default=None) Analogous to HDBSCAN's ``allow_single_cluster``. cluster_selection_method : str, optional (default=None) The method used to select branches from the cluster's condensed tree. The standard approach for FLASC is to use the ``eom`` approach. Options are: * ``eom`` * ``leaf`` cluster_selection_epsilon: float, optional (default=0.0) A lower epsilon threshold. Only branches with a death above this value will be considered. See [3]_ for more information. Note that this should not be used if we want to predict the cluster labels for new points in future (e.g. using approximate_predict), as the :func:`~hdbscan.branches.approximate_predict` function is not aware of this argument. cluster_selection_persistence: float, optional (default=0.0) An eccentricity persistence threshold. Branches with a persistence below this value will be merged. See [3]_ for more information. Note that this should not be used if we want to predict the cluster labels for new points in future (e.g. using approximate_predict), as the :func:`~hdbscan.branches.approximate_predict` function is not aware of this argument. max_cluster_size : int, optional (default=0) A limit to the size of clusters returned by the ``eom`` algorithm. Has no effect when using ``leaf`` clustering (where clusters are usually small regardless). Note that this should not be used if we want to predict the cluster labels for new points in future (e.g. using :func:`~hdbscan.branches.approximate_predict`), as that function is not aware of this argument. Returns ------- labels : np.ndarray, shape (n_samples, ) Labels that differentiate all subgroups (clusters and branches). Noisy samples are given the label -1. probabilities : np.ndarray, shape (n_samples, ) Probabilities considering both cluster and branch membership. Noisy samples are assigned 0. cluster_labels : np.ndarray, shape (n_samples, ) The cluster labels for each point in the data set. Noisy samples are given the label -1. cluster_probabilities : np.ndarray, shape (n_samples, ) The cluster probabilities for each point in the data set. Noisy samples are assigned 1.0. branch_labels : np.ndarray, shape (n_samples, ) Branch labels for each point. Noisy samples are given the label -1. branch_probabilities : np.ndarray, shape (n_samples, ) Branch membership strengths for each point. Noisy samples are assigned 0. branch_persistences : tuple (n_clusters) A branch persistence (eccentricity range) for each detected branch. approximation_graphs : tuple (n_clusters) The graphs used to detect branches in each cluster stored as a numpy array with four columns: source, target, centrality, mutual reachability distance. Points are labelled by their row-index into the input data. The edges contained in the graphs depend on the ``branch_detection_method``: - ``core``: Contains the edges that connect each point to all other points in a cluster within a mutual reachability distance lower than or equal to the point's core distance. This is an extension of the minimum spanning tree introducing only edges with equal distances. The reachability distance introduces ``num_points`` * ``min_samples`` of such edges. - ``full``: Contains all edges between points in each cluster with a mutual reachability distance lower than or equal to the distance of the most-distance point in each cluster. These graphs represent the 0-dimensional simplicial complex of each cluster at the first point in the filtration where they contain all their points. condensed_trees : tuple (n_clusters) A condensed branch hierarchy for each cluster produced during the branch detection step. Data points are numbered with in-cluster ids. linkage_trees : tuple (n_clusters) A single linkage tree for each cluster produced during the branch detection step, in the scipy hierarchical clustering format. (see http://docs.scipy.org/doc/scipy/reference/cluster.hierarchy.html). Data points are numbered with in-cluster ids. centralities : np.ndarray, shape (n_samples, ) Centrality values for each point in a cluster. Overemphasizes points' eccentricity within the cluster as the values are based on minimum spanning trees that do not contain the equally distanced edges resulting from the mutual reachability distance. cluster_points : list (n_clusters) The data point row indices for each cluster. References ---------- .. [1] Bot, D. M., Peeters, J., Liesenborgs J., & Aerts, J. (2023, November). FLASC: A Flare-Sensitive Clustering Algorithm: Extending HDBSCAN* for Detecting Branches in Clusters. arXiv:2311.15887 """ # Check clusterer state if clusterer._min_spanning_tree is None: raise ValueError( "Clusterer does not have an explicit minimum spanning tree!" " Try fitting with branch_detection_data=True or" " gen_min_span_tree=True set." ) if clusterer.branch_detection_data_ is None: raise ValueError( "Clusterer does not have branch detection data!" " Try fitting with branch_detection_data=True set," " or run generate_branch_detection_data on the clusterer" ) # Validate parameters if min_cluster_size is None: min_cluster_size = clusterer.min_cluster_size if max_cluster_size is None: max_cluster_size = clusterer.max_cluster_size if allow_single_cluster is None: allow_single_cluster = clusterer.allow_single_cluster if cluster_selection_method is None: cluster_selection_method = clusterer.cluster_selection_method cluster_selection_epsilon = float(cluster_selection_epsilon) cluster_selection_persistence = float(cluster_selection_persistence) if not ( np.issubdtype(type(min_cluster_size), np.integer) and min_cluster_size >= 2 ): raise ValueError( f"min_cluster_size must be an integer greater or equal " f"to 2, {min_cluster_size} given." ) if not ( np.issubdtype(type(cluster_selection_persistence), np.floating) and cluster_selection_persistence >= 0.0 ): raise ValueError( f"cluster_selection_persistence must be a float greater or equal to " f"0.0, {cluster_selection_persistence} given." ) if not ( np.issubdtype(type(cluster_selection_epsilon), np.floating) and cluster_selection_epsilon >= 0.0 ): raise ValueError( f"cluster_selection_epsilon must be a float greater or equal to " f"0.0, {cluster_selection_epsilon} given." ) if cluster_selection_method not in ("eom", "leaf"): raise ValueError( f"Invalid cluster_selection_method: {cluster_selection_method}\n" f'Should be one of: "eom", "leaf"\n' ) if branch_detection_method not in ("core", "full"): raise ValueError( f"Invalid ``branch_detection_method``: {branch_detection_method}\n" 'Should be one of: "core", "full"\n' ) # Extract state memory = clusterer.memory if isinstance(memory, str): memory = Memory(memory, verbose=0) if cluster_labels is None: overridden_labels = False num_clusters = len(clusterer.cluster_persistence_) cluster_labels, _cluster_probabilities = update_single_cluster_labels( clusterer._condensed_tree, clusterer.labels_, clusterer.probabilities_, clusterer.cluster_persistence_, allow_single_cluster=clusterer.allow_single_cluster, cluster_selection_epsilon=clusterer.cluster_selection_epsilon, ) if cluster_probabilities is None: cluster_probabilities = _cluster_probabilities else: overridden_labels = True num_clusters = cluster_labels.max() + 1 if cluster_probabilities is None: cluster_probabilities = np.where(cluster_labels == -1, 0.0, 1.0) if not clusterer.branch_detection_data_.all_finite: finite_index = clusterer.branch_detection_data_.finite_index cluster_labels = cluster_labels[finite_index] cluster_probabilities = cluster_probabilities[finite_index] # Configure parallelization run_core = branch_detection_method == "core" num_jobs = clusterer.core_dist_n_jobs if num_jobs < 1: num_jobs = max(cpu_count() + 1 + num_jobs, 1) thread_pool = ( SequentialPool() if run_core else Parallel(n_jobs=num_jobs, max_nbytes=None) ) # Detect branches ( points, centralities, linkage_trees, approximation_graphs, ) = memory.cache(compute_branch_linkage, ignore=["thread_pool"])( cluster_labels, cluster_probabilities, clusterer._min_spanning_tree, clusterer.branch_detection_data_.tree, clusterer.branch_detection_data_.neighbors, clusterer.branch_detection_data_.core_distances, clusterer.branch_detection_data_.dist_metric, num_clusters, thread_pool, run_core=run_core, overridden_labels=overridden_labels, ) ( branch_labels, branch_probabilities, branch_persistences, condensed_trees, linkage_trees, ) = memory.cache(compute_branch_segmentation, ignore=["thread_pool"])( linkage_trees, thread_pool, min_cluster_size=min_cluster_size, allow_single_cluster=allow_single_cluster, cluster_selection_method=cluster_selection_method, cluster_selection_epsilon=cluster_selection_epsilon, cluster_selection_persistence=cluster_selection_persistence, max_cluster_size=max_cluster_size, ) ( labels, probabilities, branch_labels, branch_probabilities, centralities, ) = memory.cache(update_labelling)( cluster_probabilities, linkage_trees, points, centralities, branch_labels, branch_probabilities, branch_persistences, label_sides_as_branches=label_sides_as_branches, ) # Maintain data indices for non-finite data if not clusterer.branch_detection_data_.all_finite: internal_to_raw = clusterer.branch_detection_data_.internal_to_raw _remap_point_lists(points, internal_to_raw) _remap_edge_lists(approximation_graphs, internal_to_raw) num_points = len(clusterer.labels_) labels = _remap_labels(labels, finite_index, num_points) probabilities = _remap_probabilities(probabilities, finite_index, num_points) cluster_labels = _remap_labels(cluster_labels, finite_index, num_points) cluster_probabilities = _remap_probabilities( cluster_probabilities, finite_index, num_points ) branch_labels = _remap_labels(branch_labels, finite_index, num_points, 0) branch_probabilities = _remap_probabilities( branch_probabilities, finite_index, num_points ) centralities = _remap_probabilities(centralities, finite_index, num_points) return ( labels, probabilities, cluster_labels, cluster_probabilities, branch_labels, branch_probabilities, branch_persistences, approximation_graphs, condensed_trees, linkage_trees, centralities, points, )
def update_single_cluster_labels( condensed_tree, labels, probabilities, persistences, allow_single_cluster=False, cluster_selection_epsilon=0.0, ): """Sets all points up to cluster_selection_epsilon to the zero-cluster if a single cluster is detected.""" if allow_single_cluster and len(persistences) == 1: labels = np.zeros_like(labels) probabilities = np.ones_like(probabilities) if cluster_selection_epsilon > 0.0: size_mask = condensed_tree["child_size"] == 1 lambda_mask = condensed_tree["lambda_val"] < (1 / cluster_selection_epsilon) noise_points = condensed_tree["child"][lambda_mask & size_mask] labels[noise_points] = -1 probabilities[noise_points] = 0.0 return labels, probabilities def compute_branch_linkage( cluster_labels, cluster_probabilities, min_spanning_tree, space_tree, neighbors, core_distances, dist_metric, num_clusters, thread_pool, run_core=False, overridden_labels=False, ): result = thread_pool( delayed(_compute_branch_linkage_of_cluster)( cluster_labels, cluster_probabilities, min_spanning_tree, space_tree, neighbors, core_distances, dist_metric, run_core, overridden_labels, cluster_id, ) for cluster_id in range(num_clusters) ) if len(result): return tuple(zip(*result)) return (), (), (), () def _compute_branch_linkage_of_cluster( cluster_labels, cluster_probabilities, min_spanning_tree, space_tree, neighbors, core_distances, dist_metric, run_core, overridden_labels, cluster_id, ): """Detect branches within one cluster.""" # List points within cluster cluster_mask = cluster_labels == cluster_id cluster_points = np.where(cluster_mask)[0] in_cluster_ids = np.full(cluster_labels.shape[0], -1, dtype=np.double) in_cluster_ids[cluster_points] = np.arange(len(cluster_points), dtype=np.double) # Extract MST edges within cluster parent_mask = cluster_labels[min_spanning_tree[:, 0].astype(np.intp)] == cluster_id child_mask = cluster_labels[min_spanning_tree[:, 1].astype(np.intp)] == cluster_id cluster_mst = min_spanning_tree[parent_mask & child_mask] cluster_mst[:, 0] = in_cluster_ids[cluster_mst[:, 0].astype(np.intp)] cluster_mst[:, 1] = in_cluster_ids[cluster_mst[:, 1].astype(np.intp)] # Compute in cluster centrality points = space_tree.data.base[cluster_points] centroid = np.average(points, weights=cluster_probabilities[cluster_mask], axis=0) centralities = dist_metric.pairwise(centroid[None], points)[0, :] with np.errstate(divide="ignore"): centralities = 1 / centralities # Construct cluster approximation graph if run_core: edges = extract_core_cluster_graph( cluster_mst, core_distances, neighbors[cluster_points], in_cluster_ids ) else: max_dist = cluster_mst.T[2].max() edges = extract_full_cluster_graph( space_tree, core_distances, cluster_points, in_cluster_ids, max_dist ) # Compute linkage over the graph return compute_branch_linkage_from_graph( cluster_points, centralities, edges, overridden_labels ) def compute_branch_linkage_from_graph( cluster_points, centralities, edges, overridden_labels ): # Set max centrality as 'distance' np.maximum( centralities[edges[:, 0].astype(np.intp)], centralities[edges[:, 1].astype(np.intp)], edges[:, 2], ) # Extract MST edges centrality_mst = minimum_spanning_tree( coo_array( (edges[:, 2], (edges[:, 0].astype(np.int32), edges[:, 1].astype(np.int32))), shape=(len(cluster_points), len(cluster_points)), ), overwrite=True, ) # Re-label edges with data ids edges[:, 0] = cluster_points[edges[:, 0].astype(np.intp)] edges[:, 1] = cluster_points[edges[:, 1].astype(np.intp)] # Stop if the graph is disconnected, return component labels in # place of linkage tree, which is detected later on! if overridden_labels: num_components, labels = connected_components(centrality_mst, directed=False) if num_components > 1: return cluster_points, centralities, labels, edges # Compute linkage tree centrality_mst = centrality_mst.tocoo() centrality_mst = np.column_stack( (centrality_mst.row, centrality_mst.col, centrality_mst.data) ) centrality_mst = centrality_mst[np.argsort(centrality_mst[:, 2]), :] linkage_tree = label(centrality_mst) # Return values return cluster_points, centralities, linkage_tree, edges def extract_core_cluster_graph( cluster_spanning_tree, core_distances, neighbors, in_cluster_ids, ): """Create a graph connecting all points within each point's core distance.""" # Allocate output (won't be filled completely) num_points = neighbors.shape[0] num_neighbors = neighbors.shape[1] count = cluster_spanning_tree.shape[0] edges = np.zeros((count + num_points * num_neighbors, 4), dtype=np.double) # Fill (undirected) MST edges with within-cluster-ids mst_parents = cluster_spanning_tree[:, 0].astype(np.intp) mst_children = cluster_spanning_tree[:, 1].astype(np.intp) np.minimum(mst_parents, mst_children, edges[:count, 0]) np.maximum(mst_parents, mst_children, edges[:count, 1]) # Fill neighbors with within-cluster-ids core_parent = np.repeat(np.arange(num_points, dtype=np.double), num_neighbors) core_children = in_cluster_ids[neighbors.flatten()] np.minimum(core_parent, core_children, edges[count:, 0]) np.maximum(core_parent, core_children, edges[count:, 1]) # Fill mutual reachabilities edges[:count, 3] = cluster_spanning_tree[:, 2] np.maximum( core_distances[edges[count:, 0].astype(np.intp)], core_distances[edges[count:, 1].astype(np.intp)], edges[count:, 3], ) # Extract unique edges that stay within the cluster edges = np.unique(edges[edges[:, 0] > -1.0, :], axis=0) return edges def extract_full_cluster_graph( space_tree, core_distances, cluster_points, in_cluster_ids, max_dist ): # Query KDTree/BallTree for neighors within the distance children_map, distances_map = space_tree.query_radius( space_tree.data.base[cluster_points], r=max_dist + 1e-8, return_distance=True ) # Count number of returned edges per point num_children = np.zeros(len(cluster_points), dtype=np.intp) for i, children in enumerate(children_map): num_children[i] += len(children) # Create full edge list full_parents = np.repeat( np.arange(len(cluster_points), dtype=np.double), num_children ) full_children = in_cluster_ids[np.concatenate(children_map)] full_distances = np.concatenate(distances_map) # Create output mask = ( (full_children != -1.0) & (full_parents < full_children) & (full_distances <= max_dist) ) edges = np.zeros((mask.sum(), 4), dtype=np.double) edges[:, 0] = full_parents[mask] edges[:, 1] = full_children[mask] np.maximum( np.maximum( core_distances[edges[:, 0].astype(np.intp)], core_distances[edges[:, 1].astype(np.intp)], ), full_distances[mask], edges[:, 3], ) return edges def compute_branch_segmentation(cluster_linkage_trees, thread_pool, **kwargs): """Extracts branches from the linkage hierarchies.""" results = thread_pool( delayed(segment_branch_linkage_hierarchy)(cluster_linkage_tree, **kwargs) for cluster_linkage_tree in cluster_linkage_trees ) if len(results): return tuple(zip(*results)) return (), (), (), (), () def segment_branch_linkage_hierarchy( single_linkage_tree, allow_single_cluster=False, cluster_selection_epsilon=0.0, **kwargs, ): """Select branches within one cluster.""" # Return component labels if the graph is disconnected if len(single_linkage_tree.shape) == 1: return ( single_linkage_tree, np.ones(single_linkage_tree.shape[0], dtype=np.double), [0 for _ in range(single_linkage_tree.max() + 1)], None, None, ) # Run normal branch detection (labels, probabilities, stabilities, condensed_tree, linkage_tree) = ( _tree_to_labels( None, single_linkage_tree, allow_single_cluster=allow_single_cluster, cluster_selection_epsilon=cluster_selection_epsilon, **kwargs, ) ) labels, probabilities = update_single_cluster_labels( condensed_tree, labels, probabilities, stabilities, allow_single_cluster=allow_single_cluster, cluster_selection_epsilon=cluster_selection_epsilon, ) return (labels, probabilities, stabilities, condensed_tree, linkage_tree) def update_labelling( cluster_probabilities, tree_list, points_list, centrality_list, branch_label_list, branch_prob_list, branch_pers_list, label_sides_as_branches=False, ): """Updates the labelling with the detected branches.""" # Allocate output num_points = len(cluster_probabilities) labels = -1 * np.ones(num_points, dtype=np.intp) probabilities = cluster_probabilities.copy() branch_labels = np.zeros(num_points, dtype=np.intp) branch_probabilities = np.ones(num_points, dtype=np.double) branch_centralities = np.zeros(num_points, dtype=np.double) # Compute the labels and probabilities running_id = 0 for tree, _points, _labels, _probs, _centrs, _pers in zip( tree_list, points_list, branch_label_list, branch_prob_list, centrality_list, branch_pers_list, ): num_branches = len(_pers) branch_centralities[_points] = _centrs if num_branches <= (1 if label_sides_as_branches else 2) and tree is not None: labels[_points] = running_id running_id += 1 else: has_noise = int(-1 in _labels) labels[_points] = _labels + running_id + has_noise branch_labels[_points] = _labels branch_probabilities[_points] = _probs probabilities[_points] += _probs probabilities[_points] /= 2 running_id += num_branches + has_noise # Reorder other parts return ( labels, probabilities, branch_labels, branch_probabilities, branch_centralities, ) def _remap_edge_lists(edge_lists, internal_to_raw): """ Takes a list of edge lists and replaces the internal indices to raw indices. Parameters ---------- edge_lists : list[np.ndarray] A list of numpy edgelists with the first two columns indicating datapoints. internal_to_raw: dict A mapping from internal integer index to the raw integer index. """ for graph in edge_lists: for edge in graph: edge[0] = internal_to_raw[edge[0]] edge[1] = internal_to_raw[edge[1]] def _remap_point_lists(point_lists, internal_to_raw): """ Takes a list of points lists and replaces the internal indices to raw indices. Parameters ---------- point_lists : list[np.ndarray] A list of numpy arrays with point indices. internal_to_raw: dict A mapping from internal integer index to the raw integer index. """ for points in point_lists: for idx in range(len(points)): points[idx] = internal_to_raw[points[idx]] def _remap_labels(old_labels, finite_index, num_points, fill_value=-1): """Creates new label array with infinite points set to -1.""" new_labels = np.full(num_points, fill_value) new_labels[finite_index] = old_labels return new_labels def _remap_probabilities(old_probs, finite_index, num_points): """Creates new probability array with infinite points set to 0.""" new_probs = np.zeros(num_points) new_probs[finite_index] = old_probs return new_probs
[docs] class BranchDetector(BaseEstimator, ClusterMixin): """Performs a flare-detection post-processing step to detect branches within clusters [1]_. For each cluster, a graph is constructed connecting the data points based on their mutual reachability distances. Each edge is given a centrality value based on how far it lies from the cluster's center. Then, the edges are clustered as if that centrality was a distance, progressively removing the 'center' of each cluster and seeing how many branches remain. Parameters ---------- branch_detection_method : str, optional (default=``full``) Determines which graph is constructed to detect branches with. Valid values are, ordered by increasing computation cost and decreasing sensitivity to noise: - ``core``: Contains the edges that connect each point to all other points within a mutual reachability distance lower than or equal to the point's core distance. This is the cluster's subgraph of the k-NN graph over the entire data set (with k = ``min_samples``). - ``full``: Contains all edges between points in each cluster with a mutual reachability distance lower than or equal to the distance of the most-distance point in each cluster. These graphs represent the 0-dimensional simplicial complex of each cluster at the first point in the filtration where they contain all their points. label_sides_as_branches : bool, optional (default=False), When this flag is False, branches are only labelled for clusters with at least three branches (i.e., at least y-shapes). Clusters with only two branches represent l-shapes. The two branches describe the cluster's outsides growing towards each other. Enabling this flag separates these branches from each other in the produced labelling. min_cluster_size : int, optional (default=None) The minimum number of samples in a group for that group to be considered a branch; groupings smaller than this size will seen as points falling out of a branch. Defaults to the clusterer's min_cluster_size. allow_single_cluster : bool, optional (default=None) Analogous to ``allow_single_cluster``. cluster_selection_method : str, optional (default=None) The method used to select branches from the cluster's condensed tree. The standard approach for FLASC is to use the ``eom`` approach. Options are: * ``eom`` * ``leaf`` cluster_selection_epsilon: float, optional (default=0.0) A lower epsilon threshold. Only branches with a death above this value will be considered. cluster_selection_persistence: float, optional (default=0.0) An eccentricity persistence threshold. Branches with a persistence below this value will be merged. max_cluster_size : int, optional (default=None) A limit to the size of clusters returned by the ``eom`` algorithm. Has no effect when using ``leaf`` clustering (where clusters are usually small regardless). Note that this should not be used if we want to predict the cluster labels for new points in future because `approximate_predict` is not aware of this argument. Attributes ---------- labels_ : np.ndarray, shape (n_samples, ) Labels that differentiate all subgroups (clusters and branches). Noisy samples are given the label -1. probabilities_ : np.ndarray, shape (n_samples, ) Probabilities considering both cluster and branch membership. Noisy samples are assigned 0. cluster_labels_ : np.ndarray, shape (n_samples, ) The cluster labels for each point in the data set. Noisy samples are given the label -1. cluster_probabilities_ : np.ndarray, shape (n_samples, ) The cluster probabilities for each point in the data set. Noisy samples are assigned 1.0. branch_labels_ : np.ndarray, shape (n_samples, ) Branch labels for each point. Noisy samples are given the label -1. branch_probabilities_ : np.ndarray, shape (n_samples, ) Branch membership strengths for each point. Noisy samples are assigned 0. branch_persistences_ : tuple (n_clusters) A branch persistence (eccentricity range) for each detected branch. approximation_graph_ : ApproximationGraph The graphs used to detect branches in each cluster stored as a numpy array with four columns: source, target, centrality, mutual reachability distance. Points are labelled by their row-index into the input data. The edges contained in the graphs depend on the ``branch_detection_method``: - ``core``: Contains the edges that connect each point to all other points in a cluster within a mutual reachability distance lower than or equal to the point's core distance. This is an extension of the minimum spanning tree introducing only edges with equal distances. The reachability distance introduces ``num_points`` * ``min_samples`` of such edges. - ``full``: Contains all edges between points in each cluster with a mutual reachability distance lower than or equal to the distance of the most-distance point in each cluster. These graphs represent the 0-dimensional simplicial complex of each cluster at the first point in the filtration where they contain all their points. condensed_trees_ : tuple (n_clusters) A condensed branch hierarchy for each cluster produced during the branch detection step. Data points are numbered with in-cluster ids. linkage_trees_ : tuple (n_clusters) A single linkage tree for each cluster produced during the branch detection step, in the scipy hierarchical clustering format. (see http://docs.scipy.org/doc/scipy/reference/cluster.hierarchy.html). Data points are numbered with in-cluster ids. centralities_ : np.ndarray, shape (n_samples, ) Centrality values for each point in a cluster. Overemphasizes points' eccentricity within the cluster as the values are based on minimum spanning trees that do not contain the equally distanced edges resulting from the mutual reachability distance. cluster_points_ : list (n_clusters) The data point row indices for each cluster. References ---------- .. [1] Bot, D. M., Peeters, J., Liesenborgs J., & Aerts, J. (2023, November). FLASC: A Flare-Sensitive Clustering Algorithm: Extending HDBSCAN* for Detecting Branches in Clusters. arXiv:2311.15887 """ def __init__( self, branch_detection_method="full", label_sides_as_branches=False, min_cluster_size=None, max_cluster_size=None, allow_single_cluster=None, cluster_selection_method=None, cluster_selection_epsilon=0.0, cluster_selection_persistence=0.0, ): self.branch_detection_method = branch_detection_method self.label_sides_as_branches = label_sides_as_branches self.min_cluster_size = min_cluster_size self.max_cluster_size = max_cluster_size self.allow_single_cluster = allow_single_cluster self.cluster_selection_method = cluster_selection_method self.cluster_selection_epsilon = cluster_selection_epsilon self.cluster_selection_persistence = cluster_selection_persistence self._approximation_graphs = None self._condensed_trees = None self._cluster_linkage_trees = None self._branch_exemplars = None
[docs] def fit(self, clusterer, labels=None, probabilities=None): """ Perform a flare-detection post-processing step to detect branches within clusters. Parameters ---------- clusterer : HDBSCAN A fitted HDBSCAN object with branch detection data generated. labels : np.ndarray, shape (n_samples, ), optional (default=None) The cluster labels for each point in the data set. If not provided, the clusterer's labels will be used. probabilities : np.ndarray, shape (n_samples, ), optional (default=None) The cluster probabilities for each point in the data set. If not provided, the clusterer's probabilities will be used, or all points will be given 1.0 probability if labels are overridden. Returns ------- self : object Returns self. """ self._clusterer = clusterer kwargs = self.get_params() ( self.labels_, self.probabilities_, self.cluster_labels_, self.cluster_probabilities_, self.branch_labels_, self.branch_probabilities_, self.branch_persistences_, self._approximation_graphs, self._condensed_trees, self._linkage_trees, self.centralities_, self.cluster_points_, ) = detect_branches_in_clusters(clusterer, labels, probabilities, **kwargs) return self
[docs] def fit_predict(self, clusterer, labels=None, probabilities=None): """ Perform a flare-detection post-processing step to detect branches within clusters [1]_. Parameters ---------- clusterer : HDBSCAN A fitted HDBSCAN object with branch detection data generated. labels : np.ndarray, shape (n_samples, ), optional (default=None) The cluster labels for each point in the data set. If not provided, the clusterer's labels will be used. probabilities : np.ndarray, shape (n_samples, ), optional (default=None) The cluster probabilities for each point in the data set. If not provided, the clusterer's probabilities will be used, or all points will be given 1.0 probability if labels are overridden. Returns ------- labels : ndarray, shape (n_samples, ) subgroup labels differentiated by cluster and branch. """ self.fit(clusterer, labels, probabilities) return self.labels_
[docs] def weighted_centroid(self, label_id, data=None): """Provides an approximate representative point for a given branch. Note that this technique assumes a euclidean metric for speed of computation. For more general metrics use the ``weighted_medoid`` method which is slower, but can work with the metric the model trained with. Parameters ---------- label_id: int The id of the cluster to compute a centroid for. data : np.ndarray (n_samples, n_features), optional (default=None) A dataset to use instead of the raw data that was clustered on. Returns ------- centroid: array of shape (n_features,) A representative centroid for cluster ``label_id``. """ if self.labels_ is None: raise AttributeError("Model has not been fit to data") if self._clusterer._raw_data is None and data is None: raise AttributeError("Raw data not available") if label_id == -1: raise ValueError( "Cannot calculate weighted centroid for -1 cluster " "since it is a noise cluster" ) if data is None: data = self._clusterer._raw_data mask = self.labels_ == label_id cluster_data = data[mask] cluster_membership_strengths = self.probabilities_[mask] return np.average(cluster_data, weights=cluster_membership_strengths, axis=0)
[docs] def weighted_medoid(self, label_id, data=None): """Provides an approximate representative point for a given branch. Note that this technique can be very slow and memory intensive for large clusters. For faster results use the ``weighted_centroid`` method which is faster, but assumes a euclidean metric. Parameters ---------- label_id: int The id of the cluster to compute a medoid for. data : np.ndarray (n_samples, n_features), optional (default=None) A dataset to use instead of the raw data that was clustered on. Returns ------- centroid: array of shape (n_features,) A representative medoid for cluster ``label_id``. """ if self.labels_ is None: raise AttributeError("Model has not been fit to data") if self._clusterer._raw_data is None and data is None: raise AttributeError("Raw data not available") if label_id == -1: raise ValueError( "Cannot calculate weighted centroid for -1 cluster " "since it is a noise cluster" ) if data is None: data = self._clusterer._raw_data mask = self.labels_ == label_id cluster_data = data[mask] cluster_membership_strengths = self.probabilities_[mask] dist_metric = self._clusterer.branch_detection_data_.dist_metric dist_mat = dist_metric.pairwise(cluster_data) * cluster_membership_strengths medoid_index = np.argmin(dist_mat.sum(axis=1)) return cluster_data[medoid_index]
@property def approximation_graph_(self): """See :class:`~hdbscan.branches.BranchDetector` for documentation.""" if self._approximation_graphs is None: raise AttributeError( "No approximation graph was generated; try running fit first." ) return ApproximationGraph( self._approximation_graphs, self.labels_, self.probabilities_, self.centralities_, self.cluster_labels_, self.cluster_probabilities_, self.branch_labels_, self.branch_probabilities_, lens_name="centrality", sub_cluster_name="branch", raw_data=self._clusterer._raw_data, ) @property def condensed_trees_(self): """See :class:`~hdbscan.branches.BranchDetector` for documentation.""" if self._condensed_trees is None: raise AttributeError( "No condensed trees were generated; try running fit first." ) return [ CondensedTree(tree, self.branch_labels_[points]) for tree, points in zip( self._condensed_trees, self.cluster_points_, ) ] @property def linkage_trees_(self): """See :class:`~hdbscan.branches.BranchDetector` for documentation.""" if self._linkage_trees is None: raise AttributeError( "No linkage trees were generated; try running fit first." ) return [ SingleLinkageTree(tree) if tree is not None else None for tree in self._linkage_trees ] @property def exemplars_(self): """See :class:`~hdbscan.branches.BranchDetector` for documentation.""" if self._branch_exemplars is not None: return self._branch_exemplars if self._clusterer._raw_data is None: raise AttributeError( "Branch exemplars not available with precomputed " "distances." ) if self._condensed_trees is None: raise AttributeError("No branches detected; try running fit first.") num_clusters = len(self._condensed_trees) branch_cluster_trees = [ branch_tree[branch_tree["child_size"] > 1] for branch_tree in self._condensed_trees ] selected_branch_ids = [ sorted(branch_tree._select_clusters()) for branch_tree in self.condensed_trees_ ] self._branch_exemplars = [None] * num_clusters for i, points in enumerate(self.cluster_points_): selected_branches = selected_branch_ids[i] if len(selected_branches) <= (1 if self.label_sides_as_branches else 2): continue self._branch_exemplars[i] = [] raw_condensed_tree = self._condensed_trees[i] for branch in selected_branches: _branch_exemplars = np.array([], dtype=np.intp) for leaf in recurse_leaf_dfs(branch_cluster_trees[i], np.intp(branch)): leaf_max_lambda = raw_condensed_tree["lambda_val"][ raw_condensed_tree["parent"] == leaf ].max() candidates = raw_condensed_tree["child"][ (raw_condensed_tree["parent"] == leaf) & (raw_condensed_tree["lambda_val"] == leaf_max_lambda) ] _branch_exemplars = np.hstack([_branch_exemplars, candidates]) ids = points[_branch_exemplars] self._branch_exemplars[i].append(self._clusterer._raw_data[ids, :]) return self._branch_exemplars
[docs] def approximate_predict_branch(branch_detector, points_to_predict): """Predict the cluster and branch label of new points. Extends ``approximate_predict`` to also predict in which branch new points lie (if the cluster they are part of has branches). Parameters ---------- branch_detector : BranchDetector A clustering object that has been fit to vector input data. points_to_predict : array, or array-like (n_samples, n_features) The new data points to predict cluster labels for. They should have the same dimensionality as the original dataset over which clusterer was fit. Returns ------- labels : array (n_samples,) The predicted cluster and branch labels. probabilities : array (n_samples,) The soft cluster scores for each. cluster_labels : array (n_samples,) The predicted cluster labels. cluster_probabilities : array (n_samples,) The soft cluster scores for each. branch_labels : array (n_samples,) The predicted cluster labels. branch_probabilities : array (n_samples,) The soft cluster scores for each. """ cluster_labels, cluster_probabilities, connecting_points = approximate_predict( branch_detector._clusterer, points_to_predict, return_connecting_points=True ) num_predict = len(points_to_predict) labels = np.empty(num_predict, dtype=np.intp) probabilities = np.zeros(num_predict, dtype=np.double) branch_labels = np.zeros(num_predict, dtype=np.intp) branch_probabilities = np.ones(num_predict, dtype=np.double) min_num_branches = 2 if not branch_detector.label_sides_as_branches else 1 for i, (label, prob, connecting_point) in enumerate( zip(cluster_labels, cluster_probabilities, connecting_points) ): if label < 0: labels[i] = -1 elif len(branch_detector.branch_persistences_[label]) <= min_num_branches: labels[i] = label probabilities[i] = prob else: labels[i] = branch_detector.labels_[connecting_point] branch_labels[i] = branch_detector.branch_labels_[connecting_point] branch_probabilities[i] = branch_detector.branch_probabilities_[ connecting_point ] probabilities[i] = (prob + branch_probabilities[i]) / 2 return ( labels, probabilities, cluster_labels, cluster_probabilities, branch_labels, branch_probabilities, )
class SequentialPool: """API of a Joblib Parallel pool but sequential execution""" def __init__(self): self.n_jobs = 1 def __call__(self, jobs): return [fun(*args, **kwargs) for (fun, args, kwargs) in jobs]