A vectorized implementation of Gaussian Mixture Model - EM algorithm

This notebook summarises the theory and vectorized implementation of a Gaussian Mixture Model using the EM algorithm.

Theory and formulas recap

Consider a generative model with a latent variable $\Delta^{(i)}$ that follows a multinomial distribution with parameter $\pi$, where $\pi_j$ gives $p(\Delta^{(i)} = j)$. Conditional on $\Delta^{(i)} = j$, the observation $x^{(i)}$ is drawn from a Gaussian with mean $\mu_j$ and covariance $\Sigma_j$.
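
In compact form, the generative process for each observation is

$$ \Delta^{(i)} \sim \mathrm{Multinomial}(\pi), \qquad x^{(i)} \mid \Delta^{(i)} = j \ \sim \ \mathcal{N}(\mu_j, \Sigma_j) $$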

Complete-data log likelihood

$$l(\theta ; \mathbf{X}, \Delta) = \sum_{i=1}^N \log \ p(x^{(i)}, \Delta^{(i)} ; \theta) = \sum_{i=1}^N \log \ p(x^{(i)} \mid \Delta^{(i)} ) \ p(\Delta^{(i)} ) $$

$$ = \sum_{i=1}^N \left[ \log p(x^{(i)} \mid \Delta^{(i)} ; \mu, \Sigma) + \log p(\Delta^{(i)} ; \pi) \right] $$
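
Since the assignments $\Delta^{(i)}$ are not observed, EM works with the marginal (observed-data) log likelihood, which is also the quantity computed and monitored for convergence in the implementation below:

$$ \ell(\theta ; \mathbf{X}) = \sum_{i=1}^N \log \sum_{j=1}^k \pi_j \ p(x^{(i)} \mid \Delta^{(i)} = j ; \mu_j, \Sigma_j) $$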

In the E step, for each observation $i$ and Gaussian $j$, compute the responsibility (weight):

$$ w_j^{(i)} = p(\Delta^{(i)} = j | x^{(i)} ; \pi, \mu, \Sigma) = \frac{p(x^{(i)} | \Delta^{(i)} = j; \mu, \Sigma ) \ p(\Delta^{(i)} = j ; \pi) }{ \sum_{l = 1}^k p(x^{(i)} | \Delta^{(i)} = l; \mu, \Sigma ) \ p(\Delta^{(i)} = l ; \pi) }$$

Where

  • $p(x^{(i)} | \Delta^{(i)} = j; \mu, \Sigma )$ is given by evaluating the density of a Gaussian distribution with mean $\mu_j$ and covariance matrix $\Sigma_j$ at $x^{(i)}$.
  • $p(\Delta^{(i)} = l ; \pi) $ is given by $\pi_l$.
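
This step maps directly to a few lines of numpy. Below is a minimal sketch, assuming X is the (n, d) data matrix and mu, Sigma, pi hold the current parameters; the helper name e_step is only for illustration and is not part of the class implemented later.

import numpy as np
from scipy.stats import multivariate_normal

def e_step(X, mu, Sigma, pi):
    '''Illustrative helper: return the (n, k) matrix of responsibilities w_j^(i).'''
    n, k = X.shape[0], len(pi)
    W = np.zeros((n, k))
    # numerator: pi_j * N(x_i | mu_j, Sigma_j) for every point and gaussian j
    for j in range(k):
        W[:, j] = pi[j] * multivariate_normal.pdf(X, mean=mu[j], cov=Sigma[j])
    # normalize each row so the responsibilities of a point sum to one
    return W / W.sum(axis=1, keepdims=True)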

In the M step, update the parameters:

$$\begin{aligned} \pi_j & := \frac{\sum_{i = 1}^N w_j^{(i)}}{N} \\ \mu_j & := \frac{ \sum_{i = 1}^N w_j^{(i)} x^{(i)} }{ \sum_{i = 1}^N w_j^{(i)}} \\ \Sigma_j & := \frac{\sum_{i = 1}^N w_j^{(i)} (x^{(i)} - \mu_j) (x^{(i)} - \mu_j)^T }{\sum_{i = 1}^N w_j^{(i)} } \end{aligned}$$

Note that $W$ is the $N \times k$ matrix of responsibilities (weights), whose $(i, j)$ entry is $w_j^{(i)}$.
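
These updates translate to numpy just as directly. Below is a minimal sketch, assuming W is the responsibility matrix produced by the E step; the helper name m_step is only for illustration.

import numpy as np

def m_step(X, W):
    '''Illustrative helper: return updated (pi, mu, Sigma) given data X (n, d) and responsibilities W (n, k).'''
    n, k = W.shape
    W_s = W.sum(axis=0)               # soft counts per gaussian, shape (k,)
    pi = W_s / n                      # updated mixing proportions
    mu = (W.T @ X) / W_s[:, None]     # updated means, shape (k, d)
    Sigma = []
    for j in range(k):
        diff = X - mu[j]              # data centred on gaussian j
        Sigma.append((W[:, j] * diff.T) @ diff / W_s[j])
    return pi, mu, Sigma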

Libraries

In [1]:
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from matplotlib import patches

import numpy as np
import pandas as pd
import seaborn as sns
sns.set()

from scipy.stats import multivariate_normal

import warnings
warnings.filterwarnings('ignore')

Dataset

In [2]:
N = 300
np.random.seed(3)
## covariance matrices must be symmetric and positive semi-definite
m1, cov1 = [9, 8], [[.5, .6], [.6, 1.]] ## first gaussian
data1 = np.random.multivariate_normal(m1, cov1, N)
label1 = np.ones(N)

m2, cov2 = [6, 13], [[.5, -.2], [-.2, .1]] ## second gaussian
data2 = np.random.multivariate_normal(m2, cov2, N)
label2 = np.ones(N) * 2    

m3, cov3 = [4, 7], [[.25, .2], [.2, .5]] ## third gaussian
data3 = np.random.multivariate_normal(m3, cov3, N)
label3 = np.ones(N) * 3

X = np.vstack((data1, data2, data3))
y = np.concatenate((label1,label2,label3))

plt.scatter(X[:,0],X[:,1], c = y, cmap = 'viridis')
plt.xlabel('X1'), plt.ylabel('X2')
plt.show()

Vectorized implementation of Gaussian Mixture Model (EM)

In [14]:
class custom_GaussianMixture(object):
    '''Gaussian Mixture Model - vectorized implementation'''
    
    def __init__(self, n_components = 3, max_iter = 100, tol = 0.001):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol
        self.means_ = None
        self.covariances_ = None
        self.weights_ = None
        self.log_likelihoods = None
        
    
    def fit(self,X, y = None):

        n,d = X.shape  ## n = datapoints, d = features
        k = self.n_components  ##K number of clusters

        # randomly initialize the starting means
        mu = X[np.random.choice(n,k,replace = False)]

        # initialize a covariance matrix for each gaussian (separate copies, not k references to the same array)
        Sigma = [np.eye(d) for _ in range(k)]

        # initialize the probability for each gaussian pi
        pi = np.array([1 / k] * k)

        # initialize responsibility matrix: n points for each gaussian
        W = np.zeros((n,k))

        # initialize list of log-likelihoods
        log_likelihoods = []

        # lambda function for gaussian pdf
        P = lambda m ,s: multivariate_normal.pdf(X, mean = m, cov = s)
        
       #===============================================================#
    
        while len(log_likelihoods) < self.max_iter:

            # E step

            # numerator of responsibilities: j indexes the j-th gaussian
            for j in range(k):
                W[:, j] = pi[j] * P(mu[j], Sigma[j])

            # log likelihood computation (uses the same unnormalized responsibilities)
            l = np.sum(np.log(np.sum(W, axis = 1)))

            # store log likelihood in list
            log_likelihoods.append(l)

            # compute W matrix by dividing by denominator (the sum along j) 
            W = (W.T / W.sum(axis = 1)).T

            # sum of w^i entries along j (used for parameter updates)
            # these are the soft weighted number of datapoints belonging to each gaussian
            W_s = np.sum(W, axis = 0)


            # M step

            for j in range(k):

                ## Update means (responsibility-weighted average of the datapoints)
                mu[j] = (W[:, j] @ X) / W_s[j]

                ## Update covariances
                Sigma[j] = ((W[:,j] * ((X - mu[j]).T)) @ (X - mu[j])) / W_s[j]

                ## Update probabilities of each gaussian
                pi[j] = W_s[j] / n

            # check for convergence
            if len(log_likelihoods) < 2: continue
            if np.abs(l - log_likelihoods[-2]) < self.tol: break

        self.means_ = mu
        self.covariances_ = Sigma
        self.weights_ = pi
        self.log_likelihoods = log_likelihoods
        
    def predict_proba(self, x0):
        '''Posterior responsibilities p(Delta = j | x0) for each gaussian'''
        probs = np.array([ self.weights_[j] * multivariate_normal.pdf(x0, mean = self.means_[j], cov = self.covariances_[j]) for j in range(self.n_components) ])
        return probs / probs.sum(axis = 0)
    
    def predict(self, x0):
        '''Assign each point to the gaussian with the highest posterior responsibility'''
        return np.argmax(self.predict_proba(x0), axis = 0)
        

Plotting the resulting boundaries

In [15]:
c_gmm = custom_GaussianMixture()
c_gmm.fit(X)
plot_decision_boundary(c_gmm, X, y, N = 10 , ax = None )
plt.title('EM-Gaussian Mixture Model')
Out[15]:
Text(0.5,1,'EM-Gaussian Mixture Model')

Optimization of the log likelihood

In [21]:
plt.plot(c_gmm.log_likelihoods)
plt.xlabel('# iterations')
plt.ylabel('log likelihood')
plt.title('Log likelihood')
Out[21]:
Text(0.5,1,'Log likelihood')
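
Each EM iteration can only increase the marginal log likelihood, so a quick sanity check on the fitted model above is to verify that the stored sequence is non-decreasing (up to floating point noise); a small illustrative snippet:

# differences between consecutive log-likelihood values should be non-negative
# (a tiny negative tolerance absorbs floating point noise)
assert np.all(np.diff(c_gmm.log_likelihoods) >= -1e-8)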

Comparison to the scikit-learn version

In [18]:
from sklearn.mixture import GaussianMixture

sk_gmm = GaussianMixture(n_components= 3)
sk_gmm.fit(X)

plot_decision_boundary(sk_gmm, X, y, N = 10 , ax = None )
plt.title('Sklearn Gaussian Mixture Model')
Out[18]:
Text(0.5,1,'Sklearn Gaussian Mixture Model')
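
Beyond the decision boundaries, the fitted parameters can be compared directly. A quick illustrative check, assuming both models recovered essentially the same three clusters (components may come out in a different order, so the means are sorted by their first coordinate before printing):

c_means = np.asarray(c_gmm.means_)
print(c_means[np.argsort(c_means[:, 0])])               # means from the custom EM implementation
print(sk_gmm.means_[np.argsort(sk_gmm.means_[:, 0])])   # means from sklearn's GaussianMixture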

Plotting utility function

In [19]:
def plot_decision_boundary(classifier, X, y, N = 10 , ax = None ):
    '''Utility function to plot decision boundary and scatter plot of data'''
    x_min, x_max = X[:, 0].min() - .1, X[:, 0].max() + .1
    y_min, y_max = X[:, 1].min() - .1, X[:, 1].max() + .1
    xx, yy = np.meshgrid( np.linspace(x_min, x_max, N), np.linspace(y_min, y_max, N))
    classes = len(np.unique(y))
    
    #Check what methods are available
    if hasattr(classifier, "predict"):
        zz = np.array( [classifier.predict(np.array([xi,yi]).reshape(1,-1)) for  xi, yi in zip(np.ravel(xx), np.ravel(yy)) ] )
    elif hasattr(classifier, "predict_proba"):
        zz = np.array( [classifier.predict_proba(np.array([xi,yi]).reshape(1,-1))[:,1] for  xi, yi in zip(np.ravel(xx), np.ravel(yy)) ] )
    else :
        zz = np.array( [classifier(np.array([xi,yi]).reshape(1,-1)) for  xi, yi in zip(np.ravel(xx), np.ravel(yy)) ] )
            
    # reshape result and plot
    Z = zz.reshape(xx.shape)
    
    
    #Get current axis and plot
    if ax is None:
        ax = plt.gca()
    ax.contourf(xx, yy, Z, classes-1, cmap='viridis', alpha=.2)
    ax.contour(xx, yy, Z,  classes-1, cmap='viridis')
    ax.scatter(X[:,0],X[:,1],c = classifier.predict(X), cmap = 'viridis')
    ax.set_xlabel('$X_1$')
    ax.set_ylabel('$X_2$')