Thursday, February 11, 2016

Movie Recommendations using Apache Spark (Python & Scala)


Here is code that produces movie recommendations for a given movie, written in both Python and Scala. (The current record is 54 seconds to process the 22-million-row ratings dataset covering 35K movies.)

1. Set up Spark [ Link ]
2. Download movies, ratings data. (Extract to C:\SparkCourse)
3. Run Pyspark command : (For Scala skip to #5)
pyspark --packages com.databricks:spark-csv_2.11:1.3.0 --num-executors 5 --driver-memory 4g --executor-memory 4g
4. Copy/Paste code below on Pyspark shell window
sc.setLogLevel("WARN")

# Load the movies and ratings CSV files (first row is the header)
movies=sqlContext.read.format("com.databricks.spark.csv").options(delimiter=",").options(header="true").load("file:///SparkCourse/ml-latest/movies.csv")

ratings=sqlContext.read.format("com.databricks.spark.csv").options(delimiter=",").options(header="true").load("file:///SparkCourse/ml-latest/ratings.csv")

# Register both as temp tables and cache them for the queries below
ratings.registerTempTable("ratings")
ratings.cache()

movies.registerTempTable("movies")
movies.cache()

# All movies rated by users who also rated movieId 2273
usersWatched=sqlContext.sql("Select r.movieId from ratings r, ratings x where r.userId = x.userId and x.movieId = 2273")

usersWatched.registerTempTable("usersWatched")
usersWatched.cache()

# Count how often each of those movies appears, most frequent first
results = sqlContext.sql("Select u.movieId, m.title, m.genres, count(*) cnt from usersWatched u, movies m where u.movieId = m.movieId group by u.movieId, m.title, m.genres Order by cnt desc")

results.show()
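
To make the logic of the two queries above easier to follow, here is a minimal plain-Python sketch of the same idea on a tiny made-up ratings list. The sample data and the function name recommend_for are illustrative assumptions, not part of the MovieLens files. It finds every user who rated the target movie, collects all movies those users rated, and counts how often each movie appears.

```python
from collections import Counter

# Hypothetical sample data: (userId, movieId) pairs standing in for ratings.csv.
sample_ratings = [
    (1, 2273), (1, 10), (1, 20),
    (2, 2273), (2, 10),
    (3, 20), (3, 30),
]

def recommend_for(target_movie_id, ratings):
    """Count movies rated by users who also rated target_movie_id.

    Mirrors the self-join in the SQL above, so the target movie itself
    is included in the counts (every matching user rated it).
    """
    # Users who rated the target movie (the "x" side of the self-join).
    fans = {user for user, movie in ratings if movie == target_movie_id}
    # Every rating made by those users (the "r" side), counted per movie.
    counts = Counter(movie for user, movie in ratings if user in fans)
    # Sort by count, highest first, like "Order by cnt desc".
    return counts.most_common()

print(recommend_for(2273, sample_ratings))
```

User 3 never rated movie 2273, so movie 30 does not show up in the result at all, which is the same filtering the usersWatched query performs.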


5. Same output can be achieved using Scala code.
 Run Spark-shell command :

spark-shell --packages com.databricks:spark-csv_2.11:1.3.0 --num-executors 5 --driver-memory 4g --executor-memory 4g

6.  Copy/Paste code below on Spark-shell window :-

sc.setLogLevel("WARN");

val movies=sqlContext.read.format("com.databricks.spark.csv").option("delimiter",",").option("header","true").load("file:///SparkCourse/ml-latest/movies.csv");

val ratings=sqlContext.read.format("com.databricks.spark.csv").option("delimiter",",").option("header","true").load("file:///SparkCourse/ml-latest/ratings.csv");

ratings.registerTempTable("ratings");
ratings.cache();

movies.registerTempTable("movies");
movies.cache();

val usersWatched=sqlContext.sql("Select r.movieId from ratings r, ratings x where r.userId = x.userId and x.movieId = 2273");

usersWatched.registerTempTable("usersWatched");
usersWatched.cache();

val results = sqlContext.sql("Select u.movieId, m.title, m.genres, count(*) cnt from usersWatched u, movies m where u.movieId = m.movieId group by u.movieId, m.title, m.genres Order by cnt desc");

results.show();


7. Movie recommendations for the movie "Rush Hour" (movieId=2273) are displayed below. (You can use any movie id as input.)

[ Screenshot of the results table not preserved ]

Facts : 
    1. The movies dataset has approx. 35,000 movies and the ratings dataset has 22 million rows.
    2. Executed on Windows 7 (64-bit) with standalone Spark on a Xeon processor.
    3. Results show up after 1.15 minutes in Scala and approx. 2:40 minutes in Python. (Less popular movies take less time, because fewer rating rows are involved.) Saving the DataFrame to a Parquet file and then querying that file boosts performance further (54 seconds), which is the record I am trying to beat. If anyone breaks this record, please comment on this post.
    4. SQLContext is used to compute the results in this example.
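
The Parquet speed-up mentioned in the facts above can be sketched roughly as follows. This is a minimal sketch, not the exact code behind the 54-second figure: the helper name reload_via_parquet and any path passed to it are assumptions. It relies on df.write.mode(...).parquet(...) and sqlContext.read.parquet(...), both of which are available in Spark 1.6.

```python
def reload_via_parquet(sqlContext, df, path, table_name):
    """Write df to Parquet once, then read it back and register it as a temp table.

    sqlContext and df are the objects already available in the Pyspark shell;
    path and table_name are chosen by the caller (hypothetical helper).
    """
    # Overwrite any earlier copy so the helper can be re-run safely.
    df.write.mode("overwrite").parquet(path)
    # Later queries read the columnar Parquet copy instead of re-parsing the CSV.
    parquet_df = sqlContext.read.parquet(path)
    parquet_df.registerTempTable(table_name)
    return parquet_df
```

In the Pyspark shell this could be called as ratings = reload_via_parquet(sqlContext, ratings, "file:///SparkCourse/ml-latest/ratings.parquet", "ratings"), where the output path is only an example. After the first run, the write step can be skipped and the Parquet file read directly.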

Saturday, February 6, 2016

Getting Started with Spark on Windows 7 (64 bit)


Let's get started with Apache Spark 1.6 on Windows 7 (64-bit). [ Steps for Mac, Ubuntu and other operating systems are similar, except for the winutils step, which is Windows-only ]

-  Download and install Java  (Needs Java 1.7 or 1.8, Ignore if already installed)
-  Download & Install Anaconda Python 3.5+.  (Extract to C:\Anaconda3 or any folder )
-  Download Spark  ( Download 7-zip to unzip .gz files) : Extract to C:\BigData\Spark making sure that all 15 folders go under C:\BigData\Spark folder and not in long folder name with version number
-  Download winutils.exe  ( Put in C:\BigData\Hadoop\bin )  -- This is for 64-bit
-  Download Sample Data  (Extract to C:\BigData\Data)

1. Create Environment Variables :-
    SPARK_HOME : C:\BigData\Spark
    HADOOP_HOME : C:\BigData\Hadoop
    JAVA_HOME :  Make sure the JAVA_HOME environment variable is defined and points to the Java home directory.

2. Add to Environment variable PATH at end:
   ;%SPARK_HOME%\bin;%HADOOP_HOME%\bin;%JAVA_HOME%\bin;

3. Create folder  
    c:\tmp\hive
 
4. On Command prompt in Admin Mode : (One time only)
winutils.exe chmod -R 777 \tmp\hive

5. On the command prompt in Administrator mode, start Spark using :-
pyspark --packages com.databricks:spark-csv_2.11:1.3.0

[ If everything is set you should see Welcome to Spark version 1.6 using Python 3.5.1 message ]

6.  Type following commands on Pyspark shell :-
sc.setLogLevel("WARN")

movies=sqlContext.read.format("com.databricks.spark.csv").options(delimiter="|").options(header="false").load("file:///BigData/Data/ml-100k/u.item")

movies.registerTempTable("movies")

movies.cache()

movies.show()

ratings=sqlContext.read.format("com.databricks.spark.csv").options(delimiter="\t").options(header="false").load("file:///BigData/Data/ml-100k/u.data")

ratings.registerTempTable("ratings")

ratings.cache()

ratings.show()


# To view Ratings distribution :
ratingsDistribution=sqlContext.sql("Select c2 as ratings, count(*) as cnt from ratings group by c2")

ratingsDistribution.show()

#Top Most Watched Movies :
TopMovies=sqlContext.sql("Select c1 as movieId, Count(*) as Cnt from ratings group by c1 Order by Cnt Desc")

TopMovies.show()



# Top Most Watched Movies by Name
TopMovieNames=sqlContext.sql("Select movies.c1 as MovieName, count(*) as Cnt from ratings, movies where ratings.c1=movies.c0 group by movies.c1 Order by Cnt desc")

TopMovieNames.show()
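
Because the ml-100k files have no header row, the columns in the queries above are referred to by position (c0, c1, c2, ...), which makes them hard to read. In u.data the tab-separated fields are user id, movie id, rating and timestamp. As a rough illustration of what the ratings-distribution query computes, here is a plain-Python sketch over a few made-up lines in that layout. The sample lines and the function name ratings_distribution are assumptions for illustration only.

```python
from collections import Counter

# Made-up lines in the u.data layout: userId <TAB> movieId <TAB> rating <TAB> timestamp
sample_lines = [
    "1\t10\t5\t0",
    "1\t20\t3\t0",
    "2\t10\t5\t0",
    "3\t30\t1\t0",
]

def ratings_distribution(lines):
    """Return {rating: count}, like 'Select c2, count(*) ... group by c2'."""
    counts = Counter()
    for line in lines:
        user_id, movie_id, rating, timestamp = line.split("\t")
        # The rating is kept as a string here for simplicity.
        counts[rating] += 1
    return dict(counts)

print(ratings_distribution(sample_lines))
```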
---------------------------------------------------------------
Some more troubleshooting commands/tips :-
---------------------------------------------------------------

sqlContext._get_hive_ctx()      - If this runs clean with no errors, then the winutils.exe version, HADOOP_HOME path etc. are correct.
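
Before starting pyspark at all, the environment variables from steps 1 and 2 can also be checked with a few lines of plain Python. This is only a sketch under the folder layout described in this post: the function name check_spark_env is hypothetical, and it only tests that the variables are set, that the folders exist, and that winutils.exe sits under %HADOOP_HOME%\bin.

```python
import os

REQUIRED_VARS = ("SPARK_HOME", "HADOOP_HOME", "JAVA_HOME")

def check_spark_env(env=None):
    """Return a list of problems found; an empty list means the basics look fine."""
    env = os.environ if env is None else env
    problems = []
    for name in REQUIRED_VARS:
        value = env.get(name)
        if not value:
            problems.append(name + " is not set")
        elif not os.path.isdir(value):
            problems.append(name + " points to a missing folder: " + value)
    # winutils.exe is expected under HADOOP_HOME\bin (Windows only).
    hadoop_home = env.get("HADOOP_HOME")
    if hadoop_home:
        winutils = os.path.join(hadoop_home, "bin", "winutils.exe")
        if not os.path.isfile(winutils):
            problems.append("winutils.exe not found at " + winutils)
    return problems

# Print whatever is wrong with the current environment (prints nothing if all is fine).
for problem in check_spark_env():
    print(problem)
```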

sc.appName="myFirstApp"  - appName appears in Jobs, easy to differentiate

winutils.exe ls \tmp\hive  : Run on the Windows Command Prompt, this command displays the access level of the \tmp\hive folder. If you see any errors, you probably do not have proper access to the C:\ drive (especially on a work laptop with restrictions on the C:\ drive). In that case try running the winutils and pyspark commands from a D:\ prompt. If the \tmp\hive permissions are not set properly, you may receive an error like this: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-rw-rw-

Typing sc. and hitting the Tab key lists all available commands and options. The same works with sqlContext.

ratings.unpersist()   - to remove from Cache. ( ratings.cache() to put in Memory ) 

create C:\BigData\Spark\bin\myspark.bat with pyspark --packages com.databricks:spark-csv_2.11:1.3.0  in it. Next time just type myspark on command line to open pyspark with CSV package.

Download larger Movie/Ratings data sets to slice n dice data in different ways and evaluate performance (memory/cpu) implications when Data is cached vs not cached. Explore other related data sets at this Link
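
For the cached vs not cached comparison suggested above, a small timing helper is enough to get rough numbers. The sketch below is plain Python with a stand-in workload so that it runs on its own; the helper name time_action is an assumption, not a Spark API.

```python
import time

def time_action(action):
    """Run a zero-argument callable once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    return result, elapsed

# Stand-in workload so the sketch runs on its own; in the Pyspark shell
# one could pass something like  lambda: ratings.count()  instead.
result, seconds = time_action(lambda: sum(range(1000)))
print(result, "computed in", round(seconds, 4), "seconds")
```

In the Pyspark shell, one way to use it is to time ratings.count() after ratings.unpersist() and again after ratings.cache(). Since cache() is lazy, the first action after it still reads the file and fills the cache, so the cached timing should be taken from the second run.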