Author: jlowe
Date: Wed Jan 9 23:01:11 2013
New Revision: 1431134

URL: http://svn.apache.org/viewvc?rev=1431134&view=rev
Log:
svn merge -c 1431131 FIXES: MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery. Contributed by Jerry Chen
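For context, a minimal sketch of the failure mode this change fixes, not part of the commit: the old-API org.apache.hadoop.mapred.OutputCommitter bridges new-API calls by downcasting its context to the old-API org.apache.hadoop.mapred.TaskAttemptContext interface, so handing it the new-API context during recovery throws a ClassCastException. The Hadoop class and method names below are real; the standalone wrapper class and the attempt-id string are illustrative assumptions.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskAttemptID;

    // Illustrative only: reproduces the cast error that MAPREDUCE-4848 fixes.
    public class CastErrorSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Hypothetical attempt id, for illustration.
        TaskAttemptID id = TaskAttemptID.forName(
            "attempt_1357772272074_0001_m_000000_0");

        // New-API context: implements only
        // org.apache.hadoop.mapreduce.TaskAttemptContext.
        org.apache.hadoop.mapreduce.TaskAttemptContext newCtx =
            new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(conf, id);

        // Old-API committers downcast their context like this; with a new-API
        // context the cast fails at runtime.
        org.apache.hadoop.mapred.TaskAttemptContext oldCtx =
            (org.apache.hadoop.mapred.TaskAttemptContext) newCtx; // ClassCastException
      }
    }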
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1431134&r1=1431133&r2=1431134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Jan 9 23:01:11 2013
@@ -525,6 +525,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally
     exits (Jason Lowe via tgraves)
 
+    MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
+    Chen via jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1431134&r1=1431133&r2=1431134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Jan 9 23:01:11 2013
@@ -579,7 +579,7 @@ public class MRAppMaster extends Composi
    */
   protected Recovery createRecoveryService(AppContext appContext) {
     return new RecoveryService(appContext.getApplicationAttemptId(),
-        appContext.getClock(), getCommitter());
+        appContext.getClock(), getCommitter(), isNewApiCommitter());
   }
 
   /** Create and initialize (but don't start) a single job.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1431134&r1=1431133&r2=1431134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Jan 9 23:01:11 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -100,6 +101,7 @@ public class RecoveryService extends Com
 
   private final ApplicationAttemptId applicationAttemptId;
   private final OutputCommitter committer;
+  private final boolean newApiCommitter;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
 
@@ -113,10 +115,11 @@ public class RecoveryService extends Com
   private volatile boolean recoveryMode = false;
 
   public RecoveryService(ApplicationAttemptId applicationAttemptId,
-      Clock clock, OutputCommitter committer) {
+      Clock clock, OutputCommitter committer, boolean newApiCommitter) {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
     this.committer = committer;
+    this.newApiCommitter = newApiCommitter;
     this.dispatcher = createRecoveryDispatcher();
     this.clock = new ControlledClock(clock);
     addService((Service) dispatcher);
@@ -360,8 +363,17 @@ public class RecoveryService extends Com
       switch (state) {
       case SUCCEEDED:
         //recover the task output
-        TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
-            attInfo.getAttemptId());
+
+        // check the committer type and construct corresponding context
+        TaskAttemptContext taskContext = null;
+        if(newApiCommitter) {
+          taskContext = new TaskAttemptContextImpl(getConfig(),
+              attInfo.getAttemptId());
+        } else {
+          taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
+              TypeConverter.fromYarn(aId));
+        }
+
         try {
           TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
           int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
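A usage-level sketch, not part of this commit, of what the old-API branch above constructs: org.apache.hadoop.mapred.TaskAttemptContextImpl implements the old-API org.apache.hadoop.mapred.TaskAttemptContext interface, so the downcast inside an old-API OutputCommitter succeeds during recovery. The wrapper class and attempt-id string are illustrative assumptions; the Hadoop types are real.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskAttemptID;

    // Illustrative only: the context type the old-API branch builds.
    public class OldApiContextSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Hypothetical attempt id, for illustration; RecoveryService derives
        // its id via TypeConverter.fromYarn(aId), as shown in the diff above.
        TaskAttemptID aId = TaskAttemptID.forName(
            "attempt_1357772272074_0001_r_000000_0");

        // Old-API context: implements org.apache.hadoop.mapred.TaskAttemptContext,
        // so an old-API committer's downcast succeeds.
        org.apache.hadoop.mapred.TaskAttemptContext oldCtx =
            new org.apache.hadoop.mapred.TaskAttemptContextImpl(conf, aId);
        System.out.println(oldCtx.getTaskAttemptID());
      }
    }

Passing isNewApiCommitter() in from MRAppMaster keeps that decision in one place rather than re-deriving the committer flavor inside RecoveryService.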
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1431134&r1=1431133&r2=1431134&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Wed Jan 9 23:01:11 2013
@@ -626,6 +626,115 @@ public class TestRecovery {
     validateOutput();
   }
 
+  @Test
+  public void testRecoveryWithOldCommiter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,