Broken pipe on streaming job can lead to truncated output for a successful job
------------------------------------------------------------------------------

                 Key: MAPREDUCE-3790
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3790
             Project: Hadoop Map/Reduce
          Issue Type: Bug
          Components: contrib/streaming
    Affects Versions: 0.23.1, 0.24.0
            Reporter: Jason Lowe


If a streaming job doesn't consume all of its input then the job can be marked 
successful even though the job's output is truncated.

Here's a simple setup that can exhibit the problem.  Note that the job output 
will most likely be truncated compared to the same job run with a zero-length 
input file.

{code}
$ hdfs dfs -cat in
foo
$ yarn jar ./share/hadoop/tools/lib/hadoop-streaming-0.24.0-SNAPSHOT.jar 
-Dmapred.map.tasks=1 -Dmapred.reduce.tasks=1 -mapper /bin/env -reducer NONE 
-input in -output out
{code}

Examining the map task log shows this:

{code:title=Excerpt from map task stdout log}
2012-02-02 11:27:25,054 WARN [main] org.apache.hadoop.streaming.PipeMapRed: 
java.io.IOException: Broken pipe
2012-02-02 11:27:25,054 INFO [main] org.apache.hadoop.streaming.PipeMapRed: 
mapRedFinished
2012-02-02 11:27:25,056 WARN [Thread-12] 
org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Bad file descriptor
2012-02-02 11:27:25,124 INFO [main] org.apache.hadoop.mapred.Task: 
Task:attempt_1328203555769_0001_m_000000_0 is done. And is in the process of 
commiting
2012-02-02 11:27:25,127 WARN [Thread-11] 
org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: DFSOutputStream is 
closed
2012-02-02 11:27:25,199 INFO [main] org.apache.hadoop.mapred.Task: Task 
attempt_1328203555769_0001_m_000000_0 is allowed to commit now
2012-02-02 11:27:25,225 INFO [main] 
org.apache.hadoop.mapred.FileOutputCommitter: Saved output of task 
'attempt_1328203555769_0001_m_000000_0' to 
hdfs://localhost:9000/user/somebody/out/_temporary/1
2012-02-02 11:27:27,834 INFO [main] org.apache.hadoop.mapred.Task: Task 
'attempt_1328203555769_0001_m_000000_0' done.
{code}

In PipeMapRed.mapRedFinished() we can see it will eat IOExceptions and return 
without waiting for the output threads or throwing a runtime exception to fail 
the job.  Net result is that the DFS streams could be shutdown too early if the 
output threads are still busy and we could lose job output.

Fixing this brings up the bigger question of what *should* happen when a 
streaming job doesn't consume all of its input.  Should we have grabbed all of 
the output from the job and still marked it successful or should we have failed 
the job?  If the former then we need to fix some other places in the code as 
well, since feeding a much larger input file (e.g.: 600K) to the same sample 
streaming job results in the job failing with the exception below.  It wouldn't 
be consistent to fail the job that doesn't consume a lot of input but pass the 
job that leaves just a few leftovers.

{code}
2012-02-02 10:29:37,220 INFO  mapreduce.Job (Job.java:monitorAndPrintJob(1270)) 
- Running job: job_1328200108174_0001
2012-02-02 10:29:44,354 INFO  mapreduce.Job (Job.java:monitorAndPrintJob(1291)) 
- Job job_1328200108174_0001 running in uber mode : false
2012-02-02 10:29:44,355 INFO  mapreduce.Job (Job.java:monitorAndPrintJob(1298)) 
-  map 0% reduce 0%
2012-02-02 10:29:46,394 INFO  mapreduce.Job (Job.java:printTaskEvents(1386)) - 
Task Id : attempt_1328200108174_0001_m_000000_0, Status : FAILED
Error: java.io.IOException: Broken pipe
        at java.io.FileOutputStream.writeBytes(Native Method)
        at java.io.FileOutputStream.write(FileOutputStream.java:282)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
        at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
        at java.io.DataOutputStream.write(DataOutputStream.java:90)
        at 
org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
        at 
org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
        at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:329)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:147)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:142)
{code}

Assuming the job returns a successful exit code, I think we should allow the 
job to complete successfully even though it doesn't consume all of its inputs.  
Part of the reasoning is that there's already this comment in PipeMapper.java 
that implies we desire that behavior:

{code:title=PipeMapper.java}
        // terminate with success:
        // swallow input records although the stream processor failed/closed
{code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to