Thursday, February 11, 2016

Movie Recommendations using Apache Spark (Python & Scala)


Here is code for movie recommendations for a given movie, which I developed in both Python and Scala. (My current record is 54 seconds to process the 22-million-row ratings dataset covering roughly 35,000 movies.)

1. Set up Spark [ Link ]
2. Download the movies and ratings data (MovieLens ml-latest) and extract it to C:\SparkCourse.
3. Run the Pyspark command below (for Scala, skip to step 5):
pyspark --packages com.databricks:spark-csv_2.11:1.3.0 --num-executors 5 --driver-memory 4g --executor-memory 4g
4. Copy/paste the code below into the Pyspark shell window:
sc.setLogLevel("WARN")

# Load the MovieLens CSVs as DataFrames using the spark-csv package.
movies = sqlContext.read.format("com.databricks.spark.csv").options(delimiter=",", header="true").load("file:///SparkCourse/ml-latest/movies.csv")

ratings = sqlContext.read.format("com.databricks.spark.csv").options(delimiter=",", header="true").load("file:///SparkCourse/ml-latest/ratings.csv")

# Register temp tables so the DataFrames can be queried with SQL, and cache them.
ratings.registerTempTable("ratings")
ratings.cache()

movies.registerTempTable("movies")
movies.cache()

# Self-join ratings: collect every movieId rated by users who also rated movieId 2273.
usersWatched = sqlContext.sql("Select r.movieId from ratings r, ratings x where r.userId = x.userId and x.movieId = 2273")

usersWatched.registerTempTable("usersWatched")
usersWatched.cache()

# Count how often each co-rated movie appears and attach title/genres;
# the most frequently co-rated movies are the recommendations.
results = sqlContext.sql("Select u.movieId, m.title, m.genres, count(*) cnt from usersWatched u, movies m where u.movieId = m.movieId group by u.movieId, m.title, m.genres Order by cnt desc")

results.show()
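
As a side note, the same co-occurrence logic can also be expressed with the DataFrame API instead of SQL. This is only a minimal sketch, assuming the same Pyspark session as above (Spark 1.4+) and the MovieLens column names; it is not the code I timed:

# Sketch: users who rated movieId 2273, then counts of the other movies they rated.
# Columns are strings here because the CSVs were loaded without schema inference.
watchers = ratings.filter(ratings.movieId == "2273").select("userId")
coWatched = ratings.join(watchers, "userId").select("movieId")
counts = coWatched.groupBy("movieId").count()
counts.join(movies, "movieId").orderBy("count", ascending=False).show()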


5. The same output can be achieved with Scala. Run the Spark-shell command below:

spark-shell --packages com.databricks:spark-csv_2.11:1.3.0 --num-executors 5 --driver-memory 4g --executor-memory 4g

6. Copy/paste the code below into the Spark-shell window:

sc.setLogLevel("WARN")

// Load the MovieLens CSVs as DataFrames using the spark-csv package.
val movies = sqlContext.read.format("com.databricks.spark.csv").option("delimiter", ",").option("header", "true").load("file:///SparkCourse/ml-latest/movies.csv")

val ratings = sqlContext.read.format("com.databricks.spark.csv").option("delimiter", ",").option("header", "true").load("file:///SparkCourse/ml-latest/ratings.csv")

// Register temp tables for SQL access and cache them.
ratings.registerTempTable("ratings")
ratings.cache()

movies.registerTempTable("movies")
movies.cache()

// Self-join ratings: every movieId rated by users who also rated movieId 2273.
val usersWatched = sqlContext.sql("Select r.movieId from ratings r, ratings x where r.userId = x.userId and x.movieId = 2273")

usersWatched.registerTempTable("usersWatched")
usersWatched.cache()

// Count co-rated movies and attach title/genres; the highest counts are the recommendations.
val results = sqlContext.sql("Select u.movieId, m.title, m.genres, count(*) cnt from usersWatched u, movies m where u.movieId = m.movieId group by u.movieId, m.title, m.genres Order by cnt desc")

results.show()


7. Running the query above produces movie recommendations for the movie "Rush Hour" (movieId = 2273). (You can use any movieId as the input; see the lookup sketch below.)
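
If you don't know the movieId for a title, you can look it up against the movies temp table first. A minimal sketch, run in the same Pyspark session (the LIKE pattern is only an example):

# Hypothetical lookup: find the movieId for a title before running the queries above.
lookup = sqlContext.sql("Select movieId, title from movies where title like '%Rush Hour%'")
lookup.show()
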
Facts:
    1. The movies dataset has approximately 35,000 movies, and the ratings dataset has 22 million rows.
    2. Executed on Windows 7 64-bit with standalone Spark on a Xeon processor.
    3. Results show up after about 1:15 minutes in Scala and about 2:40 minutes in Python. (Less popular movies take less execution time because fewer ratings rows are involved.) Saving the DataFrames as Parquet files and querying those instead boosts performance further, to 54 seconds, which is the record I am trying to break; a rough sketch follows this list. If anyone breaks this record, please comment on this post.
    4. SQLContext is used to compute the results in this example.
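
For reference, the Parquet optimization mentioned in fact 3 looks roughly like this in the Pyspark shell. The output paths are examples only; you write once, then load the Parquet files in later sessions instead of re-parsing the CSVs:

# Hypothetical one-time conversion: persist the CSV-backed DataFrames as Parquet.
ratings.write.parquet("file:///SparkCourse/ml-latest/ratings.parquet")
movies.write.parquet("file:///SparkCourse/ml-latest/movies.parquet")

# In later sessions, load Parquet instead of CSV and register the temp tables as before.
ratings = sqlContext.read.parquet("file:///SparkCourse/ml-latest/ratings.parquet")
movies = sqlContext.read.parquet("file:///SparkCourse/ml-latest/movies.parquet")
ratings.registerTempTable("ratings")
movies.registerTempTable("movies")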
