Source code for gudhi.representations.metrics

# This file is part of the Gudhi Library - https://gudhi.inria.fr/ - which is released under MIT.
# See file LICENSE or go to https://gudhi.inria.fr/licensing/ for full license details.
# Author(s):       Mathieu Carrière
#
# Copyright (C) 2018-2019 Inria
#
# Modification(s):
#   - YYYY/MM Author: Description of the modification

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import pairwise_distances
try:
    from .. import bottleneck_distance
    USE_GUDHI = True
except ImportError:
    USE_GUDHI = False
    print("Gudhi built without CGAL: BottleneckDistance will return a null matrix")

#############################################
# Metrics ###################################
#############################################

[docs]class SlicedWassersteinDistance(BaseEstimator, TransformerMixin): """ This is a class for computing the sliced Wasserstein distance matrix from a list of persistence diagrams. The Sliced Wasserstein distance is computed by projecting the persistence diagrams onto lines, comparing the projections with the 1-norm, and finally integrating over all possible lines. See http://proceedings.mlr.press/v70/carriere17a.html for more details. """
[docs] def __init__(self, num_directions=10): """ Constructor for the SlicedWassersteinDistance class. Parameters: num_directions (int): number of lines evenly sampled from [-pi/2,pi/2] in order to approximate and speed up the distance computation (default 10). """ self.num_directions = num_directions thetas = np.linspace(-np.pi/2, np.pi/2, num=self.num_directions+1)[np.newaxis,:-1] self.lines_ = np.concatenate([np.cos(thetas), np.sin(thetas)], axis=0)
[docs] def fit(self, X, y=None): """ Fit the SlicedWassersteinDistance class on a list of persistence diagrams: persistence diagrams are projected onto the different lines. The diagrams themselves and their projections are then stored in numpy arrays, called **diagrams_** and **approx_diag_**. Parameters: X (list of n x 2 numpy arrays): input persistence diagrams. y (n x 1 array): persistence diagram labels (unused). """ self.diagrams_ = X self.approx_ = [np.matmul(X[i], self.lines_) for i in range(len(X))] diag_proj = (1./2) * np.ones((2,2)) self.approx_diag_ = [np.matmul(np.matmul(X[i], diag_proj), self.lines_) for i in range(len(X))] return self
[docs] def transform(self, X): """ Compute all sliced Wasserstein distances between the persistence diagrams that were stored after calling the fit() method, and a given list of (possibly different) persistence diagrams. Parameters: X (list of n x 2 numpy arrays): input persistence diagrams. Returns: numpy array of shape (number of diagrams in **diagrams**) x (number of diagrams in X): matrix of pairwise sliced Wasserstein distances. """ Xfit = np.zeros((len(X), len(self.approx_))) if len(self.diagrams_) == len(X) and np.all([np.array_equal(self.diagrams_[i], X[i]) for i in range(len(X))]): for i in range(len(self.approx_)): for j in range(i+1, len(self.approx_)): A = np.sort(np.concatenate([self.approx_[i], self.approx_diag_[j]], axis=0), axis=0) B = np.sort(np.concatenate([self.approx_[j], self.approx_diag_[i]], axis=0), axis=0) L1 = np.sum(np.abs(A-B), axis=0) Xfit[i,j] = np.mean(L1) Xfit[j,i] = Xfit[i,j] else: diag_proj = (1./2) * np.ones((2,2)) approx = [np.matmul(X[i], self.lines_) for i in range(len(X))] approx_diag = [np.matmul(np.matmul(X[i], diag_proj), self.lines_) for i in range(len(X))] for i in range(len(approx)): for j in range(len(self.approx_)): A = np.sort(np.concatenate([approx[i], self.approx_diag_[j]], axis=0), axis=0) B = np.sort(np.concatenate([self.approx_[j], approx_diag[i]], axis=0), axis=0) L1 = np.sum(np.abs(A-B), axis=0) Xfit[i,j] = np.mean(L1) return Xfit
[docs]class BottleneckDistance(BaseEstimator, TransformerMixin): """ This is a class for computing the bottleneck distance matrix from a list of persistence diagrams. """
[docs] def __init__(self, epsilon=None): """ Constructor for the BottleneckDistance class. Parameters: epsilon (double): absolute (additive) error tolerated on the distance (default is the smallest positive float), see :func:`gudhi.bottleneck_distance`. """ self.epsilon = epsilon
[docs] def fit(self, X, y=None): """ Fit the BottleneckDistance class on a list of persistence diagrams: persistence diagrams are stored in a numpy array called **diagrams**. Parameters: X (list of n x 2 numpy arrays): input persistence diagrams. y (n x 1 array): persistence diagram labels (unused). """ self.diagrams_ = X return self
[docs] def transform(self, X): """ Compute all bottleneck distances between the persistence diagrams that were stored after calling the fit() method, and a given list of (possibly different) persistence diagrams. Parameters: X (list of n x 2 numpy arrays): input persistence diagrams. Returns: numpy array of shape (number of diagrams in **diagrams**) x (number of diagrams in X): matrix of pairwise bottleneck distances. """ num_diag1 = len(X) #if len(self.diagrams_) == len(X) and np.all([np.array_equal(self.diagrams_[i], X[i]) for i in range(len(X))]): if X is self.diagrams_: matrix = np.zeros((num_diag1, num_diag1)) if USE_GUDHI: for i in range(num_diag1): for j in range(i+1, num_diag1): matrix[i,j] = bottleneck_distance(X[i], X[j], self.epsilon) matrix[j,i] = matrix[i,j] else: print("Gudhi built without CGAL: returning a null matrix") else: num_diag2 = len(self.diagrams_) matrix = np.zeros((num_diag1, num_diag2)) if USE_GUDHI: for i in range(num_diag1): for j in range(num_diag2): matrix[i,j] = bottleneck_distance(X[i], self.diagrams_[j], self.epsilon) else: print("Gudhi built without CGAL: returning a null matrix") Xfit = matrix return Xfit
[docs]class PersistenceFisherDistance(BaseEstimator, TransformerMixin): """ This is a class for computing the persistence Fisher distance matrix from a list of persistence diagrams. The persistence Fisher distance is obtained by computing the original Fisher distance between the probability distributions associated to the persistence diagrams given by convolving them with a Gaussian kernel. See http://papers.nips.cc/paper/8205-persistence-fisher-kernel-a-riemannian-manifold-kernel-for-persistence-diagrams for more details. """
[docs] def __init__(self, bandwidth=1., kernel_approx=None): """ Constructor for the PersistenceFisherDistance class. Parameters: bandwidth (double): bandwidth of the Gaussian kernel used to turn persistence diagrams into probability distributions (default 1.). kernel_approx (class): kernel approximation class used to speed up computation (default None). Common kernel approximations classes can be found in the scikit-learn library (such as RBFSampler for instance). """ self.bandwidth, self.kernel_approx = bandwidth, kernel_approx
[docs] def fit(self, X, y=None): """ Fit the PersistenceFisherDistance class on a list of persistence diagrams: persistence diagrams are stored in a numpy array called **diagrams** and the kernel approximation class (if not None) is applied on them. Parameters: X (list of n x 2 numpy arrays): input persistence diagrams. y (n x 1 array): persistence diagram labels (unused). """ self.diagrams_ = X projection = (1./2) * np.ones((2,2)) self.diagonal_projections_ = [np.matmul(X[i], projection) for i in range(len(X))] if self.kernel_approx is not None: self.approx_ = [self.kernel_approx.transform(X[i]) for i in range(len(X))] self.approx_diagonal_ = [self.kernel_approx.transform(self.diagonal_projections_[i]) for i in range(len(X))] return self
[docs] def transform(self, X): """ Compute all persistence Fisher distances between the persistence diagrams that were stored after calling the fit() method, and a given list of (possibly different) persistence diagrams. Parameters: X (list of n x 2 numpy arrays): input persistence diagrams. Returns: numpy array of shape (number of diagrams in **diagrams**) x (number of diagrams in X): matrix of pairwise persistence Fisher distances. """ Xfit = np.zeros((len(X), len(self.diagrams_))) if len(self.diagrams_) == len(X) and np.all([np.array_equal(self.diagrams_[i], X[i]) for i in range(len(X))]): for i in range(len(self.diagrams_)): for j in range(i+1, len(self.diagrams_)): if self.kernel_approx is not None: Z = np.concatenate([self.approx_[i], self.approx_diagonal_[i], self.approx_[j], self.approx_diagonal_[j]], axis=0) U, V = np.sum(np.concatenate([self.approx_[i], self.approx_diagonal_[j]], axis=0), axis=0), np.sum(np.concatenate([self.approx_[j], self.approx_diagonal_[i]], axis=0), axis=0) vectori, vectorj = np.abs(np.matmul(Z, U.T)), np.abs(np.matmul(Z, V.T)) vectori_sum, vectorj_sum = np.sum(vectori), np.sum(vectorj) if vectori_sum != 0: vectori = vectori/vectori_sum if vectorj_sum != 0: vectorj = vectorj/vectorj_sum Xfit[i,j] = np.arccos( min(np.dot(np.sqrt(vectori), np.sqrt(vectorj)), 1.) ) Xfit[j,i] = Xfit[i,j] else: Z = np.concatenate([self.diagrams_[i], self.diagonal_projections_[i], self.diagrams_[j], self.diagonal_projections_[j]], axis=0) U, V = np.concatenate([self.diagrams_[i], self.diagonal_projections_[j]], axis=0), np.concatenate([self.diagrams_[j], self.diagonal_projections_[i]], axis=0) vectori = np.sum(np.exp(-np.square(pairwise_distances(Z,U))/(2 * np.square(self.bandwidth)))/(self.bandwidth * np.sqrt(2*np.pi)), axis=1) vectorj = np.sum(np.exp(-np.square(pairwise_distances(Z,V))/(2 * np.square(self.bandwidth)))/(self.bandwidth * np.sqrt(2*np.pi)), axis=1) vectori_sum, vectorj_sum = np.sum(vectori), np.sum(vectorj) if vectori_sum != 0: vectori = vectori/vectori_sum if vectorj_sum != 0: vectorj = vectorj/vectorj_sum Xfit[i,j] = np.arccos( min(np.dot(np.sqrt(vectori), np.sqrt(vectorj)), 1.) ) Xfit[j,i] = Xfit[i,j] else: projection = (1./2) * np.ones((2,2)) diagonal_projections = [np.matmul(X[i], projection) for i in range(len(X))] if self.kernel_approx is not None: approx = [self.kernel_approx.transform(X[i]) for i in range(len(X))] approx_diagonal = [self.kernel_approx.transform(diagonal_projections[i]) for i in range(len(X))] for i in range(len(X)): for j in range(len(self.diagrams_)): if self.kernel_approx is not None: Z = np.concatenate([approx[i], approx_diagonal[i], self.approx_[j], self.approx_diagonal_[j]], axis=0) U, V = np.sum(np.concatenate([approx[i], self.approx_diagonal_[j]], axis=0), axis=0), np.sum(np.concatenate([self.approx_[j], approx_diagonal[i]], axis=0), axis=0) vectori, vectorj = np.abs(np.matmul(Z, U.T)), np.abs(np.matmul(Z, V.T)) vectori_sum, vectorj_sum = np.sum(vectori), np.sum(vectorj) if vectori_sum != 0: vectori = vectori/vectori_sum if vectorj_sum != 0: vectorj = vectorj/vectorj_sum Xfit[i,j] = np.arccos( min(np.dot(np.sqrt(vectori), np.sqrt(vectorj)), 1.) ) else: Z = np.concatenate([X[i], diagonal_projections[i], self.diagrams_[j], self.diagonal_projections_[j]], axis=0) U, V = np.concatenate([X[i], self.diagonal_projections_[j]], axis=0), np.concatenate([self.diagrams_[j], diagonal_projections[i]], axis=0) vectori = np.sum(np.exp(-np.square(pairwise_distances(Z,U))/(2 * np.square(self.bandwidth)))/(self.bandwidth * np.sqrt(2*np.pi)), axis=1) vectorj = np.sum(np.exp(-np.square(pairwise_distances(Z,V))/(2 * np.square(self.bandwidth)))/(self.bandwidth * np.sqrt(2*np.pi)), axis=1) vectori_sum, vectorj_sum = np.sum(vectori), np.sum(vectorj) if vectori_sum != 0: vectori = vectori/vectori_sum if vectorj_sum != 0: vectorj = vectorj/vectorj_sum Xfit[i,j] = np.arccos( min(np.dot(np.sqrt(vectori), np.sqrt(vectorj)), 1.) ) return Xfit