Author: bobby Date: Tue Jul 10 16:11:23 2012 New Revision: 1359749 URL: http://svn.apache.org/viewvc?rev=1359749&view=rev Log: svn merge -c 1359747. FIXES: MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via bobby)
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1359749&r1=1359748&r2=1359749&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Jul 10 16:11:23 2012 @@ -543,6 +543,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4300. OOM in AM can turn it into a zombie. (Robert Evans via tgraves) + MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via + bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1359749&r1=1359748&r2=1359749&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Jul 10 16:11:23 2012 @@ -189,7 +189,7 @@ public abstract class TaskImpl implement // Transitions from SUCCEEDED state .addTransition(TaskState.SUCCEEDED, //only possible for map tasks - EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), + EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED), TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition()) .addTransition(TaskState.SUCCEEDED, //only possible for map tasks EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED), @@ -618,7 +618,7 @@ public abstract class TaskImpl implement } } - private void internalError(TaskEventType type) { + protected void internalError(TaskEventType type) { LOG.error("Invalid event " + type + " on Task " + this.taskId); eventHandler.handle(new JobDiagnosticsUpdateEvent( this.taskId.getJobId(), "Invalid event " + type + @@ -896,6 +896,16 @@ public abstract class TaskImpl implement @Override public TaskState transition(TaskImpl task, TaskEvent event) { + if (event instanceof TaskTAttemptEvent) { + TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + if (task.getState() == TaskState.SUCCEEDED && + !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { + // don't allow a different task attempt to override a previous + // succeeded state + return TaskState.SUCCEEDED; + } + } + //verify that this occurs only for map task //TODO: consider moving it to MapTaskImpl if (!TaskType.MAP.equals(task.getType())) { Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1359749&r1=1359748&r2=1359749&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Jul 10 16:11:23 2012 @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -127,8 +128,13 @@ public class TestTaskImpl { @Override protected int getMaxAttempts() { return 100; - } - + } + + @Override + protected void internalError(TaskEventType type) { + super.internalError(type); + fail("Internal error: " + type); + } } private class MockTaskAttemptImpl extends TaskAttemptImpl { @@ -462,5 +468,32 @@ public class TestTaskImpl { assertTaskSucceededState(); } + + @Test + public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + updateLastAttemptState(TaskAttemptState.RUNNING); + + // Add a speculative task attempt that succeeds + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + commitTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + // The task should now have succeeded + assertTaskSucceededState(); + + // Now fail the first task attempt, after the second has succeeded + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + // The task should still be in the succeeded state + assertTaskSucceededState(); + + } }