Author: bobby Date: Mon Nov 12 17:15:45 2012 New Revision: 1408360 URL: http://svn.apache.org/viewvc?rev=1408360&view=rev Log: MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe via bobby)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1408360&r1=1408359&r2=1408360&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Nov 12 17:15:45 2012 @@ -654,6 +654,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby) MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby) + + MAPREDUCE-4425. 
Speculation + Fetch failures can lead to a hung job (jlowe + via bobby) Release 0.23.4 - UNRELEASED Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1408360&r1=1408359&r2=1408360&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Nov 12 17:15:45 2012 @@ -217,13 +217,15 @@ public abstract class TaskImpl implement .addTransition(TaskStateInternal.SUCCEEDED, EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED), TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition()) + .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, + TaskEventType.T_ATTEMPT_SUCCEEDED, + new AttemptSucceededAtSucceededTransition()) // Ignore-able transitions. 
.addTransition( TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT, TaskEventType.T_ATTEMPT_COMMIT_PENDING, TaskEventType.T_ATTEMPT_LAUNCHED, - TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_KILL)) // Transitions from FAILED state @@ -971,6 +973,8 @@ public abstract class TaskImpl implement !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { // don't allow a different task attempt to override a previous // succeeded state + task.finishedAttempts.add(castEvent.getTaskAttemptID()); + task.inProgressAttempts.remove(castEvent.getTaskAttemptID()); return TaskStateInternal.SUCCEEDED; } @@ -1013,6 +1017,8 @@ public abstract class TaskImpl implement !attemptId.equals(task.successfulAttempt)) { // don't allow a different task attempt to override a previous // succeeded state + task.finishedAttempts.add(castEvent.getTaskAttemptID()); + task.inProgressAttempts.remove(castEvent.getTaskAttemptID()); return TaskStateInternal.SUCCEEDED; } } @@ -1043,6 +1049,16 @@ public abstract class TaskImpl implement } } + private static class AttemptSucceededAtSucceededTransition + implements SingleArcTransition<TaskImpl, TaskEvent> { + @Override + public void transition(TaskImpl task, TaskEvent event) { + TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + task.finishedAttempts.add(castEvent.getTaskAttemptID()); + task.inProgressAttempts.remove(castEvent.getTaskAttemptID()); + } + } + private static class KillNewTransition implements SingleArcTransition<TaskImpl, TaskEvent> { @Override Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1408360&r1=1408359&r2=1408360&view=diff 
============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Mon Nov 12 17:15:45 2012 @@ -141,7 +141,6 @@ public class TestTaskImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; - private TaskAttemptId attemptId; private TaskType taskType; public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, @@ -152,14 +151,11 @@ public class TestTaskImpl { AppContext appContext, TaskType taskType) { super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf, dataLocations, committer, jobToken, credentials, clock, appContext); - attemptId = Records.newRecord(TaskAttemptId.class); - attemptId.setId(id); - attemptId.setTaskId(taskId); this.taskType = taskType; } public TaskAttemptId getAttemptId() { - return attemptId; + return getID(); } @Override @@ -561,4 +557,49 @@ public class TestTaskImpl { mockTask = createMockTask(TaskType.REDUCE); runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING); } + + @Test + public void testSpeculativeMapFetchFailure() { + // Setup a scenario where speculative task wins, first attempt killed + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED); + assertEquals(2, taskAttempts.size()); + + // speculative attempt retroactively fails from fetch failures + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + assertTaskScheduledState(); + assertEquals(3, taskAttempts.size()); + } + + @Test + public void testSpeculativeMapMultipleSucceedFetchFailure() { + // Setup a 
scenario where speculative task wins, first attempt succeeds + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED); + assertEquals(2, taskAttempts.size()); + + // speculative attempt retroactively fails from fetch failures + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + assertTaskScheduledState(); + assertEquals(3, taskAttempts.size()); + } + + @Test + public void testSpeculativeMapFailedFetchFailure() { + // Setup a scenario where speculative task wins, first attempt fails + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED); + assertEquals(2, taskAttempts.size()); + + // speculative attempt retroactively fails from fetch failures + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + assertTaskScheduledState(); + assertEquals(3, taskAttempts.size()); + } }