Author: jlowe
Date: Sat Dec 22 01:41:26 2012
New Revision: 1425223

URL: http://svn.apache.org/viewvc?rev=1425223&view=rev
Log:
MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while 
speculating. Contributed by Jason Lowe

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1425223&r1=1425222&r2=1425223&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Dec 22 01:41:26 2012
@@ -641,6 +641,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4793. Problem with adding resources when using both -files and
     -file to hadoop streaming (jlowe)
 
+    MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while
+    speculating (jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1425223&r1=1425222&r2=1425223&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Dec 22 01:41:26 2012
@@ -231,7 +231,12 @@ public abstract class TaskImpl implement
     // Transitions from FAILED state        
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(TaskEventType.T_KILL,
-                   TaskEventType.T_ADD_SPEC_ATTEMPT))
+                   TaskEventType.T_ADD_SPEC_ATTEMPT,
+                   TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+                   TaskEventType.T_ATTEMPT_FAILED,
+                   TaskEventType.T_ATTEMPT_KILLED,
+                   TaskEventType.T_ATTEMPT_LAUNCHED,
+                   TaskEventType.T_ATTEMPT_SUCCEEDED))
 
     // Transitions from KILLED state
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
@@ -941,6 +946,13 @@ public abstract class TaskImpl implement
         task.handleTaskAttemptCompletion(
             taskAttemptId, 
             TaskAttemptCompletionEventStatus.TIPFAILED);
+
+        // issue kill to all non finished attempts
+        for (TaskAttempt taskAttempt : task.attempts.values()) {
+          task.killUnfinishedAttempt
+            (taskAttempt, "Task has failed. Killing attempt!");
+        }
+        task.inProgressAttempts.clear();
         
         if (task.historyTaskStartGenerated) {
        TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),

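A note for readers following the patch above: the first hunk makes FAILED an absorbing state for attempt-lifecycle events. With speculation enabled, other attempts of the task can still be running when one attempt's failure pushes the task into FAILED, and their later COMMIT_PENDING/LAUNCHED/SUCCEEDED/FAILED/KILLED events used to hit a (state, event) pair with no registered transition, triggering an invalid-transition error. The second hunk additionally kills those unfinished attempts and clears the in-progress set. The sketch below is a simplified, self-contained illustration of the table-driven idea; it is not Hadoop's StateMachineFactory, and the class and method names are invented for this example.

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Simplified sketch only -- not Hadoop's StateMachineFactory. The enum values
// loosely mirror TaskStateInternal/TaskEventType from the patch above.
public class FailedStateSketch {
  enum State { RUNNING, FAILED }
  enum Event { T_KILL, T_ADD_SPEC_ATTEMPT, T_ATTEMPT_COMMIT_PENDING,
               T_ATTEMPT_FAILED, T_ATTEMPT_KILLED, T_ATTEMPT_LAUNCHED,
               T_ATTEMPT_SUCCEEDED }

  // Registered transitions: (current state, event) -> next state.
  private final Map<State, Map<Event, State>> table = new EnumMap<>(State.class);
  private State current = State.RUNNING;

  void addTransition(State from, State to, EnumSet<Event> events) {
    Map<Event, State> row = table.computeIfAbsent(from, s -> new EnumMap<>(Event.class));
    for (Event e : events) {
      row.put(e, to);
    }
  }

  void handle(Event event) {
    Map<Event, State> row = table.get(current);
    if (row == null || !row.containsKey(event)) {
      // Before the patch, late attempt events arriving in FAILED took this path.
      throw new IllegalStateException("Invalid event " + event + " in state " + current);
    }
    current = row.get(event);
  }

  public static void main(String[] args) {
    FailedStateSketch task = new FailedStateSketch();
    // The last allowed attempt failing moves the task to FAILED.
    task.addTransition(State.RUNNING, State.FAILED, EnumSet.of(Event.T_ATTEMPT_FAILED));
    // The fix: FAILED explicitly absorbs events that speculative attempts can
    // still deliver, mirroring the EnumSet added in the first hunk above.
    task.addTransition(State.FAILED, State.FAILED,
        EnumSet.of(Event.T_KILL, Event.T_ADD_SPEC_ATTEMPT,
                   Event.T_ATTEMPT_COMMIT_PENDING, Event.T_ATTEMPT_FAILED,
                   Event.T_ATTEMPT_KILLED, Event.T_ATTEMPT_LAUNCHED,
                   Event.T_ATTEMPT_SUCCEEDED));

    task.handle(Event.T_ATTEMPT_FAILED);    // task fails
    task.handle(Event.T_ATTEMPT_SUCCEEDED); // late speculative event is absorbed
    System.out.println("task is still " + task.current);  // prints FAILED
  }
}

Running main shows that, once the FAILED self-loop entries are registered, a late T_ATTEMPT_SUCCEEDED is simply absorbed instead of throwing.
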
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1425223&r1=1425222&r2=1425223&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Sat Dec 22 01:41:26 2012
@@ -602,4 +602,73 @@ public class TestTaskImpl {
     assertTaskScheduledState();
     assertEquals(3, taskAttempts.size());
   }
+
+  @Test
+  public void testFailedTransitions() {
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, appContext, TaskType.MAP) {
+          @Override
+          protected int getMaxAttempts() {
+            return 1;
+          }
+    };
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+
+    // add three more speculative attempts
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    assertEquals(4, taskAttempts.size());
+
+    // have the first attempt fail, verify task failed due to no retries
+    MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
+    taskAttempt.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+
+    // verify task can no longer be killed
+    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+
+    // verify speculative doesn't launch new tasks
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ATTEMPT_LAUNCHED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(4, taskAttempts.size());
+
+    // verify attempt events from active tasks don't knock task out of FAILED
+    taskAttempt = taskAttempts.get(1);
+    taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt = taskAttempts.get(2);
+    taskAttempt.setState(TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt = taskAttempts.get(3);
+    taskAttempt.setState(TaskAttemptState.KILLED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+  }
 }
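
The new test pins getMaxAttempts() to 1 so the very first attempt failure drives the task to FAILED while three launched speculative attempts are still outstanding. It then replays each of the newly ignored event types (T_KILL, T_ADD_SPEC_ATTEMPT, T_ATTEMPT_LAUNCHED, T_ATTEMPT_COMMIT_PENDING, T_ATTEMPT_FAILED, T_ATTEMPT_SUCCEEDED, T_ATTEMPT_KILLED) and asserts that the task stays FAILED and that no additional attempts are created.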

