import math
import numpy
from scipy import sparse
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.cluster import KMeans
from . import coreset_sc, utils
def signless_laplacian_and_D_sparse(A, D=None):
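    """
    Construct the operator M = I - (1/2) * N, where N = D^{-1/2} (D - A) D^{-1/2}
    is the normalised Laplacian of A (equivalently, M is one half of the
    normalised signless Laplacian D^{-1/2} (D + A) D^{-1/2}), together with the
    sparse degree matrix D.

    Parameters
    ----------
    A : scipy.sparse matrix, shape = (n, n)
        Adjacency matrix of the graph.
    D : array-like of shape (n,), optional
        Precomputed degree vector. If None, the degrees are computed from A.

    Returns
    -------
    M : scipy.sparse matrix, shape = (n, n)
        The operator used by the power method in ``fast_spectral_cluster``.
    D : scipy.sparse matrix, shape = (n, n)
        The diagonal degree matrix.
    """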
n = A.shape[0]
if D is None:
D = sparse.diags(A.sum(axis=1).A1)
else:
D = sparse.diags(D)
D_inv_half = sparse.diags(1 / numpy.sqrt(D.diagonal()))
L = D - A
N = D_inv_half @ L @ D_inv_half
M = sparse.eye(n) - (0.5) * N
return M, D
def fast_spectral_cluster(M, D, k: int, kmeans_alg=None):
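    """
    Cluster the graph represented by M using a power-method approximation of
    spectral clustering.

    Parameters
    ----------
    M : scipy.sparse matrix, shape = (n, n)
        The operator I - (1/2) * N, where N is the normalised Laplacian.
    D : scipy.sparse matrix, shape = (n, n)
        The diagonal degree matrix.
    k : int
        Number of clusters to find.
    kmeans_alg : sklearn.cluster.KMeans, optional
        KMeans instance used on the embedding. If None, a default KMeans with
        k clusters is used.

    Returns
    -------
    labels : numpy.ndarray, shape = (n,)
        Cluster assignments.
    """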
    # M = I - (1/2) * N, where N is the normalised Laplacian;
    # equivalently M = (1/2) * (I + D^(-1/2) * A * D^(-1/2)).
n = M.shape[0]
_l = min(k, math.ceil(math.log(k, 2)))
t = 10 * math.ceil(math.log(n / k, 2))
Y = numpy.random.normal(size=(n, _l))
    # The top eigenvector of M (eigenvalue 1) is D^{1/2} * 1, i.e. the trivial
    # bottom eigenvector of the normalised Laplacian. It carries no cluster
    # information, so we keep the power iterations orthogonal to it.
top_eigvec = numpy.sqrt(D @ numpy.full((n,), 1))
norm = numpy.linalg.norm(top_eigvec)
if norm > 0:
top_eigvec /= norm
for _ in range(t):
Y = M @ Y
# Project Y to be orthogonal to the top eigenvector
for i in range(_l):
Y[:, i] -= (top_eigvec.transpose() @ Y[:, i]) * top_eigvec
kmeans = (
kmeans_alg if kmeans_alg is not None else KMeans(n_clusters=k, n_init="auto")
)
kmeans.fit(Y)
return kmeans.labels_
class CoresetSpectralClustering(BaseEstimator, ClusterMixin):
"""
Coreset Spectral Clustering
Parameters
----------
num_clusters : int
Number of clusters to form.
coreset_ratio : float, default=0.01
Ratio of the coreset size to the original data size. If set to 1.0,
coreset clustering will be skipped and the full graph will be clustered
directly.
k_over_sampling_factor : float, default=2.0
The factor to oversample the number of clusters for the coreset
seeding stage. Higher values will increase the "resolution" of the
sampling distribution, but take longer to compute.
    shift : float, default=0.0
The shift to add to the implicit kernel matrix of the form K' = K + shift*D^{-1}.
This is useful for graphs with large edge weights relative to degree, which can
cause the kernel matrix to be indefinite.
kmeans_alg : sklearn.cluster.KMeans, default=None
The KMeans algorithm to use for clustering the coreset embeddings.
If None, a default KMeans algorithm will be used.
full_labels : bool, default=True
        Whether to compute labels for the full graph after fitting.
        If False, only the coreset labels will be computed.
ignore_warnings : bool, default=False
Whether to ignore warnings about the implicit Kernel matrix being indefinite.
Distances that do become negative will be clipped to zero.

    Attributes
    ----------
    labels_ : numpy.ndarray of shape (n_samples,)
        Cluster assignment for every node of the full graph.
    coreset_indices_ : numpy.ndarray of shape (coreset_size_,)
        Indices of the nodes selected for the coreset.
    coreset_weights_ : numpy.ndarray of shape (coreset_size_,)
        Importance weights of the coreset nodes.
    coreset_labels_ : numpy.ndarray of shape (coreset_size_,)
        Cluster assignment for the coreset nodes.
    coreset_size_ : int
        Number of nodes in the sampled coreset.
    coreset_graph : scipy.sparse.csr_matrix
        Adjacency matrix of the coreset graph, set by ``get_coreset_graph``.
    closest_cluster_distance_ : numpy.ndarray of shape (n_samples,)
        Distance from each node to its assigned cluster, set when the full
        graph is labelled.
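
    Examples
    --------
    A minimal usage sketch; the graph below (two disjoint cliques with self
    loops) is purely illustrative:

    >>> import numpy as np
    >>> from scipy import sparse
    >>> A = sparse.block_diag([np.ones((50, 50))] * 2, format="csr")
    >>> csc = CoresetSpectralClustering(num_clusters=2, coreset_ratio=0.5)
    >>> labels = csc.fit_predict(A)  # doctest: +SKIP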
"""
def __init__(
self,
num_clusters,
coreset_ratio=0.01,
k_over_sampling_factor=2.0,
shift=0.0,
kmeans_alg=None,
full_labels=True,
ignore_warnings=False,
):
if not isinstance(num_clusters, int):
raise TypeError("Number of clusters must be an integer")
if num_clusters <= 1:
raise ValueError("Number of clusters must be greater than 1")
if not isinstance(coreset_ratio, float):
raise TypeError("Coreset ratio must be a float")
if not (0.0 < coreset_ratio <= 1.0):
raise ValueError("Coreset ratio must be in the range (0, 1)")
if not isinstance(k_over_sampling_factor, float):
raise TypeError("k_over_sampling_factor must be a float")
if k_over_sampling_factor < 1.0:
raise ValueError("k_over_sampling_factor must be greater than 1.0")
if not isinstance(shift, float):
raise TypeError("shift must be a float")
if shift < 0.0:
raise ValueError("shift must be non-negative")
if full_labels not in [True, False]:
raise TypeError("full_labels must be a boolean")
if ignore_warnings not in [True, False]:
raise TypeError("ignore_warnings must be a boolean")
self.num_clusters = num_clusters
self.coreset_ratio = coreset_ratio
self.k_over_sampling_factor = k_over_sampling_factor
self.shift = shift
self.full_labels = full_labels
self.kmeans_alg = (
kmeans_alg
if kmeans_alg is not None
else KMeans(n_clusters=num_clusters, n_init=10)
)
self.ignore_warnings = ignore_warnings
self.n_ = None
self.data_ = None
self.indices_ = None
self.indptr_ = None
self.nnz_per_row_ = None
self.degrees_ = None
self.coreset_size_ = None
self.coreset_indices_ = None
self.coreset_labels_ = None
self.coreset_weights_ = None
self.coreset_graph = None
self.labels_ = None
self.closest_cluster_distance_ = None
def fit(self, adjacency_matrix, y=None):
"""
Fit the coreset clustering algorithm on the sparse adjacency matrix.
Parameters
----------
adjacency_matrix : scipy.sparse.csr_matrix, shape = (n_samples, n_samples)
The adjacency matrix of the graph. This must contain self loops
for each node.
y : Ignored
Not used, present here for API consistency by convention.
Returns
-------
self : object
Fitted estimator.
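
        Examples
        --------
        A sketch of the intended call, assuming ``A`` is a symmetric
        ``scipy.sparse.csr_matrix`` with a self loop on every node:

        >>> csc = CoresetSpectralClustering(num_clusters=10).fit(A)  # doctest: +SKIP
        >>> csc.coreset_labels_  # doctest: +SKIP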
"""
if not isinstance(adjacency_matrix, sparse.csr_matrix):
raise TypeError("Adjacency matrix must be a scipy.sparse.csr_matrix")
if adjacency_matrix.shape[0] != adjacency_matrix.shape[1]:
raise ValueError("Adjacency matrix must be square")
if self.num_clusters * self.k_over_sampling_factor > adjacency_matrix.shape[0]:
raise ValueError(
"Number of clusters times the oversampling factor must be less than the number of samples"
)
# check that self loops are present:
if not numpy.all(adjacency_matrix.diagonal() > 0):
raise ValueError(
"Adjacency matrix must contain self loop weights for each node. "
"To set all self loops to 1 (recommended), use adjacency_matrix.setdiag(1)"
)
self.n_ = adjacency_matrix.shape[0]
# Compute degree vector
self.degrees_ = adjacency_matrix.sum(axis=1).A1
# decompose the adjacency matrix into its components
self.data_, self.indices_, self.indptr_ = utils.convert_from_csr_matrix(
adjacency_matrix
)
self.nnz_per_row_ = numpy.diff(self.indptr_).astype(numpy.uint64)
if self.coreset_ratio == 1.0:
# If the coreset ratio is 1, we can skip the coreset stage
M, D = signless_laplacian_and_D_sparse(adjacency_matrix, self.degrees_)
self.labels_ = fast_spectral_cluster(
M, D, self.num_clusters, self.kmeans_alg
)
return self
rough_coreset_size = int(self.coreset_ratio * self.n_)
self.coreset_indices_, self.coreset_weights_, coreset_embeddings_ = (
coreset_sc.coreset_embeddings(
self.num_clusters,
self.n_,
rough_coreset_size,
self.k_over_sampling_factor,
self.data_,
self.indices_,
self.indptr_,
self.nnz_per_row_,
self.degrees_,
self.shift,
self.ignore_warnings,
)
)
self.coreset_size_ = self.coreset_indices_.shape[0]
# run kmeans on the coreset embeddings
self.kmeans_alg.fit(coreset_embeddings_)
self.coreset_labels_ = self.kmeans_alg.labels_.astype(numpy.uint64)
if self.full_labels:
self.label_full_graph()
return self
def get_coreset_graph(self, adjacency_matrix, y=None):
"""
Extract a coreset graph from the adjacency matrix.
Parameters
----------
adjacency_matrix : scipy.sparse.csr_matrix, shape = (n_samples, n_samples)
The adjacency matrix of the graph. This must contain self loops
for each node.
y : Ignored
Not used, present here for API consistency by convention.
Returns
-------
        coreset_graph : scipy.sparse.csr_matrix, shape = (coreset_size, coreset_size)
            The adjacency matrix of the coreset graph, also stored in
            ``self.coreset_graph``. If ``coreset_ratio`` is 1.0, the coreset
            stage is skipped, the full graph is clustered directly and ``self``
            is returned instead.
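
        Examples
        --------
        A sketch of the intended use, assuming ``A`` is a symmetric
        ``scipy.sparse.csr_matrix`` with a self loop on every node:

        >>> csc = CoresetSpectralClustering(num_clusters=10)
        >>> coreset_graph = csc.get_coreset_graph(A)  # doctest: +SKIP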
"""
if not isinstance(adjacency_matrix, sparse.csr_matrix):
raise TypeError("Adjacency matrix must be a scipy.sparse.csr_matrix")
if adjacency_matrix.shape[0] != adjacency_matrix.shape[1]:
raise ValueError("Adjacency matrix must be square")
if self.num_clusters * self.k_over_sampling_factor > adjacency_matrix.shape[0]:
raise ValueError(
"Number of clusters times the oversampling factor must be less than the number of samples"
)
# check that self loops are present:
if not numpy.all(adjacency_matrix.diagonal() > 0):
raise ValueError(
"Adjacency matrix must contain self loop weights for each node. "
"To set all self loops to 1 (recommended), use adjacency_matrix.setdiag(1)"
)
self.n_ = adjacency_matrix.shape[0]
# Compute degree vector
self.degrees_ = adjacency_matrix.sum(axis=1).A1
# decompose the adjacency matrix into its components
self.data_, self.indices_, self.indptr_ = utils.convert_from_csr_matrix(
adjacency_matrix
)
self.nnz_per_row_ = numpy.diff(self.indptr_).astype(numpy.uint64)
if self.coreset_ratio == 1.0:
# If the coreset ratio is 1, we can skip the coreset stage
M, D = signless_laplacian_and_D_sparse(adjacency_matrix, self.degrees_)
self.labels_ = fast_spectral_cluster(
M, D, self.num_clusters, self.kmeans_alg
)
return self
rough_coreset_size = int(self.coreset_ratio * self.n_)
(
self.coreset_indices_,
self.coreset_weights_,
            cg_indptr,
cg_indices,
cg_data,
cg_nnz,
) = coreset_sc.coreset_graph_no_embeddings(
self.num_clusters,
self.n_,
rough_coreset_size,
self.k_over_sampling_factor,
self.data_,
self.indices_,
self.indptr_,
self.nnz_per_row_,
self.degrees_,
self.shift,
self.ignore_warnings,
)
self.coreset_size_ = self.coreset_indices_.shape[0]
# Construct the coreset graph as a scipy.sparse.csr_matrix
num_coreset_nodes = self.coreset_indices_.shape[0]
self.coreset_graph = sparse.csr_matrix(
            (cg_data, cg_indices, cg_indptr),
shape=(num_coreset_nodes, num_coreset_nodes),
dtype=numpy.float64,
)
return self.coreset_graph
def set_coreset_graph_labels(self, labels):
"""
Allow the user to set the coreset graph labels manually.
Must have constructed self.coreset_graph with
get_coreset_graph() first.
Parameters
----------
labels : numpy.ndarray, shape = (num_coreset_nodes,)
Cluster assignments for the coreset graph.
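
        Examples
        --------
        A sketch of the intended workflow, assuming ``A`` is a valid adjacency
        matrix and ``my_labels`` holds cluster assignments produced for the
        coreset graph by any external method:

        >>> coreset_graph = csc.get_coreset_graph(A)  # doctest: +SKIP
        >>> csc.set_coreset_graph_labels(my_labels)   # doctest: +SKIP
        >>> csc.label_full_graph()                    # doctest: +SKIP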
"""
if self.coreset_graph is None:
raise ValueError(
"Coreset graph must be constructed first using get_coreset_graph()"
)
if labels.shape[0] != self.coreset_graph.shape[0]:
raise ValueError(
"Labels must have the same number of nodes as the coreset graph"
)
self.coreset_labels_ = labels.astype(numpy.uint64)
def compute_conductances(self):
"""
Compute the conductance of the labelled graph after fitting.
Returns
-------
conductances : numpy.ndarray, shape = (num_clusters,)
The conductance of each cluster
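
        Examples
        --------
        A sketch, assuming the estimator has already been fitted:

        >>> csc.compute_conductances()  # doctest: +SKIP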
"""
        if self.labels_ is None:
            raise ValueError("Labels must be computed before conductances")
# This also assumes all the other required attributes are set
return coreset_sc.compute_conductances(
self.num_clusters,
self.n_,
self.data_,
self.indices_,
self.indptr_,
self.nnz_per_row_,
self.degrees_,
self.labels_,
)
def label_full_graph(self):
"""
Label the full graph using the coreset labels.
        Does nothing if the coreset ratio is 1.0, since the full graph was
        already labelled directly during fitting.
Returns
-------
        labels : numpy.ndarray, shape = (n_samples,)
            Cluster assignments, also stored in ``labels_``.
"""
        if self.coreset_ratio == 1.0:
            # The full graph was already labelled directly during fit()
            return self.labels_
self.labels_, self.closest_cluster_distance_ = coreset_sc.label_full_graph(
self.num_clusters,
self.n_,
self.data_,
self.indices_,
self.indptr_,
self.nnz_per_row_,
self.degrees_,
self.coreset_indices_,
self.coreset_weights_,
self.coreset_labels_,
self.shift,
        )
        return self.labels_
def fit_predict(self, adjacency_matrix, y=None):
"""
Fit the coreset clustering algorithm on the sparse adjacency matrix
and return the cluster assignments.
Parameters
----------
adjacency_matrix : scipy.sparse.csr_matrix, shape = (n_samples, n_samples)
The adjacency matrix of the graph. This must contain self loops
for each node.
y : Ignored
Not used, present here for API consistency by convention.
Returns
-------
labels : numpy.ndarray, shape = (n_samples,)
Cluster assignments.
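
        Examples
        --------
        A sketch, assuming ``A`` is a symmetric ``scipy.sparse.csr_matrix``
        with a self loop on every node:

        >>> csc = CoresetSpectralClustering(num_clusters=10)
        >>> labels = csc.fit_predict(A)  # doctest: +SKIP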
"""
        self.fit(adjacency_matrix)
        if self.coreset_ratio != 1.0 and not self.full_labels:
            # fit() only produced coreset labels, so label the full graph now
            self.label_full_graph()
        return self.labels_