# This file is part of the Gudhi Library - https://gudhi.inria.fr/ - which is released under MIT.
# See file LICENSE or go to https://gudhi.inria.fr/licensing/ for full license details.
# Author(s): Mathieu Carrière
#
# Copyright (C) 2021 Inria
#
# Modification(s):
# - YYYY/MM Author: Description of the modification
import numpy as np
import itertools
import matplotlib
import matplotlib.pyplot as plt
from sklearn.base import BaseEstimator
from sklearn.cluster import DBSCAN, AgglomerativeClustering
from sklearn.metrics import pairwise_distances
from scipy.spatial.distance import directed_hausdorff
from sklearn import __version__ as sklearn_version
from sklearn.utils.fixes import parse_version
# scikit-learn 1.2 renamed AgglomerativeClustering's "affinity" parameter to "metric"
agglomerative_clustering_metric = parse_version(sklearn_version) >= parse_version('1.2.0')
from . import SimplexTree, CoverComplex
def _save_to_html(dat, lens, color, param, points, edges, html_output_filename):
from ._kepler_mapper import KeplerMapper
network = {}
mapper = KeplerMapper(verbose=0)
data = np.zeros((3,3))
# dummy fit_transform call: its result is unused, it only initializes the KeplerMapper instance
projected_data = mapper.fit_transform( data, projection="sum", scaler=None )
from collections import defaultdict
nodes = defaultdict(list)
links = defaultdict(list)
custom = defaultdict(list)
for point in points:
nodes[ str(int(point[0])) ] = [ int(point[0]), point[1], int(point[2]) ]
links[ str(int(point[0])) ] = []
custom[ int(point[0]) ] = point[1]
for edge in edges:
links[ str(edge[0]) ].append( str(edge[1]) )
links[ str(edge[1]) ].append( str(edge[0]) )
custom_val = custom.values()
m = min(custom_val)
M = max(custom_val)
network["nodes"] = nodes
network["links"] = links
network["meta"] = lens
mapper.visualize(network, color_function=color, path_html=html_output_filename, title=dat,
graph_link_distance=30, graph_gravity=0.1, graph_charge=-120, custom_tooltips=custom, width_html=0,
height_html=0, show_tooltips=True, show_title=True, show_meta=True, res=param[0], gain=param[1], minimum=m, maximum=M)
message = repr(html_output_filename) + " is generated. You can now use your favorite web browser to visualize it."
print(message)
class CoverComplexPy(BaseEstimator):
"""
This is the base class for MapperComplex, GraphInducedComplex and NerveComplex.
Attributes:
simplex_tree_ (gudhi SimplexTree): simplicial complex representing the cover complex computed after calling the fit() method.
node_info_ (dictionary): various information associated to the nodes of the cover complex.
"""
def __init__(self, verbose=False):
"""
Constructor for the CoverComplexPy class.
Parameters
----------
"""
self.verbose = verbose
def get_networkx(self, set_attributes_from_colors=False):
"""
Turn the 1-skeleton of the cover complex computed after calling the fit() method into a networkx graph.
This function requires networkx (https://networkx.org/documentation/stable/install.html).
Parameters
----------
set_attributes_from_colors : bool
if True, the color functions will be used as attributes for the networkx graph.
Returns
-------
G : networkx graph
graph representing the 1-skeleton of the cover complex.
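Examples
--------
A minimal usage sketch, assuming `mc` is an already fitted cover complex (hypothetical variable)::

    import networkx as nx
    G = mc.get_networkx(set_attributes_from_colors=True)
    print(nx.number_connected_components(G))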
"""
import networkx as nx
st = self.simplex_tree_
G = nx.Graph()
for (splx,_) in st.get_skeleton(1):
if len(splx) == 1:
G.add_node(splx[0])
if len(splx) == 2:
G.add_edge(splx[0], splx[1])
if set_attributes_from_colors:
attrs = {k: {"attr_name": self.node_info_[k]["colors"]} for k in G.nodes()}
nx.set_node_attributes(G, attrs)
return G
def save_to_dot(self, file_name="cover_complex", color_name="color", eps_color=.1, eps_size=.1):
"""
Write the 0-skeleton of the cover complex in a DOT file called "{file_name}.dot", which can be processed with, e.g., neato. The vertices of the cover complex are colored with the first color function, i.e., the first column of self.colors. This function also produces an extra PDF file "colorbar_{color_name}.pdf" containing a colorbar corresponding to the node colors in the DOT file.
Parameters
----------
file_name : string
name for the output .dot file, default "cover_complex"
color_name : string
name for the output .pdf showing the colorbar of the color used for the Mapper nodes, default "color"
eps_color : float
scale the node colors between [eps_color, 1-eps_color]. Should be between 0 and 1/2. When close to 0, the color varies a lot across the nodes; when close to 1/2, the colors tend to be more uniform.
eps_size : float
scale the node sizes between [eps_size, 1-eps_size]. Should be between 0 and 1/2. When close to 0, the size varies a lot across the nodes; when close to 1/2, the nodes tend to have the same size.
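Examples
--------
A minimal usage sketch, assuming `mc` is an already fitted cover complex (file and color names are illustrative)::

    mc.save_to_dot(file_name="my_complex", color_name="height")
    # render with, e.g.: neato -Tpdf my_complex.dot -o my_complex.pdf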
"""
st = self.simplex_tree_
node_info_ = self.node_info_
maxv, minv = max(node_info_[k]["colors"][0] for k in node_info_.keys()), min(node_info_[k]["colors"][0] for k in node_info_.keys())
maxs, mins = max(node_info_[k]["size"] for k in node_info_.keys()), min(node_info_[k]["size"] for k in node_info_.keys())
if not file_name.lower().endswith(".dot"):
file_name += ".dot"
with open(file_name, "w") as f:
f.write("graph MAP{")
cols = []
for (simplex,_) in st.get_skeleton(0):
cnode = (1.-2*eps_color) * (node_info_[simplex[0]]["colors"][0] - minv)/(maxv-minv) + eps_color if maxv != minv else 0
snode = (1.-2*eps_size) * (node_info_[simplex[0]]["size"]-mins)/(maxs-mins) + eps_size if maxs != mins else 1
f.write( str(simplex[0]) + "[shape=circle width=" + str(snode) + " fontcolor=black color=black label=\"" + "\" style=filled fillcolor=\"" + str(cnode) + ", 1, 1\"]")
cols.append(cnode)
for (simplex,_) in st.get_simplices():
if len(simplex) == 2:
f.write(" " + str(simplex[0]) + " -- " + str(simplex[1]) + " [weight=15];")
f.write("}")
L = np.linspace(eps_color, 1.-eps_color, 100)
colsrgb = []
import colorsys
for c in L:
colsrgb.append(colorsys.hsv_to_rgb(c,1,1))
fig, ax = plt.subplots(figsize=(6, 1))
fig.subplots_adjust(bottom=0.5)
my_cmap = matplotlib.colors.ListedColormap(colsrgb, name=color_name)
cb = matplotlib.colorbar.ColorbarBase(ax, cmap=my_cmap, norm=matplotlib.colors.Normalize(vmin=minv, vmax=maxv), orientation="horizontal")
cb.set_label(color_name)
fig.savefig("colorbar_" + color_name + ".pdf", format="pdf")
plt.close()
def save_to_txt(self, file_name="cover_complex", data_name="data", cover_name="cover", color_name="color"):
"""
Write the cover complex to a TXT file called "{file_name}.txt", that can be processed with the KeplerMapper Python script "KeplerMapperVisuFromTxtFile.py" available under "src/Nerve_GIC/utilities/".
Parameters
----------
file_name : string
name for the output .txt file, default "cover_complex"
data_name : string
name to use for the data on which the cover complex was computed, default "data". It will be used when generating an HTML visualization with KeplerMapperVisuFromTxtFile.py.
cover_name : string
name to use for the cover used to compute the cover complex, default "cover". It will be used when generating an HTML visualization with KeplerMapperVisuFromTxtFile.py.
color_name : string
name to use for the color used to color the cover complex nodes, default "color". It will be used when generating an HTML visualization with KeplerMapperVisuFromTxtFile.py.
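Examples
--------
A minimal usage sketch, assuming `mc` is a fitted MapperComplex (names are illustrative)::

    mc.save_to_txt(file_name="my_complex", data_name="my_data")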
"""
st = self.simplex_tree_
if not file_name.lower().endswith(".txt"):
file_name += ".txt"
with open(file_name, "w") as f:
f.write(data_name + "\n")
f.write(cover_name + "\n")
f.write(color_name + "\n")
f.write(str(self.resolutions[0]) + " " + str(self.gains[0]) + "\n")
f.write(str(st.num_vertices()) + " " + str(len(list(st.get_skeleton(1)))-st.num_vertices()) + "\n")
name2id = {}
idv = 0
for s,_ in st.get_skeleton(0):
f.write(str(idv) + " " + str(self.node_info_[s[0]]["colors"][0]) + " " + str(self.node_info_[s[0]]["size"]) + "\n")
name2id[s[0]] = idv
idv += 1
for s,_ in st.get_skeleton(1):
if len(s) == 2:
f.write(str(name2id[s[0]]) + " " + str(name2id[s[1]]) + "\n")
def save_to_html(self, file_name="cover_complex", data_name="data", cover_name="cover", color_name="color"):
"""
Write the cover complex to an HTML file called "{file_name}.html", which can be visualized in a browser. This function is based on a fork of https://github.com/MLWave/kepler-mapper.
Parameters
----------
file_name : string
name for the output .html file, default "cover_complex"
data_name : string
name to use for the data on which the cover complex was computed, default "data".
cover_name : string
name to use for the cover used to compute the cover complex, default "cover".
color_name : string
name to use for the color used to color the cover complex nodes, default "color".
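Examples
--------
A minimal usage sketch, assuming `mc` is a fitted MapperComplex::

    mc.save_to_html(file_name="my_complex")
    # then open my_complex.html in a web browser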
"""
st = self.simplex_tree_
if not file_name.lower().endswith(".html"):
file_name += ".html"
points, edges, name2id, idv = [], [], {}, 0
for s,_ in st.get_skeleton(0):
points.append([ idv, self.node_info_[s[0]]["colors"][0], self.node_info_[s[0]]["size"] ])
name2id[s[0]] = idv
idv += 1
for s,_ in st.get_skeleton(1):
if len(s) == 2:
edges.append([ name2id[s[0]] , name2id[s[1]] ])
_save_to_html(data_name, cover_name, color_name, [self.resolutions[0], self.gains[0]], points, edges, file_name)
class _constant_clustering():
# fit_predict takes the data as its only argument (no self): this helper is meant
# to be passed as the class itself, without instantiation.
def fit_predict(X):
return np.zeros([len(X)], dtype=np.int32)
class MapperComplex(CoverComplexPy):
"""
This is a class for computing Mapper simplicial complexes on point clouds or distance matrices.
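Examples
--------
A minimal usage sketch on a synthetic point cloud (data and parameter values are illustrative)::

    import numpy as np
    from gudhi.cover_complex import MapperComplex
    X = np.random.rand(100, 3)
    mc = MapperComplex(input_type="point cloud", resolutions=[5], gains=[0.3])
    mc.fit(X, filters=X[:, 0:1], colors=X[:, 0:1])
    print(mc.simplex_tree_.num_vertices())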
"""
def __init__(self, *, input_type="point cloud", min_points_per_node=0, filter_bnds=None, resolutions=None, gains=None, clustering=DBSCAN(), N=100, beta=0., C=10., verbose=False):
"""
Constructor for the MapperComplex class.
Parameters
----------
input_type : string
type of input data. Either "point cloud" or "distance matrix".
min_points_per_node : int
threshold on the size of the cover complex nodes (default 0). Any node associated to a subpopulation with less than **min_points_per_node** points will be removed.
filter_bnds : list of lists or numpy array of shape (num_filters) x 2)
limits of each filter, of the form [[f_1^min, f_1^max], ..., [f_n^min, f_n^max]]. If one of the values is numpy.nan, it can be computed from the dataset with the fit() method.
resolutions : list or numpy array of shape num_filters containing integers
resolution of each filter function, i.e., the number of intervals required to cover each filter image. If None, it is estimated from the data.
gains : list or numpy array of shape num_filters containing doubles in [0,1]
gain of each filter function, i.e., the overlap percentage of the intervals covering each filter image. If None, it is set to 1/3 for all filters: in the automatic parameter selection method of http://www.jmlr.org/papers/volume19/17-291/17-291.pdf, any value between 1/3 and 1/2 works, so we use the minimal one (which ensures that the complex is a graph when a single filter is used).
clustering : class
clustering class (default sklearn.cluster.DBSCAN()). Common clustering classes can be found in the scikit-learn library (e.g., AgglomerativeClustering). If None, it is set to hierarchical clustering, with scale estimated from data.
N : int
subsampling iterations (default 100) for estimating scale and resolutions. Used only if clustering or resolutions = None. See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
beta : float
exponent parameter (default 0.) for estimating scale and resolutions. Used only if clustering or resolutions = None. See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
C : float
constant parameter (default 10.) for estimating scale and resolutions. Used only if clustering or resolutions = None. See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
verbose : bool
whether to display info while computing.
"""
self.filter_bnds, self.resolutions, self.gains, self.clustering = filter_bnds, resolutions, gains, clustering
self.input_type, self.min_points_per_node, self.N, self.beta, self.C = input_type, min_points_per_node, N, beta, C
CoverComplexPy.__init__(self, verbose)
def estimate_scale(self, X, N=100, beta=0., C=10.):
"""
Compute estimated scale of a point cloud or a distance matrix.
Parameters
----------
X : numpy array of shape (num_points) x (num_coordinates) if point cloud and (num_points) x (num_points) if distance matrix
input point cloud or distance matrix.
N : int
subsampling iterations (default 100). See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
beta : float
exponent parameter (default 0.). See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
C : float
constant parameter (default 10.). See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
Returns
-------
delta : float
estimated scale that can be used with, e.g., agglomerative clustering.
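Examples
--------
A minimal usage sketch on synthetic data (illustrative values)::

    import numpy as np
    X = np.random.rand(100, 2)
    delta = MapperComplex(input_type="point cloud").estimate_scale(X, N=50)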
"""
num_pts = X.shape[0]
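# m is the subsample size prescribed by the heuristic of the JMLR paper referenced in the docstring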
delta, m = 0., int( num_pts / np.exp((1+beta) * np.log(np.log(num_pts)/np.log(C))) )
for _ in range(N):
subpop = np.random.choice(num_pts, size=m, replace=False)
if self.input_type == "point cloud":
d, _, _ = directed_hausdorff(X, X[subpop,:])
if self.input_type == "distance matrix":
d = np.max(np.min(X[:,subpop], axis=1), axis=0)
delta += d/N
return delta
def get_optimal_parameters_for_agglomerative_clustering(self, X, beta=0., C=10., N=100):
"""
Compute optimal scale and resolutions for a point cloud or a distance matrix.
Parameters
----------
X : numpy array of shape (num_points) x (num_coordinates) if point cloud and (num_points) x (num_points) if distance matrix
input point cloud or distance matrix.
beta : float
exponent parameter (default 0.). See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
C : float
constant parameter (default 10.). See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
N : int
subsampling iterations (default 100). See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
Returns
-------
delta : float
optimal scale that can be used with agglomerative clustering.
resolutions : numpy array of shape (num_filters)
optimal resolutions associated to each filter.
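Examples
--------
A minimal usage sketch; note that this method reads `self.filters`, which is normally set by fit(), so it must be set by hand when calling the method on its own (`X` is a hypothetical point cloud)::

    mc = MapperComplex(input_type="point cloud")
    mc.filters = X[:, 0:1]
    delta, resolutions = mc.get_optimal_parameters_for_agglomerative_clustering(X)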
"""
num_filt = self.filters.shape[1]
delta = self.estimate_scale(X=X, N=N, C=C, beta=beta)
pairwise = pairwise_distances(X, metric="euclidean") if self.input_type == "point cloud" else X
pairs = np.argwhere(pairwise <= delta)
num_pairs = pairs.shape[0]
res = []
for f in range(num_filt):
F = self.filters[:,f]
resf = 0
for p in range(num_pairs):
resf = max(resf, abs(F[pairs[p,0]] - F[pairs[p,1]]))
res.append(resf)
return delta, np.array(res)
def fit(self, X, y=None, filters=None, colors=None):
"""
Fit the MapperComplex class on a point cloud or a distance matrix: compute the Mapper complex and store it in a simplex tree called `simplex_tree_`.
Parameters
----------
X : numpy array of shape (num_points) x (num_coordinates) if point cloud and (num_points) x (num_points) if distance matrix
input point cloud or distance matrix.
y : n x 1 array
point labels (unused).
filters : list of lists or numpy array of shape (num_points) x (num_filters)
filter functions (sometimes called lenses) used to compute the cover. Each column of the numpy array defines a scalar function defined on the input points.
colors : list of lists or numpy array of shape (num_points) x (num_colors)
functions used to color the nodes of the cover complex. More specifically, coloring is done by computing the means of these functions on the subpopulations corresponding to each node. If None, first coordinate is used if input is point cloud, and eccentricity is used if input is distance matrix.
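Examples
--------
A minimal usage sketch with a distance matrix (illustrative values; `metric="precomputed"` assumes scikit-learn >= 1.2, older versions use `affinity` instead)::

    import numpy as np
    from sklearn.cluster import AgglomerativeClustering
    from sklearn.metrics import pairwise_distances
    D = pairwise_distances(np.random.rand(50, 2))
    clus = AgglomerativeClustering(n_clusters=None, linkage="single", distance_threshold=0.3, metric="precomputed")
    mc = MapperComplex(input_type="distance matrix", resolutions=[4], gains=[0.3], clustering=clus)
    mc.fit(D)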
"""
if self.resolutions is not None:
self.resolutions = np.array(self.resolutions)
if len(self.resolutions.shape) == 0:
self.resolutions = np.broadcast_to(self.resolutions, 1)
if self.gains is not None:
self.gains = np.array(self.gains)
if len(self.gains.shape) == 0:
self.gains = np.broadcast_to(self.gains, 1)
if self.filter_bnds is not None:
self.filter_bnds = np.array(self.filter_bnds)
self.filters, self.colors = filters, colors
if self.filters is None:
if self.input_type == "point cloud":
self.filters = X[:,0:1]
elif self.input_type == "distance matrix":
self.filters = X.max(axis=0)[:,None]
else:
if not isinstance(self.filters, np.ndarray):
self.filters = np.array(self.filters).T
if self.colors is None:
if self.input_type == "point cloud":
self.colors = X[:,0:1]
elif self.input_type == "distance matrix":
self.colors = X.max(axis=0)[:,None]
else:
if not isinstance(self.colors, np.ndarray):
self.colors = np.array(self.colors).T
if len(self.filters.shape) == 1: # if self.filters is a 1D filter, convert it to an array of shape [n,1]
self.filters = np.reshape(self.filters, [len(X),1])
if len(self.colors.shape) == 1: # if self.colors is a 1D filter, convert it to an array of shape [n,1]
self.colors = np.reshape(self.colors, [len(X),1])
num_pts, num_filters = self.filters.shape[0], self.filters.shape[1]
# If some filter limits are unspecified, automatically compute them
if self.filter_bnds is None:
self.filter_bnds = np.hstack([np.min(self.filters, axis=0)[:,np.newaxis], np.max(self.filters, axis=0)[:,np.newaxis]])
# If some resolutions are not specified, automatically compute them
if self.gains is None:
self.gains = np.broadcast_to(1/3, num_filters)
if self.resolutions is None or self.clustering is None:
delta, resolutions = self.get_optimal_parameters_for_agglomerative_clustering(X=X, beta=self.beta, C=self.C, N=self.N)
if self.clustering is None:
self.clustering = AgglomerativeClustering(n_clusters=None, linkage="single", distance_threshold=delta, **{
"metric" if agglomerative_clustering_metric else "affinity":
"euclidean" if self.input_type == "point cloud" else "precomputed"
})
if self.resolutions is None:
self.resolutions = np.multiply(resolutions, 1./self.gains)
self.resolutions = np.array([int( (self.filter_bnds[ir,1]-self.filter_bnds[ir,0])/r) for ir, r in enumerate(self.resolutions)])
# Initialize attributes
self.simplex_tree_, self.node_info_ = SimplexTree(), {}
if np.all(self.gains < .5):
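# All gains are < 1/2: every point belongs to at most two consecutive intervals per filter, so patch membership can be computed directly with np.digitize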
# Compute which points fall in which patch or patch intersections
interval_inds, intersec_inds = np.empty(self.filters.shape), np.empty(self.filters.shape)
for i in range(num_filters):
f, r, g = self.filters[:,i], self.resolutions[i], self.gains[i]
min_f, max_f = self.filter_bnds[i,0], np.nextafter(self.filter_bnds[i,1], np.inf)
interval_endpoints, l = np.linspace(min_f, max_f, num=r+1, retstep=True)
intersec_endpoints = []
for j in range(1, len(interval_endpoints)-1):
intersec_endpoints.append(interval_endpoints[j] - g*l / (2 - 2*g))
intersec_endpoints.append(interval_endpoints[j] + g*l / (2 - 2*g))
interval_inds[:,i] = np.digitize(f, interval_endpoints)
intersec_inds[:,i] = 0.5 * (np.digitize(f, intersec_endpoints) + 1)
# Build the binned_data map that takes a patch or a patch intersection and outputs the indices of the points contained in it
binned_data = {}
for i in range(num_pts):
list_preimage = []
for j in range(num_filters):
a, b = interval_inds[i,j], intersec_inds[i,j]
list_preimage.append([a])
if b == a:
list_preimage[j].append(a+1)
if b == a-1:
list_preimage[j].append(a-1)
list_preimage = list(itertools.product(*list_preimage))
for pre_idx in list_preimage:
try:
binned_data[pre_idx].append(i)
except KeyError:
binned_data[pre_idx] = [i]
else:
# Compute interval endpoints for each filter
l_int, r_int = [], []
for i in range(num_filters):
L, R = [], []
f, r, g = self.filters[:,i], self.resolutions[i], self.gains[i]
min_f, max_f = self.filter_bnds[i,0], np.nextafter(self.filter_bnds[i,1], np.inf)
interval_endpoints, l = np.linspace(min_f, max_f, num=r+1, retstep=True)
for j in range(len(interval_endpoints)-1):
L.append(interval_endpoints[j] - g*l / (2 - 2*g))
R.append(interval_endpoints[j+1] + g*l / (2 - 2*g))
l_int.append(L)
r_int.append(R)
# Build the binned_data map that takes a patch or a patch intersection and outputs the indices of the points contained in it
binned_data = {}
for i in range(num_pts):
list_preimage = []
for j in range(num_filters):
fval = self.filters[i,j]
start, end = int(min(np.argwhere(np.array(r_int[j]) >= fval))), int(max(np.argwhere(np.array(l_int[j]) <= fval)))
list_preimage.append(list(range(start, end+1)))
list_preimage = list(itertools.product(*list_preimage))
for pre_idx in list_preimage:
try:
binned_data[pre_idx].append(i)
except KeyError:
binned_data[pre_idx] = [i]
# Initialize the cover map, that takes a point and outputs the clusters to which it belongs
cover, clus_base = [[] for _ in range(num_pts)], 0
# For each patch
for preimage in binned_data:
# Apply clustering on the corresponding subpopulation
idxs = np.array(binned_data[preimage])
if len(idxs) > 1:
clusters = self.clustering.fit_predict(X[idxs,:]) if self.input_type == "point cloud" else self.clustering.fit_predict(X[idxs,:][:,idxs])
elif len(idxs) == 1:
clusters = np.array([0])
else:
continue
# Collect various information on each cluster
num_clus_pre = np.max(clusters) + 1
for clus_i in range(num_clus_pre):
node_name = clus_base + clus_i
subpopulation = idxs[clusters == clus_i]
self.node_info_[node_name] = {}
self.node_info_[node_name]["indices"] = subpopulation
self.node_info_[node_name]["size"] = len(subpopulation)
self.node_info_[node_name]["colors"] = np.mean(self.colors[subpopulation,:], axis=0)
self.node_info_[node_name]["patch"] = preimage
# Update the cover map
for pt in range(clusters.shape[0]):
node_name = clus_base + clusters[pt]
if clusters[pt] != -1 and self.node_info_[node_name]["size"] >= self.min_points_per_node:
cover[idxs[pt]].append(node_name)
clus_base += np.max(clusters) + 1
# Insert the simplices of the Mapper complex
for i in range(num_pts):
self.simplex_tree_.insert(cover[i])
return self
class GraphInducedComplex(CoverComplexPy):
"""
This is a class for computing graph induced simplicial complexes on point clouds or distance matrices.
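Example (a minimal usage sketch on a synthetic point cloud, with illustrative values)::

    import numpy as np
    from gudhi.cover_complex import GraphInducedComplex
    X = np.random.rand(100, 3)
    gic = GraphInducedComplex(input_type="point cloud", cover="functional", rips_threshold=0.3)
    gic.fit(X, filter=X[:, 0], color=X[:, 0])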
"""
def __init__(self, *, input_type="point cloud", cover="functional", min_points_per_node=0,
voronoi_samples=100, assignments=None, filter_bnds=None, resolution=None, gain=None, N=100, beta=0., C=10.,
graph="rips", rips_threshold=None, verbose=False):
"""
Constructor for the GraphInducedComplex class.
Parameters:
input_type (string): type of input data. Either "point cloud" or "distance matrix".
cover (string): specifies the cover. Either "functional" (preimages of filter function), "voronoi" or "precomputed".
min_points_per_node (int): threshold on the size of the cover complex nodes (default 0). Any node associated to a subpopulation with less than **min_points_per_node** points will be removed.
voronoi_samples (int): number of Voronoi germs used for partitioning the input dataset. Used only if cover = "voronoi".
assignments (list of length (num_points) of lists of integers): cover assignment for each point. Used only if cover = "precomputed".
filter_bnds (list or numpy array of shape 2): limits of the filter function, of the form [f^min, f^max]. If one of the values is numpy.nan, it can be computed from the dataset with the fit() method. Used only if cover = "functional".
resolution (int): resolution of the filter function, i.e., the number of intervals required to cover the filter image. Used only if cover = "functional". If None, it is estimated from data.
gain (double in [0,1]): gain of the filter function, i.e., the overlap percentage of the intervals covering the filter image. Used only if cover = "functional".
N (int): subsampling iterations (default 100) for estimating scale and resolutions. Used only if cover = "functional". See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
beta (double): exponent parameter (default 0.) for estimating scale and resolutions. Used only if cover = "functional". See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
C (double): constant parameter (default 10.) for estimating scale and resolutions. Used only if cover = "functional". See http://www.jmlr.org/papers/volume19/17-291/17-291.pdf for details.
graph (string): type of graph to use for GIC. Currently accepts "rips" only.
rips_threshold (float): Rips parameter. Used only if graph = "rips".
verbose (bool): whether to display info while computing.
"""
self.input_type, self.cover, self.min_points_per_node = input_type, cover, min_points_per_node
self.voronoi_samples, self.assignments, self.filter_bnds, self.resolution, self.gain = voronoi_samples, assignments, filter_bnds, resolution, gain
self.graph, self.rips_threshold, self.N, self.beta, self.C = graph, rips_threshold, N, beta, C
CoverComplexPy.__init__(self, verbose)
def fit(self, X, y=None, filter=None, color=None):
"""
Fit the GraphInducedComplex class on a point cloud or a distance matrix: compute the graph induced complex and store it in a simplex tree called `simplex_tree_`.
Parameters:
X (numpy array of shape (num_points) x (num_coordinates) if point cloud and (num_points) x (num_points) if distance matrix): input point cloud or distance matrix.
y (n x 1 array): point labels (unused).
filter (list or numpy array of shape (num_points)): filter function (sometimes called lens) used to compute the cover. Used only if cover = "functional".
color (list or numpy array of shape (num_points)): function used to color the nodes of the cover complex. More specifically, coloring is done by computing the means of this function on the subpopulations corresponding to each node. If None, first coordinate is used if input is point cloud, and eccentricity is used if input is distance matrix.
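Example (a minimal usage sketch with a Voronoi cover, illustrative values; `X` is a hypothetical point cloud)::

    gic = GraphInducedComplex(input_type="point cloud", cover="voronoi", voronoi_samples=20, rips_threshold=0.3)
    gic.fit(X)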
"""
self.data, self.filter, self.color = X, filter, color
self.complex = CoverComplex()
self.complex.set_type("GIC")
self.complex.set_verbose(self.verbose)
if self.input_type == "point cloud":
self.complex.set_point_cloud_from_range(X)
elif self.input_type == "distance matrix":
self.complex.set_distances_from_range(X)
# Set vertex color
if self.color is None:
if self.input_type == "point cloud":
self.color = X[:,0]
elif self.input_type == "distance matrix":
self.color = X.max(axis=0)
else:
self.color = np.array(self.color)
self.complex.set_color_from_range(self.color)
# Set underlying graph
if self.graph == "rips":
if self.rips_threshold is not None:
self.complex.set_graph_from_rips(self.rips_threshold)
else:
self.complex.set_subsampling(self.C, self.beta)
self.complex.set_graph_from_automatic_rips(self.N)
# Set vertex cover
if self.cover == "voronoi":
self.complex.set_cover_from_Voronoi(self.voronoi_samples)
elif self.cover == "functional":
if self.filter is None:
if self.input_type == "point cloud":
self.filter = X[:,0]
elif self.input_type == "distance matrix":
self.filter = X.max(axis=0)
else:
self.filter = np.array(self.filter)
self.complex.set_function_from_range(self.filter)
if self.resolution is None:
self.complex.set_automatic_resolution()
else:
self.complex.set_resolution_with_interval_number(self.resolution)
if self.gain is None:
self.complex.set_gain(.33)
else:
self.complex.set_gain(self.gain)
self.complex.set_cover_from_function()
elif self.cover == "precomputed":
self.complex.set_cover_from_range(self.assignments)
# Compute simplex tree
self.complex.set_mask(self.min_points_per_node)
self.complex.find_simplices()
simplex_tree_ = self.complex.create_simplex_tree()
# Normalize vertex names of simplex tree
self.simplex_tree_ = SimplexTree()
idv, names = 0, {}
for v,_ in simplex_tree_.get_skeleton(0):
if len(self.complex.subpopulation(v[0])) > self.min_points_per_node:
names[v[0]] = idv
self.simplex_tree_.insert([idv])
idv += 1
for s,_ in simplex_tree_.get_simplices():
if len(s) >= 2 and np.all([len(self.complex.subpopulation(v)) > self.min_points_per_node for v in s]):
self.simplex_tree_.insert([names[v] for v in s])
# Store vertex info
self.node_info_ = {}
for v,_ in simplex_tree_.get_skeleton(0):
if len(self.complex.subpopulation(v[0])) > self.min_points_per_node:
node = names[v[0]]
pop = self.complex.subpopulation(v[0])
self.node_info_[node] = {"indices": pop, "size": len(pop), "colors": [self.complex.subcolor(v[0])]}
return self
class NerveComplex(CoverComplexPy):
"""
This is a class for computing nerve simplicial complexes on point clouds or distance matrices.
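Example (a minimal usage sketch with a hand-made cover assignment that splits the points into three slabs along the first coordinate, illustrative)::

    import numpy as np
    from gudhi.cover_complex import NerveComplex
    X = np.random.rand(100, 2)
    assignments = [[min(int(3 * x[0]), 2)] for x in X]
    nc = NerveComplex(input_type="point cloud")
    nc.fit(X, assignments=assignments, color=X[:, 0])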
"""
def __init__(self, *, input_type="point cloud", min_points_per_node=0, verbose=False):
"""
Constructor for the NerveComplex class.
Parameters:
input_type (string): type of input data. Either "point cloud" or "distance matrix".
min_points_per_node (int): threshold on the size of the cover complex nodes (default 0). Any node associated to a subpopulation with less than **min_points_per_node** points will be removed.
verbose (bool): whether to display info while computing.
"""
self.input_type, self.min_points_per_node = input_type, min_points_per_node
CoverComplexPy.__init__(self, verbose)
def fit(self, X, y=None, assignments=None, color=None):
"""
Fit the NerveComplex class on a point cloud or a distance matrix: compute the nerve complex and store it in a simplex tree called `simplex_tree_`.
Parameters:
X (numpy array of shape (num_points) x (num_coordinates) if point cloud and (num_points) x (num_points) if distance matrix): input point cloud or distance matrix.
y (n x 1 array): point labels (unused).
assignments (list of length (num_points) of lists of integers): cover assignment for each point.
color (numpy array of shape (num_points) x (num_colors)): functions used to color the nodes of the cover complex. More specifically, coloring is done by computing the means of these functions on the subpopulations corresponding to each node. If None, first coordinate is used if input is point cloud, and eccentricity is used if input is distance matrix.
"""
self.data, self.color = X, color
self.assignments = assignments
self.complex = CoverComplex()
self.complex.set_type("Nerve")
self.complex.set_verbose(self.verbose)
if self.input_type == "point cloud":
self.complex.set_point_cloud_from_range(X)
elif self.input_type == "distance matrix":
self.complex.set_distances_from_range(X)
# Set vertex color
if self.color is None:
if self.input_type == "point cloud":
self.color = X[:,0]
elif self.input_type == "distance matrix":
self.color = X.max(axis=0)
else:
self.color = np.array(self.color)
self.complex.set_color_from_range(self.color)
# Set vertex cover
self.complex.set_cover_from_range(self.assignments)
# Compute simplex tree
self.complex.set_mask(self.min_points_per_node)
self.complex.find_simplices()
simplex_tree_ = self.complex.create_simplex_tree()
# Normalize vertex names of simplex tree
self.simplex_tree_ = SimplexTree()
idv, names = 0, {}
for v,_ in simplex_tree_.get_skeleton(0):
if len(self.complex.subpopulation(v[0])) > self.min_points_per_node:
names[v[0]] = idv
self.simplex_tree_.insert([idv])
idv += 1
for s,_ in simplex_tree_.get_simplices():
if len(s) >= 2 and np.all([len(self.complex.subpopulation(v)) > self.min_points_per_node for v in s]):
self.simplex_tree_.insert([names[v] for v in s])
# Store vertex info
self.node_info_ = {}
for v,_ in simplex_tree_.get_skeleton(0):
if len(self.complex.subpopulation(v[0])) > self.min_points_per_node:
node = names[v[0]]
pop = self.complex.subpopulation(v[0])
self.node_info_[node] = {"indices": pop, "size": len(pop), "colors": [self.complex.subcolor(v[0])]}
return self