Run scikit-learn on hadoop via pig streaming

The problem

We have an SVM model trained with scikit-learn, and we now want to use it to classify test data on a hadoop cluster. The problem is that scikit-learn isn’t installed on the hadoop cluster, and installing it on all the data nodes could be quite an ordeal.

The solution

In order to run python code on the hadoop cluster we use pig 0.11’s pig streaming. To wrap up the correct version of python (2.7) with all the modules we need (scikit-learn, pandas, numpy…) we use virtualenv. To zip this virtualenv and send it to the entire hadoop cluster we use a hadoop streaming trick called streaming archives, and finally, for the pig version of streaming archives, we set mapred.cache.archives.

The final technical difficulty is that numpy depends on two C libraries, blas and lapack (see http://www.openblas.net/), which we must ship to the hadoop cluster as well.

(Thanks to Oded Argon and Lior Ebel for all their help!)

NEW: see here how Zach did the same (+ a bit more) in Ruby!

Install blas and lapack libraries

These are two C libraries required by numpy to do its magic. You can’t just yum-install them on the local machine, as you’ll need to ship them to the cluster later (and they need to be installed before you install numpy!).

Download them from http://www.openblas.net/
(test_scikit)[~]$ tar -xzvf xianyi-OpenBLAS-v0.2.12-0-g7e4e195.tar.gz
(test_scikit)[~]$ cd xianyi-OpenBLAS-48f06dd/
(test_scikit)[xianyi-OpenBLAS-48f06dd]$ make PREFIX=/x/home/ihadanny/blas-install install
(test_scikit)[~]$ ls blas-install/lib/
libopenblas.a libopenblas_nehalemp-r0.2.12.a libopenblas_nehalemp-r0.2.12.so libopenblas.so libopenblas.so.0

Now make sure numpy knows where the blas source is, for its installation:
(test_scikit)[~]$ export BLAS_SRC=/x/home/ihadanny/blas-install/

and where the libraries are:
(test_scikit)[~]$ cat ~/.numpy-site.cfg
[openblas]
library_dirs = /x/home/ihadanny/blas-install/lib
(test_scikit)[~]$ export BLAS=/x/home/ihadanny/blas-install/lib/libopenblas.so
(test_scikit)[~]$ export LD_LIBRARY_PATH=/x/home/ihadanny/blas-install/lib/
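As an optional sanity check (not part of the original flow), you can confirm that the shared library actually resolves through LD_LIBRARY_PATH before building numpy against it; ctypes.CDLL uses the same dlopen search path:
(test_scikit)[~]$ python -c "import ctypes; ctypes.CDLL('libopenblas.so')"
If this exits silently the library was found; an OSError means LD_LIBRARY_PATH isn’t pointing at blas-install/lib.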

Build the virtualenv, install the modules

[test_scikit]$ source ~/virtualenvs/test_scikit/bin/activate
(test_scikit)[test_scikit]$ pip freeze
numpy==1.9.1
pandas==0.15.1
python-dateutil==2.2
pytz==2014.10
scikit-learn==0.15.2
scipy==0.14.0
six==1.8.0
wsgiref==0.1.2
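To double-check that numpy was really built against our OpenBLAS (again, an optional check that isn’t in the original flow), numpy.show_config() prints the BLAS/LAPACK libraries it linked to:
(test_scikit)[test_scikit]$ python -c 'import numpy; numpy.show_config()'
You should see our blas-install/lib path among the library_dirs in the output.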

Shipping blas to the hadoop cluster

We plan on sending the virtualenv, with its python and python modules, to the hadoop cluster. The only thing missing would be the blas libraries, so let’s put them there:
(test_scikit)[~]$ cp -r ~/blas-install/ svm_poc/test_scikit/lib

Now zip everything and upload to HDFS

[test_scikit][test_scikit]$ zip -r ../test_scikit.zip *
[test_scikit][test_scikit]$ hadoop fs -put ../test_scikit.zip .

And the actual code:

(test_scikit4)[svm_poc]$ cat test_stream.pig
set mapred.create.symlink yes;
set mapred.cache.archives test_scikit.zip#demoenv;
DEFINE CMD `classify.sh` input(stdin using PigStreaming(',')) output(stdout using PigStreaming(',')) ship('classify.py','classify.sh','svm.pkl','vectorizer.pkl');
A = load 'svm_poc/test_charlie1.csv' using PigStorage(',');
B = stream A through CMD;
store B into 'my_output' using PigStorage(',', '-schema');


(test_scikit4)[svm_poc]$ cat classify.sh
export LD_LIBRARY_PATH=./demoenv/lib/blas-install/lib/
./demoenv/bin/python classify.py

Things to note:

  1. The mapred.cache.archives tells hadoop where our virtualenv lives and gives it a name, demoenv
  2. The mapred.create.symlink is necessary for the above trick to work
  3. The ship clause ships the classification code itself, classify.py, the two-line classify.sh, and whatever it needs to work with – in our case the pickled SVM model (svm.pkl) and vectorizer (vectorizer.pkl); see the training sketch after this list. Don’t rely on pig’s auto-ship, it didn’t work for us
  4. And for the final touch – classify.sh directs streaming to use the blas libraries and the python packaged in our zipped virtualenv, unpacked on each node as demoenv
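
For reference, svm.pkl and vectorizer.pkl were produced offline by a training script that isn’t part of this post. A minimal sketch of what that might look like, with a made-up training file and LinearSVC standing in for whichever SVM variant was actually used:

# train_and_pickle.py - hedged sketch, not the actual training code from this post.
# Assumes one labeled document per line in train.csv: "<label>,<text>".
from sklearn.externals import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC

labels, texts = [], []
with open('train.csv') as f:  # hypothetical training file
    for line in f:
        label, text = line.rstrip('\n').split(',', 1)
        labels.append(label)
        texts.append(text)

# Fit the TF-IDF vectorizer and the SVM on the training documents:
vectorizer = TfidfVectorizer()
Xt = vectorizer.fit_transform(texts)
svm_classifier = LinearSVC()
svm_classifier.fit(Xt, labels)

# These two pickles are what the ship() clause sends to the cluster:
joblib.dump(vectorizer, 'vectorizer.pkl')
joblib.dump(svm_classifier, 'svm.pkl')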

Classify.py sketch:

import sys

import numpy as np
from sklearn.externals import joblib

# Read the streamed records from sys.stdin into an np.array called X.
# (Assumption: the whole comma-separated line is the document text to classify;
# adjust this to pick out the right field for your data.)
X = np.array([line.rstrip('\n') for line in sys.stdin])

# Transform to TF-IDF vectors:
vectorizer = joblib.load('vectorizer.pkl')
Xt = vectorizer.transform(X)

# Load the pickled model:
svm_classifier = joblib.load('svm.pkl')

# Predict:
Y = svm_classifier.predict(Xt)

# Emit one prediction per input record back to pig via stdout:
for i in range(0, len(Y)):
    print Y[i]
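
Before submitting to the cluster, it can help to smoke-test classify.py locally inside the activated virtualenv (assuming you keep a local copy of the test csv; the cluster run streams essentially the same comma-separated lines):

(test_scikit)[svm_poc]$ head -n 5 test_charlie1.csv | python classify.py

You should get one predicted label per input line on stdout.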

Now look at it go!

[svm_poc]$ pig11 test_stream.pig
2014-12-01 03:50:04,573 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: STREAMING
Successfully read 1234 records (4666566 bytes) from: "svm_poc/test_charlie1.csv"
Output(s):
Successfully stored 1268 records (45626 bytes) in: "my_output"
2014-12-01 03:50:29,459 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
