That's weird.  I thought I responded to this, but I don't see one on the list 
(and have vague recollection at best of whether I actually did 
respond)...anyway...

On Feb 3, 2011, at 6:41 PM, Allen Wittenauer wrote:

> On Feb 1, 2011, at 11:40 PM, Keith Wiley wrote:
> 
>> I would really appreciate any help people can offer on the following matters.
>> 
>> When running a streaming job, -D, -files, -libjars, and -archives don't seem 
>> work, but -jobconf, -file, -cacheFile, and -cacheArchive do.  With the first 
>> four parameters anywhere in command I always get a "Streaming Command 
>> Failed!" error.  The last four work though.  Note that some of those 
>> parameters (-files) do work when I a run a Hadoop job in the normal 
>> framework, just not when I specify the streaming jar.
> 
>       There are some issues with how the streaming jar processes the command 
> line, especially in 0.20, in that they need to be in the correct order.  In 
> general, the -D's need to be *before* the rest of the streaming params.  This 
> is what works for me:
> 
> hadoop  \
>        jar \
>         `ls $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar` \
>        -Dmapred.reduce.tasks.speculative.execution=false \
>        -Dmapred.map.tasks.speculative.execution=false \
>        -Dmapred.job.name="oh noes aw is doing perl again" \
>        -input ${ATTEMPTIN} \
>        -output ${ATTEMPTOUT} \
>        -mapper map.pl \
>        -reducer reduce.pl  \
>        -file jobsvs-map1.pl \
>        -file jobsvs-reduce1.pl 

I'll give that a shot today.  Thanks.  I hate deprication warnings, they make 
me feel so guilty.

>> How do I force a single record (input file) to be processed by a single 
>> mapper to get maximum parallelism?

>> I don't understand exactly what that means and how to go about doing it.  In 
>> the normal Hadoop framework I have achieved this goal by setting 
>> mapred.max.split.size small enough that only one input record fits (about 
>> 6MBs), but I tried that with my streaming job ala "-jobconf 
>> mapred.max.split.size=X" where X is a very low number, about as many as a 
>> single streaming input record (which in the streaming case is not 6MB, but 
>> merely ~100 bytes, just a filename referenced ala -cacheFile), but it didn't 
>> work, it sent multiple records to each mapper anyway.
> 
>       What you actually want to do is set mapred.min.split.size set to an 
> extremely high value.  

I agree except that method I described helps force parallelism.  Setting 
mapred.max.split.size to a size slightly larger than a single record does a 
very good job of forcing 1-to-1 parallelism.  Forcing it to just larger than 
two records forces 2-to-1, etc.  It is very nice to be able to achieve perfect 
parallelism...but it didn't work with streaming.

I have since discovered that in the case of streaming, mapred.map.tasks is a 
good way to achieve this goal.  Ironically, if I recall correctly, this 
seemingly obvious method for setting the number mappers did not work so well in 
my original nonstreaming case, which is why I resorted to the rather contrived 
method of calculating and setting mapred.max.split.size instead.

>> Achieving 1-to-1 parallelism between map tasks, nodes, and input records is 
>> very import because my map tasks take a very long time to run, upwards of an 
>> hour.  I cannot have them queueing up on a small number of nodes while there 
>> are numerous unused nodes (task slots) available to be doing work.
> 
>       If all the task slots are in use, why would you care if they are 
> queueing up?  Also keep in mind that if a node fails, that work will need to 
> get re-done anyway.


Because all slots are not in use.  It's a very larger cluster and it's 
excruciating that Hadoop partially serializes a job by piling multiple map 
tasks onto a single map in a queue even when the cluster is massively 
underutilized.  This occurs when the input records are significantly smaller 
than the block size (6MB vs 64MB in my case, give me about a 32x serialization 
cost!!!).  To put it differently, if I let Hadoop do it its own stupid way, the 
job takes 32 times longer than it should take if it evenly distributed the map 
tasks across the nodes.  Packing the input files into larger sequence fils does 
not help with this problem.  The input splits are calculated from the 
individual files and thus, I still get this undesirable packing effect.

Thanks a lot.  Lots of stuff to think about in you post.  I appreciate it.

Cheers!

________________________________________________________________________________
Keith Wiley     kwi...@keithwiley.com     keithwiley.com    music.keithwiley.com

"It's a fine line between meticulous and obsessive-compulsive and a slippery
rope between obsessive-compulsive and debilitatingly slow."
                                           --  Keith Wiley
________________________________________________________________________________

Reply via email to