import numpy as np
from sklearn.metrics import pairwise_distances
from scipy.spatial.distance import cdist
from ._hdbscan_linkage import mst_linkage_core
from .hdbscan_ import isclose
[docs]
def all_points_core_distance(distance_matrix, d=2.0):
"""
Compute the all-points-core-distance for all the points of a cluster.
Parameters
----------
distance_matrix : array (cluster_size, cluster_size)
The pairwise distance matrix between points in the cluster.
d : integer
The dimension of the data set, which is used in the computation
of the all-point-core-distance as per the paper.
Returns
-------
core_distances : array (cluster_size,)
The all-points-core-distance of each point in the cluster
References
----------
Moulavi, D., Jaskowiak, P.A., Campello, R.J., Zimek, A. and Sander, J.,
2014. Density-Based Clustering Validation. In SDM (pp. 839-847).
"""
distance_matrix[distance_matrix != 0] = (1.0 / distance_matrix[
distance_matrix != 0]) ** d
result = distance_matrix.sum(axis=1)
result /= distance_matrix.shape[0] - 1
if result.sum() == 0:
result = np.zeros(len(distance_matrix))
else:
result **= (-1.0 / d)
return result
def max_ratio(stacked_distances):
max_ratio = 0
for i in range(stacked_distances.shape[0]):
for j in range(stacked_distances.shape[1]):
dist = stacked_distances[i][j][0]
coredist = stacked_distances[i][j][1]
if dist == 0 or coredist/dist <= max_ratio:
continue
max_ratio = coredist/dist
return max_ratio
[docs]
def distances_between_points(X, labels, cluster_id,
metric='euclidean', d=None, no_coredist=False,
print_max_raw_to_coredist_ratio=False, **kwd_args):
"""
Compute pairwise distances for all the points of a cluster.
If metric is 'precomputed' then assume X is a distance matrix for the full
dataset. Note that in this case you must pass in 'd' the dimension of the
dataset.
Parameters
----------
X : array (n_samples, n_features) or (n_samples, n_samples)
The input data of the clustering. This can be the data, or, if
metric is set to `precomputed` the pairwise distance matrix used
for the clustering.
labels : array (n_samples)
The label array output by the clustering, providing an integral
cluster label to each data point, with -1 for noise points.
cluster_id : integer
The cluster label for which to compute the distances
metric : string
The metric used to compute distances for the clustering (and
to be re-used in computing distances for mr distance). If
set to `precomputed` then X is assumed to be the precomputed
distance matrix between samples.
d : integer (or None)
The number of features (dimension) of the dataset. This need only
be set in the case of metric being set to `precomputed`, where
the ambient dimension of the data is unknown to the function.
**kwd_args :
Extra arguments to pass to the distance computation for other
metrics, such as minkowski, Mahanalobis etc.
Returns
-------
distances : array (n_samples, n_samples)
The distances between all points in `X` with `label` equal to `cluster_id`.
core_distances : array (n_samples,)
The all-points-core_distance of all points in `X` with `label` equal
to `cluster_id`.
References
----------
Moulavi, D., Jaskowiak, P.A., Campello, R.J., Zimek, A. and Sander, J.,
2014. Density-Based Clustering Validation. In SDM (pp. 839-847).
"""
if metric == 'precomputed':
if d is None:
raise ValueError('If metric is precomputed a '
'd value must be provided!')
distance_matrix = X[labels == cluster_id, :][:, labels == cluster_id]
else:
subset_X = X[labels == cluster_id, :]
distance_matrix = pairwise_distances(subset_X, metric=metric,
**kwd_args)
d = X.shape[1]
if no_coredist:
return distance_matrix, None
else:
core_distances = all_points_core_distance(distance_matrix.copy(), d=d)
core_dist_matrix = np.tile(core_distances, (core_distances.shape[0], 1))
stacked_distances = np.dstack(
[distance_matrix, core_dist_matrix, core_dist_matrix.T])
if print_max_raw_to_coredist_ratio:
print("Max raw distance to coredistance ratio: " + str(max_ratio(stacked_distances)))
return stacked_distances.max(axis=-1), core_distances
[docs]
def internal_minimum_spanning_tree(mr_distances):
"""
Compute the 'internal' minimum spanning tree given a matrix of mutual
reachability distances. Given a minimum spanning tree the 'internal'
graph is the subgraph induced by vertices of degree greater than one.
Parameters
----------
mr_distances : array (cluster_size, cluster_size)
The pairwise mutual reachability distances, inferred to be the edge
weights of a complete graph. Since MSTs are computed per cluster
this is the all-points-mutual-reacability for points within a single
cluster.
Returns
-------
internal_nodes : array
An array listing the indices of the internal nodes of the MST
internal_edges : array (?, 3)
An array of internal edges in weighted edge list format; that is
an edge is an array of length three listing the two vertices
forming the edge and weight of the edge.
References
----------
Moulavi, D., Jaskowiak, P.A., Campello, R.J., Zimek, A. and Sander, J.,
2014. Density-Based Clustering Validation. In SDM (pp. 839-847).
"""
single_linkage_data = mst_linkage_core(mr_distances)
min_span_tree = single_linkage_data.copy()
for index, row in enumerate(min_span_tree[1:], 1):
candidates = np.where(isclose(mr_distances[int(row[1])], row[2]))[0]
candidates = np.intersect1d(candidates,
single_linkage_data[:index, :2].astype(
int))
candidates = candidates[candidates != row[1]]
assert len(candidates) > 0
row[0] = candidates[0]
vertices = np.arange(mr_distances.shape[0])[
np.bincount(min_span_tree.T[:2].flatten().astype(np.intp)) > 1]
if not len(vertices):
vertices = [0]
# A little "fancy" we select from the flattened array reshape back
# (Fortran format to get indexing right) and take the product to do an and
# then convert back to boolean type.
edge_selection = np.prod(
np.isin(min_span_tree.T[:2], vertices), axis=0).astype(bool)
# Density sparseness is not well defined if there are no
# internal edges (as per the referenced paper). However
# MATLAB code from the original authors simply selects the
# largest of *all* the edges in the case that there are
# no internal edges, so we do the same here
if np.any(edge_selection):
# If there are any internal edges, then subselect them out
edges = min_span_tree[edge_selection]
else:
# If there are no internal edges then we want to take the
# max over all the edges that exist in the MST, so we simply
# do nothing and return all the edges in the MST.
edges = min_span_tree.copy()
return vertices, edges
[docs]
def density_separation(X, labels, cluster_id1, cluster_id2,
internal_nodes1, internal_nodes2,
core_distances1, core_distances2,
metric='euclidean', no_coredist=False, **kwd_args):
"""
Compute the density separation between two clusters. This is the minimum
distance between pairs of points, one from internal nodes of MSTs of each cluster.
Parameters
----------
X : array (n_samples, n_features) or (n_samples, n_samples)
The input data of the clustering. This can be the data, or, if
metric is set to `precomputed` the pairwise distance matrix used
for the clustering.
labels : array (n_samples)
The label array output by the clustering, providing an integral
cluster label to each data point, with -1 for noise points.
cluster_id1 : integer
The first cluster label to compute separation between.
cluster_id2 : integer
The second cluster label to compute separation between.
internal_nodes1 : array
The vertices of the MST for `cluster_id1` that were internal vertices.
internal_nodes2 : array
The vertices of the MST for `cluster_id2` that were internal vertices.
core_distances1 : array (size of cluster_id1,)
The all-points-core_distances of all points in the cluster
specified by cluster_id1.
core_distances2 : array (size of cluster_id2,)
The all-points-core_distances of all points in the cluster
specified by cluster_id2.
metric : string
The metric used to compute distances for the clustering (and
to be re-used in computing distances for mr distance). If
set to `precomputed` then X is assumed to be the precomputed
distance matrix between samples.
**kwd_args :
Extra arguments to pass to the distance computation for other
metrics, such as minkowski, Mahanalobis etc.
Returns
-------
The 'density separation' between the clusters specified by
`cluster_id1` and `cluster_id2`.
References
----------
Moulavi, D., Jaskowiak, P.A., Campello, R.J., Zimek, A. and Sander, J.,
2014. Density-Based Clustering Validation. In SDM (pp. 839-847).
"""
if metric == 'precomputed':
sub_select = X[labels == cluster_id1, :][:, labels == cluster_id2]
distance_matrix = sub_select[internal_nodes1, :][:, internal_nodes2]
else:
cluster1 = X[labels == cluster_id1][internal_nodes1]
cluster2 = X[labels == cluster_id2][internal_nodes2]
distance_matrix = cdist(cluster1, cluster2, metric, **kwd_args)
if no_coredist:
return distance_matrix.min()
else:
core_dist_matrix1 = np.tile(core_distances1[internal_nodes1],
(distance_matrix.shape[1], 1)).T
core_dist_matrix2 = np.tile(core_distances2[internal_nodes2],
(distance_matrix.shape[0], 1))
mr_dist_matrix = np.dstack([distance_matrix,
core_dist_matrix1,
core_dist_matrix2]).max(axis=-1)
return mr_dist_matrix.min()
[docs]
def validity_index(X, labels, metric='euclidean',
d=None, per_cluster_scores=False, mst_raw_dist=False, verbose=False, **kwd_args):
"""
Compute the density based cluster validity index for the
clustering specified by `labels` and for each cluster in `labels`.
Parameters
----------
X : array (n_samples, n_features) or (n_samples, n_samples)
The input data of the clustering. This can be the data, or, if
metric is set to `precomputed` the pairwise distance matrix used
for the clustering.
labels : array (n_samples)
The label array output by the clustering, providing an integral
cluster label to each data point, with -1 for noise points.
metric : optional, string (default 'euclidean')
The metric used to compute distances for the clustering (and
to be re-used in computing distances for mr distance). If
set to `precomputed` then X is assumed to be the precomputed
distance matrix between samples.
d : optional, integer (or None) (default None)
The number of features (dimension) of the dataset. This need only
be set in the case of metric being set to `precomputed`, where
the ambient dimension of the data is unknown to the function.
per_cluster_scores : optional, boolean (default False)
Whether to return the validity index for individual clusters.
Defaults to False with the function returning a single float
value for the whole clustering.
mst_raw_dist : optional, boolean (default False)
If True, the MST's are constructed solely via 'raw' distances (depending on the given metric, e.g. euclidean distances)
instead of using mutual reachability distances. Thus setting this parameter to True avoids using 'all-points-core-distances' at all.
This is advantageous specifically in the case of elongated clusters that lie in close proximity to each other <citation needed>.
**kwd_args :
Extra arguments to pass to the distance computation for other
metrics, such as minkowski, Mahanalobis etc.
Returns
-------
validity_index : float
The density based cluster validity index for the clustering. This
is a numeric value between -1 and 1, with higher values indicating
a 'better' clustering.
per_cluster_validity_index : array (n_clusters,)
The cluster validity index of each individual cluster as an array.
The overall validity index is the weighted average of these values.
Only returned if per_cluster_scores is set to True.
References
----------
Moulavi, D., Jaskowiak, P.A., Campello, R.J., Zimek, A. and Sander, J.,
2014. Density-Based Clustering Validation. In SDM (pp. 839-847).
"""
core_distances = {}
density_sparseness = {}
mst_nodes = {}
mst_edges = {}
max_cluster_id = labels.max() + 1
density_sep = np.inf * np.ones((max_cluster_id, max_cluster_id),
dtype=np.float64)
cluster_validity_indices = np.empty(max_cluster_id, dtype=np.float64)
for cluster_id in range(max_cluster_id):
if np.sum(labels == cluster_id) == 0:
continue
distances_for_mst, core_distances[
cluster_id] = distances_between_points(
X,
labels,
cluster_id,
metric,
d,
no_coredist=mst_raw_dist,
print_max_raw_to_coredist_ratio=verbose,
**kwd_args
)
mst_nodes[cluster_id], mst_edges[cluster_id] = \
internal_minimum_spanning_tree(distances_for_mst)
density_sparseness[cluster_id] = mst_edges[cluster_id].T[2].max()
for i in range(max_cluster_id):
if np.sum(labels == i) == 0:
continue
internal_nodes_i = mst_nodes[i]
for j in range(i + 1, max_cluster_id):
if np.sum(labels == j) == 0:
continue
internal_nodes_j = mst_nodes[j]
density_sep[i, j] = density_separation(
X, labels, i, j,
internal_nodes_i, internal_nodes_j,
core_distances[i], core_distances[j],
metric=metric, no_coredist=mst_raw_dist,
**kwd_args
)
density_sep[j, i] = density_sep[i, j]
n_samples = float(X.shape[0])
result = 0
for i in range(max_cluster_id):
if np.sum(labels == i) == 0:
continue
min_density_sep = density_sep[i].min()
cluster_validity_indices[i] = (
(min_density_sep - density_sparseness[i]) /
max(min_density_sep, density_sparseness[i])
)
if verbose:
print("Minimum density separation: " + str(min_density_sep))
print("Density sparseness: " + str(density_sparseness[i]))
cluster_size = np.sum(labels == i)
result += (cluster_size / n_samples) * cluster_validity_indices[i]
if per_cluster_scores:
return result, cluster_validity_indices
else:
return result