Space: Apache Mahout (https://cwiki.apache.org/confluence/display/MAHOUT)
Page: Mahout on Elastic MapReduce 
(https://cwiki.apache.org/confluence/display/MAHOUT/Mahout+on+Elastic+MapReduce)

Change Comment:
---------------------------------------------------------------------
Added process used to create vectors on a large document set for benchmarking 
clustering algorithms.

Edited by Timothy Potter:
---------------------------------------------------------------------
h1. Introduction

This page details the set of steps that was necessary to get an example of 
k-Means clustering running on Amazon's [Elastic 
MapReduce|http://aws.amazon.com/elasticmapreduce/] (EMR). 

Note: Some of this work is due in part to credits donated by Amazon Web 
Services Apache Projects Testing Program.

h1. Getting Started

   * Get yourself an EMR account.  If you're already using EC2, then you can do 
this from [Amazon's AWS Management Console|https://console.aws.amazon.com/], 
which has a tab for running EMR.
   * Get the [ElasticFox|https://addons.mozilla.org/en-US/firefox/addon/11626] 
and [S3Fox|https://addons.mozilla.org/en-US/firefox/search?q=s3fox&cat=all] 
Firefox extensions.  These will make it easy to monitor running EMR instances, 
upload code and data, and download results.
   * Download the [Ruby command line client for 
EMR|http://developer.amazonwebservices.com/connect/entry.jspa?externalID=2264&categoryID=262].
  You can do things from the GUI, but when you're in the midst of trying to get 
something running, the CLI client will make life a lot easier.
   * Have a look at [Common Problems Running Job 
Flows|http://developer.amazonwebservices.com/connect/thread.jspa?messageID=124694&#124694]
 and [Developing and Debugging Job 
Flows|http://developer.amazonwebservices.com/connect/message.jspa?messageID=124695#124695]
 in the EMR forum at Amazon.  They were tremendously useful.
   * Make sure that you're up to date with the Mahout source.  The fix for 
[Issue 118|http://issues.apache.org/jira/browse/MAHOUT-118] is required to get 
things running when you're sending output to an S3 bucket.
   * Build the Mahout core and examples.

Note that the Hadoop running on EMR is version 0.20.0.  The 
EMR GUI in the AWS Management Console provides a number of examples of using 
EMR, and you might want to try running one of these to get started.

One big gotcha that I discovered is that the S3N file system for Hadoop has a 
couple of weird cases that boil down to the following advice:  if you're naming 
a directory in an s3n URI, make sure that it ends in a slash, and do not use a 
top-level S3 bucket name as the place where your Mahout output will go; always 
include a subdirectory.
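
Those two rules are easy to check before you submit a job. The following is a minimal sketch, not part of Mahout or the EMR tools; the function name {{normalize_s3n_dir}} is made up for illustration.

```shell
# Hypothetical helper: apply the two s3n rules above to an output URI.
# Prints the URI with a trailing slash, or fails if it names only a bucket.
normalize_s3n_dir() {
  uri="$1"
  case "$uri" in
    s3n://*) ;;
    *) echo "not an s3n URI: $uri" >&2; return 1 ;;
  esac
  # Rule 1: a directory URI must end in a slash
  case "$uri" in
    */) ;;
    *) uri="$uri/" ;;
  esac
  # Rule 2: there must be a subdirectory after the bucket name
  rest="${uri#s3n://}"   # bucket/sub/dir/
  path="${rest#*/}"      # sub/dir/ (empty when only a bucket was given)
  if [ -z "$path" ]; then
    echo "output must include a subdirectory: $uri" >&2
    return 1
  fi
  printf '%s\n' "$uri"
}

normalize_s3n_dir s3n://news-vecs/out-9-11
```

Passing a bare bucket such as {{s3n://news-vecs}} makes the function return a non-zero status instead of printing a URI.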

h1. Uploading Code and Data

I decided that I would use separate S3 buckets for the Mahout code, the input 
for the clustering (I used the synthetic control data, which you can find 
easily from the [Quickstart] page), and the output of the clustering.  

You will need to upload:
# The Mahout Job jar.  For the example here, we are using 
{{mahout-core-0.4-SNAPSHOT.job}}
# The data.  In this example, we uploaded two files: dictionary.txt and 
part-out.vec.  The latter is the main vector file and the former is the 
dictionary that maps words to columns.  It was created by converting a Lucene 
index to Mahout vectors.


h1. Running k-means Clustering

EMR offers two modes for running MapReduce jobs.  The first is a "streaming" 
mode where you provide the source for single-step mapper and reducer functions 
(you can use languages other than Java for this).  The second mode is called 
"Custom Jar" and it gives you full control over the job steps that will run.  
This is the mode that we need to use to run Mahout.  

In order to run in Custom Jar mode, you need to look at the example that you 
want to run and figure out the arguments that you need to provide to the job.  
Essentially, you need to know the command line that you would give to 
bin/hadoop in order to run the job, including whatever parameters the job needs 
to run.  

h2. Using the GUI

The EMR GUI is an easy way to start up a Custom Jar run, but it doesn't have 
the full functionality of the CLI.  Basically, you tell the GUI where in S3 the 
jar file is using a Hadoop s3n URI like 
{{s3n://PATH/mahout-core-0.4-SNAPSHOT.job}}.  The GUI will check and make sure 
that the given file exists, which is a nice sanity check.  You can then provide 
the arguments for the job just as you would on the command line.  The arguments 
for the k-means job were as follows:

{noformat}
org.apache.mahout.clustering.kmeans.KMeansDriver --input 
s3n://news-vecs/part-out.vec --clusters s3n://news-vecs/kmeans/clusters-9-11/ 
-k 10 --output s3n://news-vecs/out-9-11/ --distanceMeasure 
org.apache.mahout.common.distance.CosineDistanceMeasure --convergenceDelta 
0.001 --overwrite --maxIter 50 --clustering
{noformat}

TODO: Screenshot

The main failing with the GUI mode is that you can only specify a single job to 
run, and you can't run another job in the same set of instances.  Recall that 
on AWS you pay for partial hours at the hourly rate, so if your job fails in 
the first 10 seconds, you pay for the full hour, and if you try again, you're 
going to be paying for another hour.

Because of this, using a command line interface (CLI) is strongly recommended.
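
To make the cost of the "10 second failure" concrete, here is a small sketch of the arithmetic. It assumes that every started hour is billed as a full hour, as described above; the helper name {{billed_hours}} is hypothetical.

```shell
# Hypothetical helper: whole hours billed for a run lasting the given number
# of seconds, assuming each started hour is charged in full.
billed_hours() {
  echo $(( ($1 + 3599) / 3600 ))
}

# Two separate job flows that each fail after 10 seconds
echo "two separate flows: $(( $(billed_hours 10) + $(billed_hours 10) )) billed hours"

# One job flow that stays up while both attempts run (20 seconds in total)
echo "one shared flow: $(billed_hours 20) billed hours"
```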

h2. Using the CLI

If you're in development mode, and trying things out, EMR allows you to set up 
a set of instances and leave them running.  Once you've done this, you can add 
job steps to the set of instances as you like.  This solves the "10 second 
failure" problem that I described above and lets you get full value for your 
EMR dollar.  Amazon has pretty good [documentation for the 
CLI|http://docs.amazonwebservices.com/ElasticMapReduce/latest/DeveloperGuide/index.html?CHAP_RunningaJob.html],
 which you'll need to read to figure out how to do things like set up your AWS 
credentials for the EMR CLI.

You can start up a job flow that will keep running using an invocation like the 
following:

{noformat}
./elastic-mapreduce --create --alive \
   --log-uri s3n://PATH_FOR_LOGS/ --key-pair YOUR_KEY \
   --num-instances 2 --name NAME_HERE
{noformat}

Fill in the name, key pair and path for logs as appropriate. This call returns 
the ID of the job flow, and you'll need that for subsequent calls to add 
steps to the job flow. You can, however, retrieve it at any time by calling:
{noformat}
./elastic-mapreduce --list
{noformat}

Let's list our job flows:

{noformat}
[stgreen@dhcp-ubur02-74-153 14:16:15 emr]$ ./elastic-mapreduce --list
j-3JB4UF7CQQ025     WAITING        ec2-174-129-90-97.compute-1.amazonaws.com    kmeans
{noformat}

At this point, everything's started up, and it's waiting for us to add a step 
to the job.  When we started the job flow, we specified a key pair that we 
created earlier so that we can log into the master while the job flow is 
running:

{noformat}
 elastic-mapreduce --ssh -j j-3JB4UF7CQQ025
{noformat}
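
If you script later steps, you can pull the job flow ID out of the {{--list}} output rather than copying it by hand. This sketch assumes the column layout in the listing above (ID, state, master host name, job flow name) and a job flow name without spaces; {{find_job_flow}} is a hypothetical helper, not part of the EMR CLI.

```shell
# Hypothetical helper: read `elastic-mapreduce --list` output on stdin and
# print the ID of the first job flow with the given state and name.
# Assumes the ID is the first field, the state the second, the name the last.
find_job_flow() {
  awk -v state="$1" -v name="$2" \
    '$1 ~ /^j-/ && $2 == state && $NF == name { print $1; exit }'
}

# Example using the listing shown above instead of a live call
sample='j-3JB4UF7CQQ025     WAITING        ec2-174-129-90-97.compute-1.amazonaws.com    kmeans'
printf '%s\n' "$sample" | find_job_flow WAITING kmeans

# Against a live account this would be:
#   JOB_ID=$(./elastic-mapreduce --list | find_job_flow WAITING kmeans)
```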

Let's add a step to run a job:

{noformat}
elastic-mapreduce -j j-3JB4UF7CQQ025 \
  --jar s3n://PATH/mahout-core-0.4-SNAPSHOT.job \
  --main-class org.apache.mahout.clustering.kmeans.KMeansDriver \
  --arg --input --arg s3n://PATH/part-out.vec \
  --arg --clusters --arg s3n://PATH/kmeans/clusters/ \
  --arg -k --arg 10 \
  --arg --output --arg s3n://PATH/out-9-11/ \
  --arg --distanceMeasure --arg org.apache.mahout.common.distance.CosineDistanceMeasure \
  --arg --convergenceDelta --arg 0.001 \
  --arg --overwrite \
  --arg --maxIter --arg 50 \
  --arg --clustering
{noformat}

When you do this, the job flow goes into the {{RUNNING}} state for a while and 
then returns to {{WAITING}} once the step has finished.  You can use the CLI or 
the GUI to monitor the step while it runs.  Once you've finished with your job 
flow, you can shut it down as follows:

{noformat}
./elastic-mapreduce -j j-3JB4UF7CQQ025 --terminate
{noformat}

Then go look in your S3 buckets to find your output and logs.
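
Wrapping every job argument in {{--arg}} by hand, as in the step command above, is tedious and easy to get wrong. The sketch below does the wrapping mechanically. The function name {{emr_args}} is hypothetical, and it assumes that no argument contains whitespace.

```shell
# Hypothetical helper: prefix each job argument with --arg, which is the form
# the step command above uses. Assumes arguments contain no whitespace.
emr_args() {
  out=""
  for a in "$@"; do
    out="$out${out:+ }--arg $a"
  done
  printf '%s\n' "$out"
}

# The k-means arguments from this page, written as you would give them to
# bin/hadoop, converted to the EMR CLI form
emr_args --input s3n://PATH/part-out.vec -k 10 --overwrite

# In a real invocation the result would be spliced into the command line:
#   ./elastic-mapreduce -j JOB_ID --jar JAR --main-class CLASS $(emr_args ...)
```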


h1. Troubleshooting

The primary means for understanding what went wrong is via the logs and 
stderr/stdout.  When running on EMR, stderr and stdout are captured to files in 
your log directories.  Additionally, logging is set up to write out to a file 
called syslog.  To view these in the AWS Console, go to your logs directory, 
then the folder with the same JobFlow id as above (j-3JB4UF7CQQ025), then the 
steps folder and then the appropriate step number (usually 1 for this case).

That is, go to the folder s3n://PATH_TO_LOGS/j-3JB4UF7CQQ025/steps/1.  In this 
directory, you will find stdout, stderr, syslog and potentially a few other 
logs. 
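
The folder layout described above can be written down as a small helper, which is handy when you have several job flows and steps to check. This is only a sketch: {{step_log_uris}} is a hypothetical name, and the three file names are the ones mentioned in this section.

```shell
# Hypothetical helper: print the expected log locations for one step of a
# job flow, following the layout LOG_ROOT/JOBFLOW_ID/steps/STEP_NUMBER/.
step_log_uris() {
  log_root="${1%/}"   # the log URI, with any trailing slash removed
  job_flow="$2"
  step="$3"
  for f in stdout stderr syslog; do
    printf '%s/%s/steps/%s/%s\n' "$log_root" "$job_flow" "$step" "$f"
  done
}

step_log_uris s3n://PATH_TO_LOGS/ j-3JB4UF7CQQ025 1
```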


See the [resulting 
thread|http://developer.amazonwebservices.com/connect/thread.jspa?threadID=30945&tstart=15] 
for some early user experience with Mahout on EMR.

h2. Building Vectors for Large Document Sets

Use the following steps as a guide to using Elastic MapReduce (EMR) to create 
sparse vectors needed for running Mahout clustering algorithms on large 
document sets. This section evolved from benchmarking Mahout's clustering 
algorithms using a large document set. Specifically, we used the ASF mail 
archives that have been parsed and converted to the Hadoop SequenceFile format 
(block-compressed) and saved to a public S3 folder: 
{{s3://asf-mail-archives/mahout-0.4/sequence-files}}. Overall, there are 
6,094,444 key-value pairs in 283 files taking around 5.7GB of disk.

h4. 1. Set Up elastic-mapreduce-ruby

As discussed previously, make sure you install the *elastic-mapreduce-ruby* 
tool. On Debian-based Linux like Ubuntu, use the following commands to install 
elastic-mapreduce-ruby's dependencies:

{noformat}
apt-get install ruby1.8
apt-get install libopenssl-ruby1.8
apt-get install libruby1.8-extras
{noformat}

Once these dependencies are installed, download and extract the 
elastic-mapreduce-ruby application. We use {{/mnt/dev}} as the base working 
directory because this process was originally conducted on an EC2 instance; be 
sure to replace this path with the correct path for your environment as you 
work through these steps.

{noformat}
mkdir -p /mnt/dev/elastic-mapreduce /mnt/dev/downloads
cd /mnt/dev/downloads
wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
cd /mnt/dev/elastic-mapreduce
unzip /mnt/dev/downloads/elastic-mapreduce-ruby.zip
{noformat}

Please refer to [Amazon Elastic MapReduce Ruby 
Client|http://aws.amazon.com/developertools/2264?_encoding=UTF8&jiveRedirect=1] 
for a detailed explanation, but to get running quickly, all you need to do is 
create a file named {{credentials.json}} in the elastic-mapreduce directory, 
such as {{/mnt/dev/elastic-mapreduce/credentials.json}}. The credentials.json 
should contain the following information (change to match your environment):

{noformat}
{ 
  "access-id": "YOUR_ACCESS_KEY",
  "private-key": "YOUR_SECRET_KEY", 
  "key-pair": "gsg-keypair", 
  "key-pair-file": "/mnt/dev/aws/gsg-keypair.pem", 
  "region": "us-east-1", 
  "log-uri": "s3n://BUCKET/asf-mail-archives/logs/"
}
{noformat}
  
If you are confused about any of these parameters, please read: [Understanding 
Access Credentials for AWS/EC2|http://alestic.com/2009/11/ec2-credentials]. 
Also, it's a good idea to add the elastic-mapreduce directory to your PATH. To 
verify it is working correctly, simply do:

{noformat}
elastic-mapreduce --list
{noformat}
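
If the {{--list}} call fails, a missing entry in {{credentials.json}} is a likely cause. The following sketch checks for the six keys from the example above. It uses {{grep}} only and does not validate the JSON itself; the function name {{check_credentials}} is hypothetical.

```shell
# Hypothetical check: report which of the keys shown in the example
# credentials.json are absent from the given file.
# Returns 0 when all keys are present and 1 otherwise.
check_credentials() {
  file="$1"
  missing=0
  for key in access-id private-key key-pair key-pair-file region log-uri; do
    # Match the quoted key followed by a colon, e.g. "region":
    if ! grep -q "\"$key\"[[:space:]]*:" "$file"; then
      echo "missing: $key"
      missing=1
    fi
  done
  return "$missing"
}

# Usage, with the path used on this page:
#   check_credentials /mnt/dev/elastic-mapreduce/credentials.json
```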

h4. 2. Set Up s3cmd and Create a Bucket

It's also beneficial when working with EMR and S3 to install 
[s3cmd|http://s3tools.org/s3cmd], which helps you interact with S3 using easy 
to understand command-line options. To install on Ubuntu, simply do:

{noformat}
sudo apt-get install s3cmd
{noformat}

Once installed, configure s3cmd by doing:

{noformat}
s3cmd --configure
{noformat}

If you don't have an S3 bucket to work with, then please create one using:

{noformat}
s3cmd mb s3://BUCKET
{noformat}

Substitute your bucket name wherever {{s3://BUCKET}} appears in the steps 
below.

h4. 3. Launch EMR Cluster

Once elastic-mapreduce is installed, start a cluster with no jobflow steps:

{noformat}
elastic-mapreduce --create --alive \
  --log-uri s3n://BUCKET/emr/logs/ \
  --key-pair gsg-keypair \
  --slave-instance-type m1.xlarge \
  --master-instance-type m1.xlarge \
  --num-instances NUM_INSTANCES \
  --name mahout-0.4-vectorize
{noformat}

This will create an EMR Job Flow named "mahout-0.4-vectorize" in the US-East 
region using EC2 xlarge instances. Take note of the Job ID returned as you will 
need it to add the "seq2sparse" step to the Job Flow. It can take a few minutes 
for the cluster to start; the job flow enters a "waiting" status when it is 
ready. We launch the EMR instances in the *us-east-1* region so that we don't 
incur data transfer charges to/from US-Standard S3 buckets (credentials.json => 
"region":"us-east-1").

When vectorizing large document sets, you need to distribute processing across 
as many reducers as possible. This also helps keep the size of the vector files 
more manageable. I'll leave it to you to decide how many instances to allocate, 
but keep in mind that one will be dedicated as the master (Hadoop NameNode). 
Also, it took about 75 minutes to run the seq2sparse job on 19 xlarge instances 
when using {{maxNGramSize=2}} (~190 normalized instance hours – not cheap). I 
think you'll be safe to use about 10-13 instances and still finish in under 2 
hours. Also, if you are not creating bi-grams, then you won't need as much 
horsepower; a four node cluster with 3 reducers per node is sufficient for 
generating vectors with {{maxNGramSize = 1}} in less than 30 minutes.
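
For budgeting, here is one way to arrive at the ~190 figure quoted above. Two things in this sketch are assumptions and do not come from this page: that an xlarge instance counts as 8 normalized units per hour, and that run time is not rounded up to whole hours. The function name {{normalized_hours}} is hypothetical.

```shell
# Hypothetical helper: normalized instance hours for a run.
# Arguments: number of instances, run time in minutes, normalization factor.
# Assumption: the factor for xlarge is 8; run time is not rounded up.
normalized_hours() {
  echo $(( $1 * $2 * $3 / 60 ))
}

# 19 xlarge instances for 75 minutes
normalized_hours 19 75 8
```

Plugging in your own instance count and expected run time gives a rough comparison between cluster sizes before you launch anything.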

_Tip: Amazon provides a bootstrap action to configure the cluster for running 
memory intensive jobs. For more information about this, see: 
[http://buyitnw.appspot.com/forums.aws.amazon.com/ann.jspa?annID=834]_

h4. 4. Copy Mahout JAR to S3

The Mahout 0.4 JAR containing a custom Lucene Analyzer 
({{org.apache.mahout.text.MailArchivesClusteringAnalyzer}}) is available at:

{noformat}
s3://asf-mail-archives/mahout-0.4/mahout-examples-0.4-job-ext.jar 
{noformat}

The source code is available at 
[MAHOUT-588|https://issues.apache.org/jira/browse/MAHOUT-588].

If you need to use your own Mahout JAR, use s3cmd to copy it to your S3 bucket:

{noformat}
s3cmd put JAR_FILE s3://BUCKET/
{noformat}

h4. 5. Vectorize

Schedule a jobflow step to vectorize (1-grams only) using Mahout's seq2sparse 
job:

{noformat}
elastic-mapreduce \
  --jar s3://asf-mail-archives/mahout-0.4/mahout-examples-0.4-job-ext.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg seq2sparse \
  --arg -i --arg s3n://asf-mail-archives/mahout-0.4/sequence-files/ \
  --arg -o --arg /asf-mail-archives/mahout-0.4/vectors/ \
  --arg --weight --arg tfidf \
  --arg --minSupport --arg 500 \
  --arg --maxDFPercent --arg 70 \
  --arg --norm --arg 2 \
  --arg --numReducers --arg NUM_REDUCERS \
  --arg --analyzerName --arg org.apache.mahout.text.MailArchivesClusteringAnalyzer \
  --arg --maxNGramSize --arg 1 \
  -j JOB_ID
{noformat}

You need to determine the correct number of reducers based on the EC2 instance 
type and size of your cluster. For xlarge nodes, set the number of reducers to 
3 x N (where N is the size of your EMR cluster not counting the master node). 
For large instances, 2 reducers per node is probably safe unless your job is 
extremely CPU intensive, in which case use only 1 reducer per node.
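
The rule of thumb above can be sketched as a small function. The name {{num_reducers}} is hypothetical, and {{m1.large}} is assumed to be the instance type meant by "large instances"; only {{m1.xlarge}} appears in the launch command on this page.

```shell
# Hypothetical helper: suggested --numReducers value for a cluster.
# Arguments: total number of instances (including the master), instance type.
num_reducers() {
  total_nodes="$1"
  instance_type="$2"
  workers=$(( total_nodes - 1 ))   # the master node is not counted
  case "$instance_type" in
    m1.xlarge) per_node=3 ;;
    m1.large)  per_node=2 ;;       # assumed type name; drop to 1 if CPU bound
    *) echo "no rule of thumb for $instance_type" >&2; return 1 ;;
  esac
  echo $(( workers * per_node ))
}

# A 4+1 node cluster of xlarge instances
num_reducers 5 m1.xlarge
```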

Be sure to use Hadoop's *s3n* protocol for the input parameter ({{-i 
s3n://asf-mail-archives/mahout-0.4/sequence-files/}}) so that Mahout/Hadoop can 
find the SequenceFiles in S3. Also, notice that we've configured the job to 
send output to HDFS instead of S3. This is needed to work around an issue with 
multi-step jobs and EMR (see 
[MAHOUT-598|https://issues.apache.org/jira/browse/MAHOUT-598]). Once the job 
completes, you can copy the results to S3 from the EMR cluster's HDFS using 
distcp.

The job shown above created 6,076,937 vectors with 20,444 dimensions in around 
28 minutes on a 4+1 node cluster of EC2 xlarge instances. Depending on the 
number of unique terms, setting maxNGramSize greater than 1 has a major impact 
on the execution time of the seq2sparse job. For example, the same job with 
maxNGramSize=2 can take up to 2 hours, with the bulk of the time spent creating 
collocations; see 
[Collocations|https://cwiki.apache.org/MAHOUT/collocations.html].

To monitor the status of the job, use:

{noformat}
elastic-mapreduce --logs -j JOB_ID
{noformat}

h4. 6. Copy output from HDFS to S3 (optional)

It's a good idea to save the vectors for running future jobs. Of course, if you 
don't save the vectors to S3, then they will be lost when you terminate the EMR 
cluster. There are two approaches to moving data out of HDFS to S3:

# SSH into the master node to run distcp, or
# Add a jobflow step to run distcp

To log in to the master node, use:

{noformat}
elastic-mapreduce --ssh -j JOB_ID
{noformat}

Once logged in, do:

{noformat}
hadoop distcp /asf-mail-archives/mahout-0.4/vectors/ \
  s3n://ACCESS_KEY:SECRET_KEY@BUCKET/asf-mail-archives/mahout-0.4/vectors/ &
{noformat}

Or, you can just add another job flow step to do it:

{noformat}
elastic-mapreduce --jar s3://elasticmapreduce/samples/distcp/distcp.jar \
  --arg hdfs:///asf-mail-archives/mahout-0.4/vectors/ \
  --arg s3n://ACCESS_KEY:SECRET_KEY@BUCKET/asf-mail-archives/mahout-0.4/vectors/ \
  -j JOB_ID
{noformat}

_Note: You will need all the output from the vectorize step in order to run 
Mahout's clusterdump._

Once copied, if you would like to share your results with the Mahout community, 
make the vectors public in S3 using the Amazon console or s3cmd:

{noformat}
s3cmd setacl --acl-public --recursive \
  s3://BUCKET/asf-mail-archives/mahout-0.4/vectors/
{noformat}

Dump out the size of the vectors:

{noformat}
bin/mahout vectordump --seqFile \
  s3n://ACCESS_KEY:SECRET_KEY@BUCKET/asf-mail-archives/mahout-0.4/vectors/tfidf-vectors/part-r-00000 \
  --sizeOnly | more
{noformat}

h4. 7. k-Means Clustering

Now that you have vectors, you can do some clustering! The following command 
will create a new jobflow step to run the k-Means job using the TFIDF vectors 
produced by seq2sparse:

{noformat}
elastic-mapreduce \
  --jar s3://asf-mail-archives/mahout-0.4/mahout-examples-0.4-job-ext.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg kmeans \
  --arg -i --arg /asf-mail-archives/mahout-0.4/vectors/tfidf-vectors/ \
  --arg -c --arg /asf-mail-archives/mahout-0.4/initial-clusters/ \
  --arg -o --arg /asf-mail-archives/mahout-0.4/kmeans-clusters \
  --arg -x --arg 10 \
  --arg -cd --arg 0.01 \
  --arg -k --arg 60 \
  --arg --distanceMeasure --arg org.apache.mahout.common.distance.CosineDistanceMeasure \
  -j JOB_ID
{noformat}

Depending on the EC2 instance type and size of your cluster, the k-Means job 
can take a couple of hours to complete. The input is the HDFS location of the 
vectors created by the seq2sparse job. If you copied the vectors to S3, then 
you could also use the s3n protocol. However, since I'm using the same EMR job 
flow, the vectors are already in HDFS, so there is no need to pull them from S3.

_Tip: use a convergenceDelta of 0.01 to ensure the clustering job performs more 
than one iteration._

h4. 8. Shut down your cluster

{noformat}
elastic-mapreduce --terminate -j JOB_ID
{noformat}

Verify the cluster is terminated in your Amazon console.
