[ 
https://issues.apache.org/jira/browse/MAPREDUCE-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333075#comment-14333075
 ] 

Gera Shegalov commented on MAPREDUCE-4815:
------------------------------------------

Thanks for v12, [~l201514]!

*output.FileOutputCommitter*:
nits:
In {{recoverTask}}, we should log an info message about the upgrade in the 
block below, because it may help debugging and it is a one-off situation.
{code}
          // essentially a no-op, but for backwards compatibility
          // after upgrade to the new fileOutputCommitter,
          // check if there are any output left in committedTaskPath
{code}

On the other hand we should probably suppress
{code}
 LOG.warn(attemptId + " had no output to recover.");
{code}
by enclosing it in {{if (algorithmVersion == 1)}}, because for v2 it's a normal 
situation and does not deserve a warning.
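
To make the intended logging behavior concrete, here is a minimal self-contained sketch. It is not the FileOutputCommitter source: the class name, the method {{logLine}}, the boolean parameter, and the wording of the info message are all assumptions made for illustration.

```java
import java.util.Optional;

/** Hypothetical model of the logging choice in recoverTask; not Hadoop code. */
final class RecoverTaskLogSketch {

  /**
   * Returns the log line (prefixed with its level) that one recovery attempt
   * would emit, or empty when nothing needs to be reported.
   */
  static Optional<String> logLine(int algorithmVersion,
                                  boolean committedTaskPathExists,
                                  String attemptId) {
    if (algorithmVersion == 1) {
      // v1 expects committed output here, so its absence deserves a warning.
      return committedTaskPathExists
          ? Optional.empty()
          : Optional.of("WARN " + attemptId + " had no output to recover.");
    }
    // v2 normally finds nothing; leftovers point to a v1-to-v2 upgrade.
    // The message text below is made up for this sketch.
    return committedTaskPathExists
        ? Optional.of("INFO " + attemptId + " recovering v1 output after upgrade.")
        : Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(logLine(1, false, "attempt_0"));
    System.out.println(logLine(2, true, "attempt_0"));
    System.out.println(logLine(2, false, "attempt_0"));
  }
}
```

The point is only that v2 with nothing to recover stays silent, while the upgrade case is reported at info level.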

*output.TestFileOutputCommitter*

{{testRecoveryInternal}} needs to take two version parameters: 
{{commitVersion}} and {{recoveryVersion}}.

That way we can have the following tests: 
testRecoveryV1, i.e. testRecoveryInternal(1, 1), and testRecoveryV2, i.e. 
testRecoveryInternal(2, 2), which you already have. 
However, we also need testRecoveryUpgradeV1V2: testRecoveryInternal(1, 2).
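
As a rough skeleton of that test matrix: the class name and the {{invoked}} list are hypothetical, and the body of {{testRecoveryInternal}} is a placeholder that only records its arguments, since the real method lives in TestFileOutputCommitter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical skeleton of the three recovery tests; not the real test class. */
final class RecoveryTestMatrixSketch {
  /** Records each "commitVersion->recoveryVersion" pair that was exercised. */
  final List<String> invoked = new ArrayList<>();

  // Placeholder: the real method would commit with commitVersion and then
  // recover with recoveryVersion. Here it only records the pair.
  private void testRecoveryInternal(int commitVersion, int recoveryVersion) {
    invoked.add(commitVersion + "->" + recoveryVersion);
  }

  void testRecoveryV1() { testRecoveryInternal(1, 1); }

  void testRecoveryV2() { testRecoveryInternal(2, 2); }

  // The upgrade case: output committed by v1, recovered by v2.
  void testRecoveryUpgradeV1V2() { testRecoveryInternal(1, 2); }
}
```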

We can have the following validation after {{commitTask}}:
{code}
    committer.commitTask(tContext);

    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
    File jtd = new File(jobTempDir1.toUri().getPath());
    if (commitVersion == 1) {
      assertTrue("Version 1 commits to temporary dir " + jtd, jtd.exists());
      validateContent(jtd);
    } else {
      assertFalse("Version 2 commits to output dir " + jtd, jtd.exists());
    }
{code}

and after recoverTask where we have 
{code}
    conf2.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
        recoveryVersion);
{code}
we can check:
{code}
    if (recoveryVersion == 1) {
      assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
      validateContent(jtd2);
    } else {
      assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
      if (commitVersion == 1) {
        assertTrue("Version 2 recovery moves to output dir from " + jtd,
            jtd.list().length == 0);
      }
    }
{code}
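
The checks above boil down to three expectations per (commitVersion, recoveryVersion) pair. A compact way to state them, with a hypothetical class and method names chosen only for this sketch:

```java
/** Hypothetical summary of the suggested assertions; names are illustrative. */
final class RecoveryExpectationSketch {

  /** After commitTask: only v1 leaves output in the committed task path. */
  static boolean committedDirExistsAfterCommit(int commitVersion) {
    return commitVersion == 1;
  }

  /** After recoverTask: only v1 recovers into a committed task path. */
  static boolean committedDirExistsAfterRecovery(int recoveryVersion) {
    return recoveryVersion == 1;
  }

  /** The old v1 directory must be emptied when v2 recovers a v1 commit. */
  static boolean oldDirMustBeEmpty(int commitVersion, int recoveryVersion) {
    return recoveryVersion != 1 && commitVersion == 1;
  }

  public static void main(String[] args) {
    int[][] pairs = {{1, 1}, {2, 2}, {1, 2}};
    for (int[] p : pairs) {
      System.out.println(p[0] + "->" + p[1]
          + " commitDir=" + committedDirExistsAfterCommit(p[0])
          + " recoveryDir=" + committedDirExistsAfterRecovery(p[1])
          + " oldDirEmpty=" + oldDirMustBeEmpty(p[0], p[1]));
    }
  }
}
```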





{{testFailAbortInternal}} does not set the version passed as a parameter.

nit: throughout the code:
{code}
    conf.setInt(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
        version);
{code}

should simply be
{code}
    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
{code}
as the test is in the same package as {{FileOutputCommitter}}.

In {{testInvalidVersionNumber}}, do we need the following line?
{code}
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
{code}
Similarly, since the variable {{committer}} is not used, it would suffice to 
invoke the constructor without assigning the object to any variable:
{code} 
new FileOutputCommitter(outDir, tContext);
{code}
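
For illustration, a self-contained sketch of that pattern. It uses a hypothetical stand-in class rather than FileOutputCommitter itself, and it assumes that the constructor rejects unsupported versions by throwing an {{IOException}}; neither the exception type nor the accepted versions are confirmed by the patch under review here.

```java
import java.io.IOException;

/** Hypothetical stand-in for a committer whose constructor validates the version. */
final class VersionCheckedCommitterSketch {

  VersionCheckedCommitterSketch(int algorithmVersion) throws IOException {
    // Assumption for this sketch: only versions 1 and 2 are accepted.
    if (algorithmVersion != 1 && algorithmVersion != 2) {
      throw new IOException("Unsupported algorithm version: " + algorithmVersion);
    }
  }

  /** Returns true when constructing with the given version is rejected. */
  static boolean rejects(int algorithmVersion) {
    try {
      // The constructed object is intentionally not assigned to a variable.
      new VersionCheckedCommitterSketch(algorithmVersion);
      return false;
    } catch (IOException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("version 3 rejected: " + rejects(3));
  }
}
```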

*mapred.TestFileOutputCommitter*
testMapOnlyNoOutputV1 and testMapOnlyNoOutputV2 are still needed for 
completeness.

Adjust testRecovery as above.



> FileOutputCommitter.commitJob can be very slow for jobs with many output files
> ------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-4815
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-4815
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mrv2
>    Affects Versions: 0.23.3, 2.0.1-alpha, 2.4.1
>            Reporter: Jason Lowe
>            Assignee: Siqi Li
>         Attachments: MAPREDUCE-4815.v10.patch, MAPREDUCE-4815.v11.patch, 
> MAPREDUCE-4815.v12.patch, MAPREDUCE-4815.v3.patch, MAPREDUCE-4815.v4.patch, 
> MAPREDUCE-4815.v5.patch, MAPREDUCE-4815.v6.patch, MAPREDUCE-4815.v7.patch, 
> MAPREDUCE-4815.v8.patch, MAPREDUCE-4815.v9.patch
>
>
> If a job generates many files to commit then the commitJob method call at the 
> end of the job can take minutes.  This is a performance regression from 1.x, 
> as 1.x had the tasks commit directly to the final output directory as they 
> were completing and commitJob had very little to do.  The commit work was 
> processed in parallel and overlapped the processing of outstanding tasks.  In 
> 0.23/2.x, the commit is single-threaded and waits until all tasks have 
> completed before commencing.



