# Source code for susi.SOMClassifier

"""SOMClassifier class."""

from typing import Optional, Sequence, Tuple, Union

import numpy as np
from scipy.special import softmax
from sklearn.base import ClassifierMixin
from sklearn.preprocessing import LabelBinarizer
from sklearn.utils import class_weight
from sklearn.utils.validation import check_array, check_is_fitted
from tqdm import tqdm

from .SOMEstimator import SOMEstimator
from .SOMUtils import check_estimation_input


class SOMClassifier(SOMEstimator, ClassifierMixin):
    """Supervised SOM for estimating discrete variables (= classification).

    Parameters
    ----------
    n_rows : int, optional (default=10)
        Number of rows for the SOM grid
    n_columns : int, optional (default=10)
        Number of columns for the SOM grid
    init_mode_unsupervised : str, optional (default="random")
        Initialization mode of the unsupervised SOM
    init_mode_supervised : str, optional (default="majority")
        Initialization mode of the classification SOM
    n_iter_unsupervised : int, optional (default=1000)
        Number of iterations for the unsupervised SOM
    n_iter_supervised : int, optional (default=1000)
        Number of iterations for the classification SOM
    train_mode_unsupervised : str, optional (default="online")
        Training mode of the unsupervised SOM
    train_mode_supervised : str, optional (default="online")
        Training mode of the classification SOM
    neighborhood_mode_unsupervised : str, optional (default="linear")
        Neighborhood mode of the unsupervised SOM
    neighborhood_mode_supervised : str, optional (default="linear")
        Neighborhood mode of the classification SOM
    learn_mode_unsupervised : str, optional (default="min")
        Learning mode of the unsupervised SOM
    learn_mode_supervised : str, optional (default="min")
        Learning mode of the classification SOM
    distance_metric : str, optional (default="euclidean")
        Distance metric to compare on feature level (not SOM grid).
        Possible metrics: {"euclidean", "manhattan", "mahalanobis",
        "tanimoto", "spectralangle"}. Note that "tanimoto" tends to be slow.

        .. versionadded:: 1.1.1
            Spectral angle metric.

    learning_rate_start : float, optional (default=0.5)
        Learning rate start value
    learning_rate_end : float, optional (default=0.05)
        Learning rate end value (only needed for some lr definitions)
    nbh_dist_weight_mode : str, optional (default="pseudo-gaussian")
        Formula of the neighborhood distance weight. Possible formulas
        are: {"pseudo-gaussian", "mexican-hat"}.
    missing_label_placeholder : int or str or None, optional (default=None)
        Label placeholder for datapoints with no label. This is needed for
        semi-supervised learning.
    do_class_weighting : bool, optional (default=True)
        If true, classes are weighted.
    n_jobs : int or None, optional (default=None)
        The number of jobs to run in parallel.
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number
        generator; If RandomState instance, random_state is the random
        number generator; If None, the random number generator is the
        RandomState instance used by `np.random`.
    verbose : int, optional (default=0)
        Controls the verbosity.

    Attributes
    ----------
    node_list_ : np.ndarray of (int, int) tuples
        List of 2-dimensional coordinates of SOM nodes
    radius_max_ : float, int
        Maximum radius of the neighborhood function
    radius_min_ : float, int
        Minimum radius of the neighborhood function
    unsuper_som_ : np.ndarray
        Weight vectors of the unsupervised SOM
        shape = (self.n_rows, self.n_columns, X.shape[1])
    X_ : np.ndarray
        Input data
    fitted_ : bool
        States if estimator is fitted to X
    max_iterations_ : int
        Maximum number of iterations for the current training
    bmus_ : list of (int, int) tuples
        List of best matching units (BMUs) of the dataset *X*.
    placeholder_dict_ : dict
        Dict of placeholders for initializing nodes without mapped class.
    n_features_in_ : int
        Number of input features in *X*.
    classes_ : np.ndarray
        Unique classes in the dataset labels *y*.
    class_counts_ : np.ndarray
        Number of datapoints per unique class in *y*.
    class_dtype_ : type
        Type of a label in *y*.
    class_weights_ : np.ndarray
        One weight per entry of *classes_*; all ones if
        *do_class_weighting* is false.
    super_som_ : np.ndarray
        Class label of every node of the classification SOM,
        shape = (self.n_rows, self.n_columns, 1)

    """

    def __init__(
        self,
        n_rows: int = 10,
        n_columns: int = 10,
        *,
        init_mode_unsupervised: str = "random",
        init_mode_supervised: str = "majority",
        n_iter_unsupervised: int = 1000,
        n_iter_supervised: int = 1000,
        train_mode_unsupervised: str = "online",
        train_mode_supervised: str = "online",
        neighborhood_mode_unsupervised: str = "linear",
        neighborhood_mode_supervised: str = "linear",
        learn_mode_unsupervised: str = "min",
        learn_mode_supervised: str = "min",
        distance_metric: str = "euclidean",
        learning_rate_start: float = 0.5,
        learning_rate_end: float = 0.05,
        nbh_dist_weight_mode: str = "pseudo-gaussian",
        missing_label_placeholder: Optional[Union[int, str]] = None,
        do_class_weighting: bool = True,
        n_jobs: Optional[int] = None,
        random_state=None,
        verbose: Optional[int] = 0,
    ) -> None:
        """Initialize SOMClassifier object."""
        super().__init__(
            n_rows=n_rows,
            n_columns=n_columns,
            init_mode_unsupervised=init_mode_unsupervised,
            init_mode_supervised=init_mode_supervised,
            n_iter_unsupervised=n_iter_unsupervised,
            n_iter_supervised=n_iter_supervised,
            train_mode_unsupervised=train_mode_unsupervised,
            train_mode_supervised=train_mode_supervised,
            neighborhood_mode_unsupervised=neighborhood_mode_unsupervised,
            neighborhood_mode_supervised=neighborhood_mode_supervised,
            learn_mode_unsupervised=learn_mode_unsupervised,
            learn_mode_supervised=learn_mode_supervised,
            distance_metric=distance_metric,
            learning_rate_start=learning_rate_start,
            learning_rate_end=learning_rate_end,
            nbh_dist_weight_mode=nbh_dist_weight_mode,
            missing_label_placeholder=missing_label_placeholder,
            n_jobs=n_jobs,
            random_state=random_state,
            verbose=verbose,
        )
        # The only constructor argument that is specific to classification.
        self.do_class_weighting = do_class_weighting

    def _init_super_som(self) -> None:
        """Initialize the classification SOM.

        Collects the class statistics of the labeled datapoints, computes
        the class weights and assigns an initial class to every node.

        Raises
        ------
        ValueError
            Raised if the internal placeholder collides with a class or with
            *missing_label_placeholder*, or if *init_mode_supervised* is
            invalid.

        """
        self.max_iterations_ = self.n_iter_supervised
        self.placeholder_dict_ = {
            "str": "PLACEHOLDER",
            "int": -999999,
            "float": -99.999,
        }

        # get class information (labeled datapoints only)
        self.classes_, self.class_counts_ = np.unique(
            self.y_[self.labeled_indices_], return_counts=True
        )
        y_flat = self.y_.flatten()
        self.class_dtype_ = type(y_flat[0])
        self._set_placeholder()

        # check if forbidden class name exists in classes
        if self.placeholder_ in self.classes_:
            raise ValueError(f"Forbidden class: {self.placeholder_!r}")
        if self.placeholder_ == self.missing_label_placeholder:
            raise ValueError(
                "Forbidden missing_label_placeholder: "
                f"{self.missing_label_placeholder!r}"
            )

        # class weighting:
        if self.do_class_weighting:
            self.class_weights_ = class_weight.compute_class_weight(
                "balanced",
                classes=self.classes_,
                y=self.y_[self.labeled_indices_].flatten(),
            )
        else:
            self.class_weights_ = np.ones(shape=self.classes_.shape)

        # initialize classification SOM
        if self.init_mode_supervised != "majority":
            raise ValueError(
                "Invalid init_mode_supervised: "
                + str(self.init_mode_supervised)
            )

        # String labels need a fixed-width unicode dtype that is wide enough
        # for the longest class name.
        if issubclass(self.class_dtype_, str):
            init_dtype = "U" + str(max(len(name) for name in self.classes_))
        else:
            init_dtype = self.class_dtype_
        som = np.empty((self.n_rows, self.n_columns, 1), dtype=init_dtype)

        # Relative class frequencies, used as sampling distribution for the
        # fallback class of a node.
        class_priors = self.class_counts_ / np.sum(self.class_counts_)

        for node in self.node_list_:
            dp_in_node = self.get_datapoints_from_node(node)

            # Fallback: draw a class according to the class frequencies. The
            # draw happens for every node so that the consumption of the
            # global random stream does not depend on the data.
            node_class = np.random.choice(self.classes_, p=class_priors)

            if len(dp_in_node) > 0:
                y_in_node = y_flat[dp_in_node]
                # NOTE(review): the majority vote is skipped as soon as a
                # single unlabeled datapoint is mapped to the node; voting
                # over the labeled subset may be preferable.
                if not np.any(y_in_node == self.missing_label_placeholder):
                    labels, counts = np.unique(
                        y_in_node, return_counts=True
                    )
                    # Store the majority *label*, not its position in the
                    # node-local array of unique labels.
                    node_class = labels[np.argmax(counts)]

            som[node[0], node[1], 0] = node_class

        self.super_som_ = som

    def _set_placeholder(self) -> None:
        """Set placeholder depending on the class dtype.

        Raises
        ------
        ValueError
            Raised if no placeholder defined for dtype of a class.

        """
        dtype = self.class_dtype_
        # Abstract NumPy scalar types are used so that every integer and
        # float width is covered without relying on removed aliases.
        if issubclass(dtype, str):
            self.placeholder_ = self.placeholder_dict_["str"]
        elif issubclass(dtype, (int, np.integer)) and not issubclass(
            dtype, bool
        ):
            self.placeholder_ = self.placeholder_dict_["int"]
        elif issubclass(dtype, (float, np.floating)):
            self.placeholder_ = self.placeholder_dict_["float"]
        else:
            raise ValueError(
                "No placeholder defined for the dtype of the classes: "
                f"{dtype!r}"
            )

    def fit(self, X: Sequence, y: Optional[Sequence] = None):
        """Fit classification SOM to the input data.

        Parameters
        ----------
        X : array-like matrix of shape = [n_samples, n_features]
            The training input samples.
        y : array-like matrix of shape = [n_samples, 1], optional
            The labels (ground truth) of the input samples

        Returns
        -------
        self : object

        Examples
        --------
        Load the SOM and fit it to your input data `X` and the labels `y`
        with:

        >>> import susi
        >>> som = susi.SOMClassifier()
        >>> som.fit(X, y)

        """
        X, y = check_estimation_input(X, y, is_classification=True)
        self.X_: np.ndarray = X
        self.y_: np.ndarray = y
        self.n_features_in_ = self.X_.shape[1]

        return self._fit_estimator()

    def predict_proba(
        self, X: Sequence, y: Optional[Sequence] = None
    ) -> np.ndarray:
        """Predict class probabilities for `X`.

        .. versionadded:: 1.1.3

        Parameters
        ----------
        X : array-like matrix of shape = [n_samples, n_features]
            The prediction input samples.
        y : array-like matrix of shape = [n_samples, 1], optional
            The labels (ground truth) of the input samples. Not used.

        Returns
        -------
        np.ndarray
            List of probabilities of shape (n_samples, n_classes)

        """
        # Check if fit had been called
        check_is_fitted(self, ["X_", "y_"])

        # Input validation
        X = check_array(X, dtype=np.float64)

        # Second element of the estimation output is the probability vector.
        proba_list = [
            self._calc_estimation_output(dp, proba=True)[1]
            for dp in tqdm(X, desc="predict", **self.tqdm_params_)
        ]

        # transform to numpy array
        return np.array(proba_list)

    def _modify_weight_matrix_supervised(
        self,
        dist_weight_matrix: np.ndarray,
        true_vector: Optional[np.ndarray] = None,
        learning_rate: Optional[float] = None,
    ) -> np.ndarray:
        """Modify weight matrix of the SOM.

        Parameters
        ----------
        dist_weight_matrix : np.ndarray of float
            Current distance weight of the SOM for the specific node
            (online mode) or one distance weight matrix per datapoint
            (batch mode).
        true_vector : np.ndarray, optional
            Label of the current datapoint. Required in online mode.
        learning_rate : float, optional
            Current learning rate of the SOM. Required in online mode.

        Returns
        -------
        new_matrix : np.ndarray
            Weight vector of the SOM after the modification,
            shape = (n_rows, n_columns, 1)

        Raises
        ------
        ValueError
            Raised if *train_mode_supervised* is invalid or if a parameter
            required by the online mode is missing.

        """
        som_shape = (self.n_rows, self.n_columns, 1)

        if self.train_mode_supervised == "online":
            # require valid values for true_vector and learning_rate
            if not isinstance(true_vector, np.ndarray) or not isinstance(
                learning_rate, float
            ):
                raise ValueError("Parameters required to be not None.")

            class_index = np.argwhere(self.classes_ == true_vector)[0, 0]
            weight_of_class = self.class_weights_[class_index]

            change_class_bool = self._change_class_proba(
                learning_rate, dist_weight_matrix, weight_of_class
            )

            # Only nodes that currently hold another class can change.
            different_classes_matrix = (
                self.super_som_ != true_vector
            ).reshape(som_shape)
            change_mask = np.logical_and(
                change_class_bool, different_classes_matrix
            )

            new_matrix = np.copy(self.super_som_)
            new_matrix[change_mask] = true_vector
            return new_matrix.reshape(som_shape)

        if self.train_mode_supervised == "batch":
            labeled = np.asarray(self.labeled_indices_, dtype=int)
            y_labeled = self.y_.flatten()[labeled]

            # One-hot encode against the known classes. This always yields
            # one column per class (also for two classes) and never treats
            # the missing-label placeholder as a class.
            one_hot = (
                y_labeled[:, np.newaxis] == self.classes_[np.newaxis, :]
            ).astype(float)

            # Accumulated distance weight per node and class,
            # shape = (n_rows, n_columns, n_classes).
            weights = np.asarray(dist_weight_matrix)[labeled].reshape(
                (labeled.size, self.n_rows, self.n_columns)
            )
            numerator = np.tensordot(weights, one_hot, axes=(0, 0))

            # update weights: every node takes the class with the largest
            # accumulated weight
            winners = np.argmax(numerator, axis=2)
            return self.classes_[winners].reshape(som_shape)

        raise ValueError(
            f"Invalid train_mode_supervised: {self.train_mode_supervised}"
        )

    def _change_class_proba(
        self,
        learning_rate: float,
        dist_weight_matrix: np.ndarray,
        class_weight: float,
    ) -> np.ndarray:
        """Calculate probability of changing class in a node.

        Parameters
        ----------
        learning_rate : float
            Current learning rate of the SOM
        dist_weight_matrix : np.ndarray of float
            Current distance weight of the SOM for the specific node
        class_weight : float
            Weight of the class of the current datapoint

        Returns
        -------
        change_class_bool : np.ndarray, shape = (n_rows, n_columns, 1)
            Matrix with one boolean for each node on the SOM node. If true,
            the value of the respective SOM node gets changed. If false, the
            value of the respective SOM node stays the same.

        """
        change_proba = learning_rate * dist_weight_matrix * class_weight
        # One uniform draw per node decides whether the node changes.
        random_matrix = np.random.rand(self.n_rows, self.n_columns, 1)
        return random_matrix < change_proba

    def _calc_proba(self, bmu_pos: Tuple[int, int]) -> np.ndarray:
        """Calculate probability for `predict_proba()`.

        .. versionadded:: 1.1.3

        Parameters
        ----------
        bmu_pos : Tuple[int, int]
            BMU position on the SOM grid.

        Returns
        -------
        proba : np.ndarray
            Class probabilities of one datapoint, shape (n_classes,)

        """
        # find all nodes around the BMU
        nbh_nodes = self._get_node_neighbors(bmu_pos)

        # get node predictions
        nodes_predictions = [
            self.super_som_[node[0], node[1]][0] for node in nbh_nodes
        ]

        # calculate weights (Exponent 3 is chosen to make the results
        # consistent with the current estimation while using radius=1. This
        # can be changed if we switch to a np.argmax(proba, axis=1)
        # estimation instead of a node-based estimation.)
        grid_distances = np.linalg.norm(
            np.asarray(nbh_nodes) - np.asarray(bmu_pos), axis=1
        )
        nbh_nodes_weights = np.divide(1, 1 + grid_distances) ** 3

        # calculate probabilities
        proba = np.zeros(shape=self.classes_.shape)
        for prediction, weight in zip(nodes_predictions, nbh_nodes_weights):
            class_index = np.argwhere(self.classes_ == prediction)[0, 0]
            proba[class_index] += weight

        # normalize probabilities
        proba /= proba.sum()

        return proba