Collaborative Filtering is a technique used in recommender systems to make predictions about the interests of a user by collecting preferences for many users. The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B’s opinion on a different issue than that of a randomly chosen person. For instance, given the scenario below where users are rating movies from 1 to 5:
collaborative filtering can learn parameters for each user and movie and help us predict the rating User 1 would give to Movie 2 and Movie 5 and User 2 would give to movie 4. This could then in turn be used to recommend movies to a specific user.
Problem Specification
Our task will be to have the algorithm learn the the parameters for a user J: and for a movie i: in order to have the rating that user J gives to a movie i equal to:
Note: the number of features, m in the case above, is a hyperparameter which is chosen during validation. To make it easier to follow, I will choose a value of 2 going forward.
Going back to example where we had 5 movies and 3 users, the prediction of all the ratings is therefore:
and the actual ratings are:
Cost Function
The cost function will have to be calculated only for those combinations of movie/user where a rating is available. For this purpose, we define a matrix R made up of 0’s and 1’s. A 0 indicates a missing rating, a 1 indicates a provided rating. Continuing with the example above, R is equal to:
The cost function to minimize is the following:
where lambda is the regularization hyperparameter
In order to minimize the cost function, we need to calculate the partial derivatives for each of the parameters. After some simple calculus, it is possible to show that the derivatives with respect to the matrix is equal to:
and the one with respect to the matrix X is equal to:
when calculating the derivatives, we need to discard the combinations of users/movies with no rating provided once again.
Let’s start coding
Now that we have the math out of the way, let’s start writing the class.
__init__ method:
the init method will accept our 2 hyperparameters plus the maximum number of iteration when finding the parameters that minimize the cost function:
1 2 3 4 5 6 7 8 9 10 11 12 | def __init__(self, num_features:int = 2, regularization_param:float = 0.0, max_iter:int = 500, random_state:int=None): ''' PARAMTERS ----------------- num_features: the number of features regularization_parameter: the regularization parameters alpha max_iter: the number of iterations to ''' self._num_features = num_features self._regularization_param = regularization_param self._max_iter = max_iter self._random_state = random_state |
_initialize_thetas method:
this method initializes the thetas parameters with random values between 0 and 1:
1 2 3 4 5 6 7 8 9 | def _initialize_thetas(self, rows:int, cols:int): ''' DESCRIPTION ------------ initializes the parameters with random values between 0 and 1 and unrolls them into a 1 dimenional vector ''' thetas1 = np.random.rand(rows, self._num_features) # X thetas2 = np.random.rand(self._num_features, cols) # theta self._thetas = np.concatenate((thetas1.flatten(), thetas2.flatten())) |
_reshape_thetas method:
This method is used to reshape the 1 dimensional parameters vectors into their respective matrices
1 2 3 4 | def _reshape_thetas(self, thetas:np.ndarray, rows:int, cols:int): thetas1 = thetas[:rows * self._num_features].reshape(rows, self._num_features) thetas2 = thetas[rows * self._num_features:].reshape(self._num_features, cols) return thetas1, thetas2 |
_gradient method:
This method is used to calculate the partial derivatives for each of our parameters:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | def _gradient(self, thetas:np.ndarray, Y:np.ndarray, R:np.ndarray): ''' calculates the partial derivatives with respect to each the parameters and unrolls them into a 1 dimensional vector ''' rows, cols = Y.shape thetas1, thetas2 = self._reshape_thetas(thetas, rows, cols) predictions = np.matmul(thetas1, thetas2) prediction_error = (predictions - Y) * R thetas1_gradient = np.matmul(prediction_error, thetas2.T) thetas2_gradient = np.matmul(prediction_error.T, thetas1).T if self._regularization_param > 0: thetas1_gradient += thetas1 * self._regularization_param thetas2_gradient += thetas2 * self._regularization_param return np.concatenate((thetas1_gradient.flatten(), thetas2_gradient.flatten())) |
_cost method:
This method calculates the value of the cost function:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | def _cost(self, thetas:np.ndarray, Y:np.ndarray, R:np.ndarray): ''' calculates the value of the cost function ''' rows, cols = Y.shape thetas1, thetas2 = self._reshape_thetas(thetas, rows, cols) # calculate predictions predictions = np.matmul(thetas1, thetas2) # square errors mse = np.power(predictions - Y, 2) # only take into account those where R = 1 relevant_mse = np.sum(mse * R) # add regularization if provided if self._regularization_param > 0: relevant_mse += self._regularization_param * np.sum(np.power(thetas, 2)) relevant_mse *= 0.5 return relevant_mse |
fit method:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | def fit(self, Y:np.ndarray, R:np.ndarray): ''' PARAMETERS --------- Y: ratings matrix R: matrix of 1's (rating provided) or 0's (rating not provided) ''' self._Y = Y self._R = R # initialize thetas values self._rows, self._cols = Y.shape self._initialize_thetas(self._rows, self._cols) # store the means for each row, take into account only values for which R = 1 self._means = np.nanmean(np.where(R, Y, np.nan), axis=1, keepdims=True) # subtract the mean from Y # the mean is added back when doing predictions # this helps us deal with cases when a user has not rated any movie yet normalized_Y = np.where(R, Y - self._means, 0) # optimize optimized = minimize(fun=self._cost, x0=self._thetas, args=(normalized_Y, R), method='TNC', jac=self._gradient, options={'maxiter':self._max_iter, 'disp':True}) self._thetas = optimized.x |
predict method:
Used to predict the rating a certain user would give to a certain item
1 2 3 4 5 6 7 8 9 10 11 12 13 | def predict(self, iid:int, uid:int): ''' DESCRIPTION ------------ Returns the predicted rating that user uid gives to item iid PARAMETERS ------------- iid: index of the item to predict the rating for. Index is zero based uid: index of the user who rates item iid. Index is zero based ''' thetas1, thetas2 = self._reshape_thetas(self._thetas, self._rows, self._cols) return np.matmul(thetas1[iid, :], thetas2[:, uid] + self._means[iid]) |
recommended_items method:
Used to return the indexes (sorted in descending order) of the top items a certain user would rate the best
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | def recommended_items(self, uid:int): ''' DESCRIPTION ----------- Returns the sorted indexes (descending order) of the predicted ratings user uuid would give to items she has yet to rate ''' # reshape thetas thetas1, thetas2 = self._reshape_thetas(self._thetas, self._rows, self._cols) # predict ratings for the ith user predictions = np.where(self._R[:, [uid]], np.nan, np.matmul(thetas1, thetas2[:, [uid]]) + self._means) # total number of items user has not rated total_user_unrated_items = np.sum(self._R[:, uid] == 0) return np.argsort(-predictions, axis=0)[:total_user_unrated_items] |
compute_similarities method:
This method returns the cosine similarity matrix if either the users or the items. Each user or item is represented by a features vector of length ‘num_features‘ .
The cosine similarity is a measure of the cosine of the angle between two non zero vectors. It goes from -1 to 1. It is 1 when two vectors point in the same direction and -1 then they point in opposite directions.
The results of this method can be used to find out the users most similar to a certain user or the items most similar to a certain item
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | def compute_similarities(self, users_items:int = 0): ''' DESCRIPTION ----------- computes the cosine similarities matrix of either users or items PARAMETERS ----------- users_items: 0 if you want to return the cosine similarities of the items, 1 if you want to return the cosine similarities of the users ''' # reshape thetas thetas1, thetas2 = self._reshape_thetas(self._thetas, self._rows, self._cols) if users_items == 0: return cosine_similarity(thetas1) elif users_items == 1: return cosine_similarity(thetas2.T) else: raise ValueError('users_items must be either 0 (items) or 1 (users)') |
The whole class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | from sklearn.metrics.pairwise import cosine_similarity import numpy as np from scipy.optimize import minimize class collaborative_filtering(object): def __init__(self, num_features:int = 2, regularization_param:float = 0.0, max_iter:int = 500, random_state:int=None): ''' PARAMTERS ----------------- num_features: the number of features regularization_parameter: the regularization parameters alpha max_iter: the number of iterations to ''' self._num_features = num_features self._regularization_param = regularization_param self._max_iter = max_iter self._random_state = random_state def _initialize_thetas(self, rows:int, cols:int): ''' DESCRIPTION ------------ initializes the parameters with random values between 0 and 1 and unrolls them into a 1 dimenional vector ''' np.random.seed(self._random_state) thetas1 = np.random.rand(rows, self._num_features) # X np.random.seed(self._random_state) thetas2 = np.random.rand(self._num_features, cols) # theta self._thetas = np.concatenate((thetas1.flatten(), thetas2.flatten())) def _reshape_thetas(self, thetas:np.ndarray, rows:int, cols:int): thetas1 = thetas[:rows * self._num_features].reshape(rows, self._num_features) thetas2 = thetas[rows * self._num_features:].reshape(self._num_features, cols) return thetas1, thetas2 def _gradient(self, thetas:np.ndarray, Y:np.ndarray, R:np.ndarray): ''' calculates the partial derivatives with respect to each the parameters and unrolls them into a 1 dimensional vector ''' rows, cols = Y.shape thetas1, thetas2 = self._reshape_thetas(thetas, rows, cols) predictions = np.matmul(thetas1, thetas2) prediction_error = (predictions - Y) * R thetas1_gradient = np.matmul(prediction_error, thetas2.T) thetas2_gradient = np.matmul(prediction_error.T, thetas1).T if self._regularization_param > 0: thetas1_gradient += thetas1 * self._regularization_param thetas2_gradient += thetas2 * self._regularization_param return np.concatenate((thetas1_gradient.flatten(), thetas2_gradient.flatten())) def _cost(self, thetas:np.ndarray, Y:np.ndarray, R:np.ndarray): ''' calculates the value of the cost function ''' rows, cols = Y.shape thetas1, thetas2 = self._reshape_thetas(thetas, rows, cols) # calculate predictions predictions = np.matmul(thetas1, thetas2) # square errors mse = np.power(predictions - Y, 2) # only take into account those where R = 1 relevant_mse = np.sum(mse * R) # add regularization if provided if self._regularization_param > 0: relevant_mse += self._regularization_param * np.sum(np.power(thetas, 2)) relevant_mse *= 0.5 return relevant_mse def fit(self, Y:np.ndarray, R:np.ndarray): ''' PARAMETERS --------- Y: ratings matrix R: matrix of 1's (rating provided) or 0's (rating not provided) ''' self._R = R self._Y = Y # initialize thetas values self._rows, self._cols = Y.shape self._initialize_thetas(self._rows, self._cols) # store the means for each row, take into account only values for which R = 1 self._means = np.nanmean(np.where(R, Y, np.nan), axis=1, keepdims=True) # subtract the mean from Y # the mean is added back when doing predictions # this helps us deal with cases when a user has not rated any movie yet normalized_Y = np.where(R, Y - self._means, 0) # optimize optimized = minimize(fun=self._cost, x0=self._thetas, args=(normalized_Y, R), method='TNC', jac=self._gradient, options={'maxiter':self._max_iter, 'disp':True}) self._thetas = optimized.x def predict(self, iid:int, uid:int): ''' DESCRIPTION ------------ Returns the predicted rating that user uid gives to item iid PARAMETERS ------------- iid: index of the item to predict the rating for. Index is zero based uid: index of the user who rates item iid. Index is zero based ''' thetas1, thetas2 = self._reshape_thetas(self._thetas, self._rows, self._cols) return np.matmul(thetas1[iid, :], thetas2[:, uid]) + self._means[iid] def recommended_items(self, uid:int): ''' DESCRIPTION ----------- Returns the sorted indexes (descending order) of the predicted ratings user uuid would give to items she has yet to rate ''' # reshape thetas thetas1, thetas2 = self._reshape_thetas(self._thetas, self._rows, self._cols) # predict ratings for the ith user predictions = np.where(self._R[:, [uid]], np.nan, np.matmul(thetas1, thetas2[:, [uid]]) + self._means) # total number of items user has not rated total_user_unrated_items = np.sum(self._R[:, uid] == 0) return np.argsort(-predictions, axis=0)[:total_user_unrated_items] def compute_similarities(self, users_items:int = 0): ''' DESCRIPTION ----------- computes the cosine similarities matrix of either users or items PARAMETERS ----------- users_items: 0 if you want to return the cosine similarities of the items, 1 if you want to return the cosine similarities of the users ''' # reshape thetas thetas1, thetas2 = self._reshape_thetas(self._thetas, self._rows, self._cols) if users_items == 0: return cosine_similarity(thetas1) elif users_items == 1: return cosine_similarity(thetas2.T) else: raise ValueError('users_items must be either 0 (items) or 1 (users)') |
An example of how to use the class: