A vectorized implementation of a Gaussian Mixture Model - EM algorithm¶
This notebook summarises the theory and vectorized implementation of a Gaussian Mixture Model using the EM algorithm.
Theory and formulas recap¶
Consider a generative representation with a latent variable $\Delta^{(i)}$ that follows a multinomial distribution with parameter $\pi$, where $\pi_j$ gives $p(\Delta^{(i)} = j)$.
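Concretely, the generative process first draws the component index $\Delta^{(i)}$ from the multinomial with parameter $\pi$, then samples $x^{(i)}$ from the Gaussian selected by that index. A minimal sketch (the values of pi, mus and covs below are hypothetical toy values, not the ones used later in the notebook):
import numpy as np

rng = np.random.default_rng(0)
pi = np.array([0.5, 0.3, 0.2])                                        # hypothetical mixing probabilities
mus = [np.array([0., 0.]), np.array([5., 5.]), np.array([0., 5.])]    # hypothetical means
covs = [np.eye(2), np.eye(2) * .5, np.eye(2) * 2]                     # hypothetical covariances

delta = rng.choice(len(pi), p = pi)                                   # draw the latent component index
x = rng.multivariate_normal(mus[delta], covs[delta])                  # draw the observation from that gaussian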
Complete-data log likelihood
$$l(\theta ; \mathbf{X}) = \sum_{i=1}^N \log \, p(x^{(i)}, \Delta^{(i)} ; \theta) = \sum_{i=1}^N \log \left[ p(x^{(i)} | \Delta^{(i)} ) \ p(\Delta^{(i)} ) \right] $$
$$ = \sum_{i=1}^N \left[ \log \, p(x^{(i)} | \Delta^{(i)} ; \mu, \Sigma) + \log \, p(\Delta^{(i)} ; \pi) \right] $$
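Since $\Delta^{(i)}$ is never observed, the quantity actually tracked in the implementation below is the marginal (incomplete-data) log likelihood, obtained by summing the latent variable out:
$$ \sum_{i=1}^N \log \sum_{j = 1}^k \pi_j \ p(x^{(i)} | \Delta^{(i)} = j; \mu_j, \Sigma_j) $$
In the code this is computed as `np.sum(np.log(np.sum(W, axis = 1)))`, where each column of `W` holds the unnormalized terms $\pi_j \, p(x^{(i)} | \Delta^{(i)} = j)$ before they are turned into responsibilities.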
In the E step, for each observation $i$ and Gaussian $j$, set the weight (responsibility):
$$ w_j^{(i)} = p(\Delta^{(i)} = j | x^{(i)} ; \pi, \mu, \Sigma) = \frac{p(x^{(i)} | \Delta^{(i)} = j; \mu, \Sigma ) \ p(\Delta^{(i)} = j ; \pi) }{ \sum_{l = 1}^k p(x^{(i)} | \Delta^{(i)} = l; \mu, \Sigma ) \ p(\Delta^{(i)} = l ; \pi) }$$
Where
- $p(x^{(i)} | \Delta^{(i)} = j; \mu, \Sigma )$ is given by evaluating the density of a Gaussian distribution with mean $\mu_j$ and covariance matrix $\Sigma_j$ at $x^{(i)}$.
- $p(\Delta^{(i)} = l ; \pi)$ is the $l$-th entry of $\pi$, i.e. $\pi_l$ (a short NumPy sketch of this step follows the list).
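Written directly in code, the E step fills the responsibility matrix column by column and then normalizes each row. A minimal NumPy/SciPy sketch, assuming hypothetical names X (an (n, d) data matrix), pi, mus and Sigmas for the current parameter estimates:
import numpy as np
from scipy.stats import multivariate_normal

def e_step(X, pi, mus, Sigmas):
    '''Return the (n, k) matrix of responsibilities w_j^(i).'''
    k = len(pi)
    # unnormalized weights: pi_j * N(x_i; mu_j, Sigma_j)
    W = np.column_stack([pi[j] * multivariate_normal.pdf(X, mean = mus[j], cov = Sigmas[j])
                         for j in range(k)])
    # normalize each row so the responsibilities of a point sum to one
    return W / W.sum(axis = 1, keepdims = True)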
In the M step, update the parameters:
\begin{aligned} \pi_j & := \frac{\sum_{i = 1}^N w_j^{(i)}}{N} \\ \mu_j & := \frac{ \sum_{i = 1}^N w_j^{(i)} x^{(i)} }{ \sum_{i = 1}^N w_j^{(i)}} \\ \Sigma_j & := \frac{\sum_{i = 1}^N w_j^{(i)} (x^{(i)} - \mu_j) (x^{(i)} - \mu_j)^T }{\sum_{i = 1}^N w_j^{(i)} } \end{aligned}
Note that the matrix $W$ is the matrix of responsibilities (or weights), where each entry corresponds to $w_j^{(i)}$; a vectorized sketch of these updates follows below.
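The M step then turns the responsibilities back into parameter updates. A minimal vectorized sketch under the same assumptions as above (X of shape (n, d), responsibility matrix W of shape (n, k); names are hypothetical):
def m_step(X, W):
    '''Return updated (pi, mus, Sigmas) given the responsibilities W.'''
    n, d = X.shape
    W_s = W.sum(axis = 0)                                # soft counts per gaussian, shape (k,)
    pi = W_s / n                                         # updated mixing probabilities
    mus = (W.T @ X) / W_s[:, None]                       # updated means, shape (k, d)
    Sigmas = []
    for j in range(W.shape[1]):
        Xc = X - mus[j]                                  # center the data on the j-th mean
        Sigmas.append((W[:, j] * Xc.T) @ Xc / W_s[j])    # weighted covariance of the j-th gaussian
    return pi, mus, Sigmas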
Libraries¶
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from matplotlib import patches
import numpy as np
import pandas as pd
import seaborn as sns
sns.set()
from scipy.stats import multivariate_normal
import warnings
warnings.filterwarnings('ignore')
Dataset¶
N = 300
np.random.seed(3)
m1, cov1 = [9, 8], [[.5, .625], [.625, 1]] ## first gaussian (symmetric, positive semi-definite covariance)
data1 = np.random.multivariate_normal(m1, cov1, N)
label1 = np.ones(N)
m2, cov2 = [6, 13], [[.5, -.2], [-.2, .1]] ## second gaussian (symmetric, positive semi-definite covariance)
data2 = np.random.multivariate_normal(m2, cov2, N)
label2 = np.ones(N) * 2
m3, cov3 = [4, 7], [[.25, .2], [.2, .5]] ## third gaussian (symmetric, positive semi-definite covariance)
data3 = np.random.multivariate_normal(m3, cov3, N)
label3 = np.ones(N) * 3
X = np.vstack((data1, data2, data3))
y = np.concatenate((label1,label2,label3))
plt.scatter(X[:,0],X[:,1], c = y, cmap = 'viridis')
plt.xlabel('$X_1$')
plt.ylabel('$X_2$')
plt.show()
Vectorized implementation of Gaussian Mixture Model (EM)¶
class custom_GaussianMixture(object):
    '''Gaussian Mixture Model - vectorized implementation'''

    def __init__(self, n_components = 3, max_iter = 100, tol = 0.001):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol
        self.means_ = None
        self.covariances_ = None
        self.weights_ = None
        self.log_likelihoods = None
    def fit(self, X, y = None):
        n, d = X.shape              ## n = datapoints, d = features
        k = self.n_components       ## k = number of gaussians
        # randomly initialize the starting means by picking k datapoints
        mu = X[np.random.choice(n, k, replace = False)]
        # initialize a separate covariance matrix for each gaussian
        Sigma = [np.eye(d) for _ in range(k)]
        # initialize the mixing probability pi of each gaussian (uniform)
        pi = np.array([1 / k] * k)
        # initialize responsibility matrix: n points for each gaussian
        W = np.zeros((n, k))
        # initialize list of log-likelihoods
        log_likelihoods = []
        # lambda function for the gaussian pdf evaluated at every datapoint
        P = lambda m, s: multivariate_normal.pdf(X, mean = m, cov = s)
        #===============================================================#
        while len(log_likelihoods) < self.max_iter:
            # E step
            # numerator of the responsibilities: j is the j-th gaussian
            for j in range(k):
                W[:, j] = pi[j] * P(mu[j], Sigma[j])
            # log likelihood computation (same terms as the numerator of the responsibilities)
            l = np.sum(np.log(np.sum(W, axis = 1)))
            # store log likelihood in list
            log_likelihoods.append(l)
            # compute W matrix by dividing by the denominator (the sum along j)
            W = (W.T / W.sum(axis = 1)).T
            # sum of the responsibilities over the datapoints i, for each gaussian j:
            # the soft (weighted) number of datapoints assigned to each gaussian
            W_s = np.sum(W, axis = 0)
            # M step
            for j in range(k):
                ## update means
                mu[j] = np.sum(W[:, j] * X.T, axis = 1) / W_s[j]
                ## update covariances
                Sigma[j] = ((W[:, j] * (X - mu[j]).T) @ (X - mu[j])) / W_s[j]
                ## update mixing probabilities of each gaussian
                pi[j] = W_s[j] / n
            # check for convergence of the log likelihood
            if len(log_likelihoods) < 2: continue
            if np.abs(l - log_likelihoods[-2]) < self.tol: break
        self.means_ = mu
        self.covariances_ = Sigma
        self.weights_ = pi
        self.log_likelihoods = log_likelihoods
        return self
    def predict_proba(self, x0):
        # weighted density of each gaussian at x0: pi_j * N(x0; mu_j, Sigma_j)
        probs = np.array([ self.weights_[j] * multivariate_normal.pdf(x0, mean = self.means_[j], cov = self.covariances_[j]) for j in range(self.n_components) ])
        # normalize over the components to obtain responsibilities (guard against underflow to zero)
        total = probs.sum(axis = 0)
        return probs / np.where(total > 0, total, 1)

    def predict(self, x0):
        # assign each point to the gaussian with the highest responsibility
        return np.argmax(self.predict_proba(x0), axis = 0)
Plotting the resulting boundaries¶
The plots below use the plot_decision_boundary utility defined in the last section of this notebook, so that cell has to be run first.
c_gmm = custom_GaussianMixture()
c_gmm.fit(X)
plot_decision_boundary(c_gmm, X, y, N = 10 , ax = None )
plt.title('EM - Gaussian Mixture Model')
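A quick sanity check is to compare the fitted means with the means that generated the data (the ordering of the fitted components is arbitrary, so the rows may come out permuted):
print('fitted means:\n', c_gmm.means_)
print('generating means:', m1, m2, m3)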
Optimization of the log likelihood¶
Each EM iteration can only increase (or leave unchanged) the log likelihood, so the curve below should rise monotonically and flatten once the algorithm has converged.
plt.plot(c_gmm.log_likelihoods)
plt.xlabel('# iterations')
plt.ylabel('log likelihood')
plt.title('Log likelihood')
Comparison to the scikit-learn version¶
from sklearn.mixture import GaussianMixture
sk_gmm = GaussianMixture(n_components= 3)
sk_gmm.fit(X)
plot_decision_boundary(sk_gmm, X, y, N = 10 , ax = None )
plt.title('Sklearn Gaussian Mixture Model')
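The fitted parameters can also be compared directly; scikit-learn exposes them as means_, covariances_ and weights_, mirroring the attributes of the custom class (component ordering again differs between fits):
print('custom means:\n', c_gmm.means_)
print('sklearn means:\n', sk_gmm.means_)
print('sklearn mixing probabilities:', sk_gmm.weights_)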
Plotting utility function¶
def plot_decision_boundary(classifier, X, y, N = 10, ax = None):
    '''Utility function to plot decision boundary and scatter plot of data'''
    x_min, x_max = X[:, 0].min() - .1, X[:, 0].max() + .1
    y_min, y_max = X[:, 1].min() - .1, X[:, 1].max() + .1
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, N), np.linspace(y_min, y_max, N))
    classes = len(np.unique(y))
    # check which prediction methods are available
    if hasattr(classifier, "predict"):
        zz = np.array([classifier.predict(np.array([xi, yi]).reshape(1, -1)) for xi, yi in zip(np.ravel(xx), np.ravel(yy))])
    elif hasattr(classifier, "predict_proba"):
        zz = np.array([classifier.predict_proba(np.array([xi, yi]).reshape(1, -1))[:, 1] for xi, yi in zip(np.ravel(xx), np.ravel(yy))])
    else:
        zz = np.array([classifier(np.array([xi, yi]).reshape(1, -1)) for xi, yi in zip(np.ravel(xx), np.ravel(yy))])
    # reshape result and plot
    Z = zz.reshape(xx.shape)
    # get current axis and plot
    if ax is None:
        ax = plt.gca()
    ax.contourf(xx, yy, Z, classes - 1, cmap = 'viridis', alpha = .2)
    ax.contour(xx, yy, Z, classes - 1, cmap = 'viridis')
    ax.scatter(X[:, 0], X[:, 1], c = classifier.predict(X), cmap = 'viridis')
    ax.set_xlabel('$X_1$')
    ax.set_ylabel('$X_2$')