Author: jlowe Date: Sat Dec 22 01:41:26 2012 New Revision: 1425223 URL: http://svn.apache.org/viewvc?rev=1425223&view=rev Log: MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while speculating. Contributed by Jason Lowe
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1425223&r1=1425222&r2=1425223&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Dec 22 01:41:26 2012 @@ -641,6 +641,9 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4793. Problem with adding resources when using both -files and -file to hadoop streaming (jlowe) + MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while + speculating (jlowe) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1425223&r1=1425222&r2=1425223&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Dec 22 01:41:26 2012 @@ -231,7 +231,12 @@ public abstract class TaskImpl implement // Transitions from FAILED state .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, EnumSet.of(TaskEventType.T_KILL, - TaskEventType.T_ADD_SPEC_ATTEMPT)) + TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_COMMIT_PENDING, + TaskEventType.T_ATTEMPT_FAILED, + TaskEventType.T_ATTEMPT_KILLED, + TaskEventType.T_ATTEMPT_LAUNCHED, + TaskEventType.T_ATTEMPT_SUCCEEDED)) // Transitions from KILLED state .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED, @@ -941,6 +946,13 @@ public abstract class TaskImpl implement task.handleTaskAttemptCompletion( taskAttemptId, TaskAttemptCompletionEventStatus.TIPFAILED); + + // issue kill to all non finished attempts + for (TaskAttempt taskAttempt : task.attempts.values()) { + task.killUnfinishedAttempt + (taskAttempt, "Task has failed. Killing attempt!"); + } + task.inProgressAttempts.clear(); if (task.historyTaskStartGenerated) { TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(), Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1425223&r1=1425222&r2=1425223&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Sat Dec 22 01:41:26 2012 @@ -602,4 +602,73 @@ public class TestTaskImpl { assertTaskScheduledState(); assertEquals(3, taskAttempts.size()); } + + @Test + public void testFailedTransitions() { + mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), + remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, + credentials, clock, + completedTasksFromPreviousRun, startCount, + metrics, appContext, TaskType.MAP) { + @Override + protected int getMaxAttempts() { + return 1; + } + }; + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + + // add three more speculative attempts + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + assertEquals(4, taskAttempts.size()); + + // have the first attempt fail, verify task failed due to no retries + MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); + taskAttempt.setState(TaskAttemptState.FAILED); + mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + assertEquals(TaskState.FAILED, mockTask.getState()); + + // verify task can no longer be killed + mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL)); + assertEquals(TaskState.FAILED, mockTask.getState()); + + // verify speculative doesn't launch new tasks + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_LAUNCHED)); + assertEquals(TaskState.FAILED, mockTask.getState()); + assertEquals(4, taskAttempts.size()); + + // verify attempt events from active tasks don't knock task out of FAILED + taskAttempt = taskAttempts.get(1); + taskAttempt.setState(TaskAttemptState.COMMIT_PENDING); + mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), + TaskEventType.T_ATTEMPT_COMMIT_PENDING)); + assertEquals(TaskState.FAILED, mockTask.getState()); + taskAttempt.setState(TaskAttemptState.FAILED); + mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + assertEquals(TaskState.FAILED, mockTask.getState()); + taskAttempt = taskAttempts.get(2); + taskAttempt.setState(TaskAttemptState.SUCCEEDED); + mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + assertEquals(TaskState.FAILED, mockTask.getState()); + taskAttempt = taskAttempts.get(3); + taskAttempt.setState(TaskAttemptState.KILLED); + mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), + TaskEventType.T_ATTEMPT_KILLED)); + assertEquals(TaskState.FAILED, mockTask.getState()); + } }