The purpose of this post is to explain how to use Apache Mahout to deploy a massively scalable, high-throughput recommender system for a certain class of use cases.
I’ll describe the shape of the use cases covered and give a step-by-step guide to setting up such a recommender system. Be aware that this guide is intended for readers who are already familiar with collaborative filtering and recommender systems and are evaluating Mahout for building their production systems. The focus here is on making the right engineering decisions, not on explaining algorithms.
If you only want to learn about recommendation mining and try out Mahout, having a look at Mahout in Action might be a more suitable starting point.
Anatomy of the covered use cases
If your use case matches these conditions, the techniques described in this article will be a good basis for your recommender system:
- the number of users is orders of magnitude larger than the number of items
- your users browse your site anonymously most of the time. More than 95% of all requests to your recommender system have to be answered with items similar to those a user is currently viewing, has put in a shopping cart, or has in their history of “last seen” items
- only a tiny fraction of the users log in and need personalized recommendations
- your item churn rate is relatively low, items are available for weeks or months and it’s ok to have a waiting time of half a day or more until new items are included in the recommendations
Which scenarios or business models meet these conditions? I’d say most e-commerce sites and a lot of video portals have similar requirements, so keep those in mind as we continue.
Here are some typical numbers for scenarios that the setup I’ll be describing shortly is able to handle:
- millions of users
- hundreds of thousands of items
- several hundred million user-item interactions (so-called preferences)
- more than 100 requests per second to the recommender system
We’ll use a very common neighborhood-based method called Item-based Collaborative Filtering, which also forms the basis of the recommender systems deployed by Amazon and YouTube. Our data will very likely come from implicit feedback of the users, such as viewed videos, purchased products, shopping carts or other signals, depending on your use case. Inform your users about what data you record and use: this not only provides transparency on privacy questions, users have also been shown to adopt transparent systems more readily than systems they don’t understand.
The preferences that form our input data are simple user-item pairs, where each pair expresses that a user likes an item. From hundreds of millions of those we need to compute a matrix of item-item similarities (where similarity refers not to content but to behavior!). The item similarities will be used to answer questions like “users who like this, also like …” and are furthermore necessary for computing personalized recommendations.
These item-similarities are relatively static and it has been shown that it’s sufficient to precompute the 20 to 50 most similar items per item offline from time to time.
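To make this concrete, here is a minimal plain-Java sketch (independent of Mahout; all class and method names are hypothetical) of such a precomputed similar-items table: each item keeps only its top-K most similar neighbors in memory, so answering “users who like this, also like …” is a pure in-memory lookup.

```java
import java.util.*;

public class SimilarItemsTable {

  // one precomputed entry: a neighboring item and its similarity score
  static final class SimilarItem {
    final long itemID;
    final double similarity;
    SimilarItem(long itemID, double similarity) {
      this.itemID = itemID;
      this.similarity = similarity;
    }
  }

  // itemID -> its precomputed neighbors
  private final Map<Long, List<SimilarItem>> topK = new HashMap<>();

  // register a precomputed pairwise similarity (symmetric, so both directions)
  void add(long itemA, long itemB, double similarity) {
    topK.computeIfAbsent(itemA, k -> new ArrayList<>()).add(new SimilarItem(itemB, similarity));
    topK.computeIfAbsent(itemB, k -> new ArrayList<>()).add(new SimilarItem(itemA, similarity));
  }

  // "users who like this item, also like ..." -- a single in-memory lookup
  List<SimilarItem> mostSimilarItems(long itemID, int howMany) {
    List<SimilarItem> neighbors = topK.getOrDefault(itemID, Collections.emptyList());
    neighbors.sort((a, b) -> Double.compare(b.similarity, a.similarity));
    return neighbors.subList(0, Math.min(howMany, neighbors.size()));
  }

  public static void main(String[] args) {
    SimilarItemsTable table = new SimilarItemsTable();
    table.add(1L, 2L, 0.9);
    table.add(1L, 3L, 0.5);
    table.add(2L, 3L, 0.7);
    for (SimilarItem s : table.mostSimilarItems(1L, 2)) {
      System.out.println(s.itemID + " " + s.similarity);
    }
    // prints "2 0.9" then "3 0.5"
  }
}
```

With only 20 to 50 neighbors per item, such a table easily fits in memory, which is exactly what the live setup below relies on.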
Have Hadoop do the heavy lifting
We will use Mahout’s ItemSimilarityJob to precompute the item-item similarities with Apache Hadoop. If you don’t have a cluster of your own, you can temporarily rent the necessary infrastructure from Amazon’s Elastic MapReduce service.
The preferences need to be available in HDFS or S3 in a simple text format where each line represents a single preference in the form:

userID,itemID
You need to pick a similarity measure (loglikelihood is a good choice for implicit feedback) as well as the maximum number of similar items per item to compute:
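An invocation could look roughly like this (a sketch only: the jar name, the paths, and the exact option names vary between Mahout versions, so check the job’s built-in help for your release):

```shell
# Sketch -- jar name, paths and option names depend on your Mahout version
hadoop jar mahout-core-0.6-job.jar \
  org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob \
  --input /path/to/preferences.csv \
  --output /path/to/similarities \
  --booleanData true \
  --similarityClassname SIMILARITY_LOGLIKELIHOOD \
  --maxSimilarItemsPerItem 50
```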
The resulting file contains the pairwise similarities, one pair per line, in the form:

itemIDA,itemIDB,similarity
A small hint: if the computation takes too long, decrease the number given in the maxCooccurrencesPerItem parameter or downsample users with an extreme number of preferences.
If you run the computation on Elastic MapReduce, consider using the excellent Python library boto, which lets you easily upload data and start Hadoop jobs in the Amazon cloud.
Setting up the infrastructure for the live recommender system
We have a way to precompute the item-similarities now, so let’s continue to plan the setup for the live recommender machines.
The first thing we need is a datastore for the preference data and the item similarities. Mahout supports MySQL out of the box, so we’ll use it in this example, but it is not too hard to create customized implementations for other datastores if you’re no MySQL fan.
The preference data will be managed by MySQLBooleanPrefJDBCDataModel, you need to create the table as described in the JavaDoc and insert your data.
CREATE TABLE taste_preferences (
user_id BIGINT NOT NULL,
item_id BIGINT NOT NULL,
PRIMARY KEY (user_id, item_id));
The item-similarities will also reside in the database and will be managed by Mahout’s MySQLJDBCInMemoryItemSimilarity, create the table for this and make sure it always holds the latest result of the Hadoop computation.
CREATE TABLE taste_item_similarity (
item_id_a BIGINT NOT NULL,
item_id_b BIGINT NOT NULL,
similarity FLOAT NOT NULL,
PRIMARY KEY (item_id_a, item_id_b));
This page has some hints about configuring MySQL for usage with Mahout.
Putting the puzzle together
Pick the servlet container and web framework of your choice and create a small Java application for the recommender system. To get a minimal working recommender, we just need to instantiate and wire together a handful of objects as described here:
DataSource datasource = ...
DataModel dataModel = new MySQLBooleanPrefJDBCDataModel(datasource);
ItemSimilarity similarity = new MySQLJDBCInMemoryItemSimilarity(datasource);
AllSimilarItemsCandidateItemsStrategy candidateStrategy =
    new AllSimilarItemsCandidateItemsStrategy(similarity);
ItemBasedRecommender recommender = new GenericItemBasedRecommender(dataModel,
    similarity, candidateStrategy, candidateStrategy);
Which components exactly do we use here?

The DataModel manages access to our preference data; the MySQLBooleanPrefJDBCDataModel we instantiate reads it from the taste_preferences table. We also create an implementation of ItemSimilarity; the one we use loads all the precomputed similarities from taste_item_similarity into memory on application start.

In the last two lines we instantiate an AllSimilarItemsCandidateItemsStrategy, which implements both CandidateItemsStrategy and MostSimilarItemsCandidateItemsStrategy (which is why it is passed twice) and decides how to find the initial set of items that could possibly be recommended, and use all components to set up our ItemBasedRecommender.
As we said at the beginning, more than 95% of requests to the recommender system will go to recommender.mostSimilarItems(…), so we have to make sure these calls are blazingly fast.
Fortunately we don’t have to do anything for that because MySQLJDBCInMemoryItemSimilarity has already loaded all the item similarities into memory and AllSimilarItemsCandidateItemsStrategy ensures that all possibly similar items are read only from this in-memory representation. So there is not a single database call or disk access required for most-similar-items computations.
As the system is only doing some memory lookups, a most-similar-items computation should be done in milliseconds. If you have some complicated application code wrapped around your recommender or make use of a Rescorer that applies filters that might take some time, you could also put a simple cache like ehcache in front of your recommender application.
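The caching idea can be as simple as memoizing results per item ID; here is a plain-Java sketch (no ehcache dependency, all names hypothetical) of the pattern:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: memoize most-similar-items results per item ID,
// so repeated requests for popular items skip the recommender entirely.
public class MostSimilarItemsCache {

  private final Map<Long, List<Long>> cache = new ConcurrentHashMap<>();
  private final Function<Long, List<Long>> loader;

  // 'loader' wraps the actual (possibly expensive) most-similar-items call
  MostSimilarItemsCache(Function<Long, List<Long>> loader) {
    this.loader = loader;
  }

  List<Long> mostSimilarItems(long itemID) {
    // compute on first request, serve from memory afterwards
    return cache.computeIfAbsent(itemID, loader::apply);
  }
}
```

In production you would bound the cache size and evict entries whenever the similarity table is refreshed, which is what a library like ehcache gives you for free.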
If you benchmark your application, a single machine should easily be able to handle 100 most-similar-items requests per second with this setup.
Recommendations for logged-in users (calls to recommender.recommend(…)) account for only a tiny fraction of our overall requests. We didn’t want to keep our hundreds of millions of preferences in RAM, so we moved them to the database. To compute personalized recommendations we need to make exactly one database call that fetches the user’s preferences from MySQL; the rest of the computation again consists only of memory lookups.
Typically the time for computing the recommendations for one user will be dominated by the call to the database over the network, yet the whole process should not take longer than 50 or 100 ms overall.
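Conceptually, the personalized step boils down to one weighted sum per candidate item: fetch the user’s preferred items (the single database call), then score each candidate by summing its precomputed similarities to those items. A plain-Java sketch of that scoring idea (all names hypothetical, a simplification of what the item-based recommender does internally):

```java
import java.util.*;

// Hypothetical sketch of item-based scoring: score(user, candidate) is the
// sum of the precomputed similarities between the candidate and the items
// the user already likes.
public class ItemBasedScorer {

  // itemID -> (other itemID -> precomputed similarity)
  private final Map<Long, Map<Long, Double>> similarities = new HashMap<>();

  void addSimilarity(long itemA, long itemB, double value) {
    similarities.computeIfAbsent(itemA, k -> new HashMap<>()).put(itemB, value);
    similarities.computeIfAbsent(itemB, k -> new HashMap<>()).put(itemA, value);
  }

  // 'userItems' would come from the single MySQL call fetching the
  // user's preferences; everything else is a memory lookup
  double score(long candidate, Collection<Long> userItems) {
    double sum = 0.0;
    Map<Long, Double> row = similarities.getOrDefault(candidate, Collections.emptyMap());
    for (long liked : userItems) {
      sum += row.getOrDefault(liked, 0.0);
    }
    return sum;
  }
}
```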
Incorporating new data
New preferences as well as new users can simply be added to the taste_preferences table in real time, so personalized recommendations will always be up to date.
New items, however, will only be visible after the taste_item_similarity table has been updated with a result of the Hadoop computation that included interaction data for them. The inclusion time for new items therefore depends on how fast you can gather interaction data for them and how often you rerun the offline computation. As stated at the beginning, your use case should allow a day or two for that.
Some final thoughts on scalability
What exactly constitutes the scalability of this setup? Let’s have a look at the building blocks of the recommender system and discuss what can be done to handle more data or more requests.
- Increase of data in the similarity computation
This is pretty much a no-brainer, as Hadoop lets you add more machines to decrease the running time. ItemSimilarityJob should scale linearly, though in some cases users with an extreme number of preferences need to be sampled down.
- Increase of requests to the live machines
To handle increasing traffic to the live recommenders, simply add more machines. The live recommenders are stateless, so you can just load-balance the traffic across a pool of them.
- Increase of data and users
The user preferences need to be stored in the database in a way that lets the preferences of a single user be retrieved very fast as a whole. A single MySQL machine should be able to hold lots of this data. If the data no longer fits on a single machine, it shouldn’t be too hard to partition the table across several machines and create a custom DataModel for that.
- Increase of items
The bottleneck of this setup is certainly the number of items, as the whole item-similarity matrix has to fit in the memory of each live recommender machine and cannot be partitioned. However, as we only have to store something like the 20 to 50 most similar items per item, the number of items has to go up high into the millions before that matrix takes more memory than is available in today’s lower-end server machines.
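As a rough back-of-the-envelope check (my own estimate, not a figure from the Mahout documentation): with 50 precomputed neighbors per item and about 20 bytes of raw data per entry (two 8-byte item IDs plus a 4-byte float score), even a million items stay around a gigabyte, ignoring JVM object overhead:

```java
// Back-of-the-envelope estimate of the raw in-memory similarity data.
// Real JVM footprint is larger due to object and collection overhead.
public class MemoryEstimate {
  // bytes per precomputed pair: two 8-byte item IDs + one 4-byte float score
  static final long BYTES_PER_ENTRY = 8 + 8 + 4;

  static long estimateBytes(long numItems, int similarItemsPerItem) {
    return numItems * similarItemsPerItem * BYTES_PER_ENTRY;
  }

  public static void main(String[] args) {
    // 1 million items, 50 similar items each -> about 1 GB of raw data
    System.out.println(estimateBytes(1_000_000L, 50) / (1024 * 1024) + " MB");
  }
}
```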