Source code for nueramic_mathml.ml.classification

from __future__ import annotations

import sys
from typing import Callable

import numpy as np
import torch

if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal

from .metrics import binary_classification_report, best_threshold, f_score


[docs]class BaseClassification(torch.nn.Module):
[docs] def __init__(self): super(BaseClassification, self).__init__()
[docs] def metrics_tab(self, x: torch.Tensor, y: torch.Tensor) -> dict: """ Returns metrics dict with recall, precision, accuracy, f1, auc roc scores :param x: training set :param y: target value :param: dict with recall, precision, accuracy, f1, auc roc scores :return: dict with 5 metrics """ y_prob: torch.Tensor = self.forward(x) try: y_pred: torch.Tensor = (y_prob > self.best_threshold) * 1 except AttributeError: y_pred = self.predict(x) y_prob: None = None return binary_classification_report(y, y_pred, y_prob)
[docs]class LogisticRegression(BaseClassification): """ Binary classification model Let :math:`x \\in \\mathbb{R}^{n \\times m}, \\ w \\in \\mathbb{R}^{m \\times 1}, \\ I = [1]_{n \\times 1}`, :math:`x_i` -- is a row and :math:`x_i \\in \\mathbb{R}^{1 \\times m}` Model: .. math:: \\mathbb{P}(y_i = 1 | w) = \\frac{1}{1 + \\exp (x_i \\cdot w + b)} """
[docs] def __init__(self, kernel: Literal['linear', 'perceptron'] = 'linear'): """ :param kernel: 'linear' or 'perceptron'. linear - basic logistic regression, perceptron - nn with 2 hidden layer with dim1 = 1024, dim2 = 512 """ super(LogisticRegression, self).__init__() self.sigmoid = torch.nn.Sigmoid() self.weights = None self.kernel = kernel if kernel not in ['linear', 'perceptron']: raise TypeError('Invalid kernel. Choose "linear" or "perceptron"') self.best_threshold = 0.5
[docs] def init_weights(self, x: torch.Tensor): """ Initialization weights :param x: input torch tensor """ if self.kernel == 'linear': self.weights = torch.nn.Linear(x.shape[1], 1) elif self.kernel == 'perceptron': self.weights = torch.nn.Sequential( torch.nn.Linear(x.shape[1], 1024), torch.nn.ReLU(), torch.nn.Linear(1024, 512), torch.nn.ReLU(), torch.nn.Linear(512, 1) )
[docs] def forward(self, x: torch.Tensor) -> torch.Tensor: """ Returns confidence probabilities of first class :param x: training set :return: probabilities """ if self.weights is None: self.init_weights(x) x = x.float() return self.sigmoid(self.weights(x))
[docs] def predict(self, x): """ Returns binary class 0 or 1 instead of probabilities :param x: some tensor with shape[1] = n_features :return: """ y_prob: torch.Tensor = self.forward(x).flatten() y_pred: torch.Tensor = (y_prob > self.best_threshold) * 1 return y_pred
[docs] def fit(self, x: torch.Tensor, y: torch.Tensor, epochs=1000, l1_lambda: float = 0, show_epoch: int = 0, print_function: Callable = print) -> torch.nn.Module: """ Returns trained model Logistic Regression :param x: training set :param y: target value :param epochs: max number of sgd implements :param l1_lambda: l1 regularization weight :param show_epoch: amount of showing epochs :param print_function: print or streamlit.write :return: trained model """ x = x.float() y = y.float() self.forward(x) print_epochs = np.unique(np.linspace(1, epochs, min(epochs, show_epoch), dtype=int)) optimizer = torch.optim.Adam(self.parameters()) loss = torch.nn.BCELoss() for epoch in range(1, epochs + 1): optimizer.zero_grad() output = loss(self.forward(x).flatten(), y.flatten()) if l1_lambda > 0: for layer in self.parameters(): output += l1_lambda * layer.data.sum() output.backward() optimizer.step() with torch.no_grad(): if epoch in print_epochs: print_function(f'Epoch: {epoch: 5d} | CrossEntropyLoss: {output.item(): 0.5f}') self.best_threshold = best_threshold(x, y, self, metric='f1') return self
[docs]class LogisticRegressionRBF(BaseClassification): """ This is a logistic regression, but before we make a basic linear prediction and apply the sigmoid, we transfer x to another space using radial basis functions. The dimension of this space depends on the basis matrix x (x_basis) [1]_ .. rubric:: Radial basis functions #. gaussian :math:`\\displaystyle \\varphi (x, x_b)=e^{-\\Vert x - x_b \\Vert^2}` #. linear :math:`\\varphi (x, x_b) = \\Vert x - x_b \\Vert` #. multiquadratic :math:`\\displaystyle \\varphi (x, x_b) = \\sqrt{1 + \\Vert x - x_b \\Vert^2}` .. rubric:: References .. [1] https://en.wikipedia.org/wiki/Radial_basis_function """
[docs] def __init__(self, x_basis: torch.Tensor, rbf: Literal['linear', 'gaussian', 'multiquadratic'] = 'gaussian'): """ :param x_basis: centers of basis functions :param rbf: type of rbf function. Available: ['linear', 'gaussian'] """ super(LogisticRegressionRBF, self).__init__() self.w = torch.nn.Linear(x_basis.shape[0], 1) self.rbf = rbf self.x_basis = x_basis self.sigmoid = torch.nn.Sigmoid() self.best_threshold = 0.5
[docs] def forward(self, x: torch.Tensor = None, phi_matrix: torch.Tensor = None) -> torch.Tensor: """ Returns a "probability" (confidence) of class 1 :param x: 2D array :param phi_matrix: 2D array :return: 1D array """ if phi_matrix is None: phi_matrix = self.make_phi_matrix(x) return self.sigmoid(self.w(phi_matrix))
[docs] def make_phi_matrix(self, x: torch.Tensor) -> torch.Tensor: """ Returns n x k array with calculated phi(x_i, x_basis_j). n is number of observation from x (x.shape[0]) k is number of basis from initialization. .. math:: \\begin{bmatrix} \\varphi(x_1, x^\\text{basis}_1) & \\varphi(x_1, x^\\text{basis}_2) & \\dots & \\varphi(x_1, x^\\text{basis}_k) \\\\ \\varphi(x_2, x^\\text{basis}_1) & \\varphi(x_2, x^\\text{basis}_2) & \\dots & \\varphi(x_2, x^\\text{basis}_k) \\\\ \\vdots & \\vdots & \\ddots & \\vdots \\\\ \\varphi(x_n, x^\\text{basis}_1) & \\varphi(x_n, x^\\text{basis}_2) & \\dots & \\varphi(x_n, x^\\text{basis}_k) \\ \\end{bmatrix} :param x: Array k x m dimensional. k different x_i and m features """ x = x.float() n = self.x_basis.shape[0] k = x.shape[0] repeated_input_x = torch.tile(x, (n, 1)) repeated_basis_x = torch.tile(self.x_basis, (1, k)) repeated_basis_x = torch.reshape(repeated_basis_x, repeated_input_x.shape) phi = ((repeated_input_x - repeated_basis_x) ** 2).sum(dim=1) phi = torch.reshape(phi, (n, k)).T if self.rbf == 'linear': phi = phi ** 0.5 phi = phi / phi.max() elif self.rbf == 'gaussian': phi = torch.exp(-phi) elif self.rbf == 'multiquadratic': phi = (1 + phi) ** 0.5 phi = phi / phi.max() return phi.float()
[docs] def predict(self, x): """ Returns binary class 0 or 1 instead of -1; 1 :param x: some tensor with shape[1] = n_features :return: """ y_prob: torch.Tensor = self.forward(x).flatten() y_pred: torch.Tensor = (y_prob > self.best_threshold) * 1 return y_pred
[docs] def fit(self, x: torch.Tensor, y: torch.Tensor, epochs=100, l1_lambda: float = 0, show_epoch: int = 0, print_function: Callable = print) -> torch.nn.Module: """ Returns trained model Logistic Regression with RBF :param x: training set :param y: target value :param epochs: max number of sgd implements :param l1_lambda: l1 regularization weight :param show_epoch: amount of showing epochs :param print_function: e.g. print or streamlit.write :return: trained model """ x = x.float() y = y.float() print_epochs = np.unique(np.linspace(1, epochs, min(epochs, show_epoch), dtype=int)) phi_matrix = self.make_phi_matrix(x) optimizer = torch.optim.Adam(self.parameters()) loss = torch.nn.BCELoss() for epoch in range(1, epochs + 1): optimizer.zero_grad() output = loss(self.forward(x, phi_matrix).flatten(), y.flatten()) if l1_lambda > 0.: for layer in self.parameters(): output += l1_lambda * layer.data.sum() output.backward() optimizer.step() with torch.no_grad(): if epoch in print_epochs: print_function(f'Epoch: {epoch: 5d} | CrossEntropyLoss: {output.item(): 0.5f}') self.best_threshold = best_threshold(x, y, self, metric='f1') return self
[docs]class SVM(BaseClassification): """ Binary classification model. Method predict: SVM.predict(x) --> original names Mathematical model: .. math:: \\hat y = \\operatorname{sign}(x \\cdot w - b \\cdot I) :math:`x \\in \\mathbb{R}^{n \\times m}, \\ w \\in \\mathbb{R}^{m \\times 1}, \\ I = [1]_{n \\times 1}` And search of best :math:`w, b` calculates by minimization of Hinge loss .. math:: {\\displaystyle \\lambda \\lVert \\mathbf {w} \\rVert ^{2}+\\left[{\\frac {1}{n}}\\sum _{i=1}^{n}\\max \\left(0,1-y_{i}(x_i \\cdot w - b)\\right)\\right] \\longrightarrow \\min } or PEGASOS algorithm :ivar scale: for the best training and prediction, the model will standard normalize the input x data. The first time you call model, std and mean will be saved and in the future use the parameters for scaling. x = (x is the average value) / std :ivar weights: parameters of model. Initialize after first calling """
[docs] def __init__(self): """ Initialization of SVM """ super(SVM, self).__init__() self.scale = None self.weights = None self.class_names = {0: -1, 1: 1} # the null class has the name -1, and the first class has the name 1
[docs] def init_weights(self, x: torch.Tensor): """ Initialization weights :param x: input torch tensor """ self.weights = torch.nn.Linear(x.shape[1], 1)
[docs] def forward(self, x: torch.Tensor) -> torch.Tensor: """ Returns x @ w + b .. math:: f(x) = w_0 + w_1 \\cdot x_1 + w_2 \\cdot x_2 + \\dots + w_m \\cdot x_m :param x: input observations, tensor n x m (n is the number of observations that have m parameters) :return: regression value (yes, no classification, for binary classes call predict) """ x = x.float() if self.weights is None: self.init_weights(x) return self.weights(x)
[docs] def scaler(self, x: torch.Tensor) -> torch.Tensor: """ Returns the scaled value of x. Standard x scaling and storing settings :param x: torch.Tensor :return: """ if self.scale is None: self.scale = (x.mean(), x.flatten().std()) return (x - self.scale[0]) / self.scale[1]
[docs] def fit(self, x: torch.Tensor, y: torch.Tensor, method: Literal['pegasos', 'sgd'] = 'sgd', epochs=100, lambda_reg: float = 0.1, show_epoch: int = 0, print_function: Callable = print): """ Returns trained model SVM :param x: training set :param y: target value. binary classes :param method: optimization method. Available PEGASOS or sgd :param epochs: max number of sgd and pegasos steps :param lambda_reg: l2 regularization weight :param show_epoch: amount of showing epochs :param print_function: print or streamlit.write :return: trained model """ y = y.flatten().float().round().int() uniq_y = torch.unique(y) assert torch.unique(y).shape[0] <= 2, 'binary classification support only' if 1 in uniq_y: self.class_names = {int(uniq_y[uniq_y != 1][0]): -1, int(uniq_y[uniq_y == 1][0]): 1} else: self.class_names = dict(zip(map(int, uniq_y), (-1, 1))) y_replaced = torch.zeros_like(y) for i in range(y_replaced.shape[0]): y_replaced[i] = self.class_names[int(y[i])] y = y_replaced.float() if method == 'pegasos': return self._fit_pegasos(x, y, epochs, lambda_reg, show_epoch, print_function) else: return self._fit_sgd(x, y, epochs, lambda_reg, show_epoch, print_function)
[docs] def _fit_sgd(self, x: torch.Tensor, y: torch.Tensor, epochs=500, l2_lambda: float = 0, show_epoch: int = 0, print_function: Callable = print): """ Returns trained model SVM :param x: training set :param y: target value :param epochs: max number of sgd implements :param l2_lambda: l2 regularization weight :param show_epoch: amount of showing epochs :param print_function: print or streamlit.write :return: trained model """ x = self.scaler(x.float()) self.forward(x) print_epochs = np.unique(np.linspace(1, epochs, min(epochs, show_epoch), dtype=int)) optimizer = torch.optim.Adam(self.parameters(), lr=1e-2, weight_decay=l2_lambda) loss = torch.nn.MarginRankingLoss(margin=1) # hinge loss if x2 = 0 and margin = 1 for epoch in range(1, epochs + 1): optimizer.zero_grad() output = loss(self.forward(x).flatten(), torch.tensor([0]), y.flatten()) output.backward() optimizer.step() with torch.no_grad(): if epoch in print_epochs: print_function(f'Epoch: {epoch: 5d} | HingeLoss: {output.item(): 0.5f}') return self
[docs] def _fit_pegasos(self, x: torch.Tensor, y: torch.Tensor, epochs=20, lambda_reg: float = 0.95, show_epoch: int = 0, print_function: Callable = print) -> torch.nn.Module: """ Returns trained model SVM [2]_ :param x: training set :param y: target value :param epochs: max number of sgd implements :param lambda_reg: regularization parameter :param show_epoch: amount of showing epochs :param print_function: print or streamlit.write :return: trained model .. rubric:: References .. [2] Pegasos: Primal Estimated sub-GrAdient SOlver for SVM. Shai Shalev-Shwartz; Yoram Singer; Nathan Srebro; Andrew Cotter """ x = self.scaler(x.float()) self.forward(x) print_epochs = np.unique(np.linspace(1, epochs, min(epochs, show_epoch), dtype=int)) weights = torch.zeros(x.shape[1]) bias = torch.zeros(1) t = 0 for epoch in range(1, epochs + 1): t += 1 eta = 1 / (lambda_reg * t) for j in torch.randint(0, x.shape[0], (min(100, x.shape[0]),)): if y[j] * (weights @ x[j] + bias) < 1: weights = (1 - eta * lambda_reg) * weights + eta * y[j] * x[j] bias = (1 - eta * lambda_reg) * bias + eta * y[j] * 1 else: weights = (1 - eta * lambda_reg) * weights bias = (1 - eta * lambda_reg) * bias weights = min(1, 1 / lambda_reg ** 0.5 / torch.concat([weights, bias]).norm(2)) * weights with torch.no_grad(): if epoch in print_epochs: print_function(f'Epoch: {epoch: 5d} | F1 score: {f_score(y, self.predict(x)): 0.5f}') params = list(self.weights.parameters()) params[0].data = weights.reshape(1, -1) params[1].data = bias return self
[docs] def predict(self, x): """ Returns binary class from the first call, in training or just call :param x: some tensor with shape[1] = n_features :return: """ x = self.scaler(x).float() y = torch.zeros(x.shape[0]) reversed_names = {**{0: 1}, **{v: u for u, v in self.class_names.items()}} for i, xi in enumerate(self.weights(x).sign().flatten().int()): y[i] = reversed_names[int(xi)] return y
if __name__ == '__main__': # from sklearn.datasets import make_blobs # from metrics import accuracy # torch.random.manual_seed(7) # _x, _y = make_blobs(1000, centers=2, random_state=8) # _x, _y = torch.tensor(_x), torch.tensor(_y) # m = SVM().fit(_x, _y) # print(accuracy(_y, m.predict(_x))) # m = SVM().fit(_x, _y, method='pegasos') # print(accuracy(_y, m.predict(_x))) # m = LogisticRegression().fit(_x, _y) # print(accuracy(_y, m.predict(_x))) # m = LogisticRegressionRBF(_x[:100]).fit(_x, _y) # print(accuracy(_y, m.predict(_x))) from sklearn.datasets import make_moons _x, _y = make_moons(10_000, noise=.1, random_state=84) _x, _y = torch.tensor(_x), torch.tensor(_y) logistic_model_rbf = LogisticRegressionRBF(_x[:1000]).fit(_x, _y, show_epoch=10) print(logistic_model_rbf.metrics_tab(_x, _y))