Author: jlowe
Date: Fri Apr 25 22:26:24 2014
New Revision: 1590168

URL: http://svn.apache.org/r1590168
Log:
MAPREDUCE-5835. Killing Task might cause the job to go to ERROR state. 
Contributed by Ming Ma

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1590168&r1=1590167&r2=1590168&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Apr 25 
22:26:24 2014
@@ -236,6 +236,9 @@ Release 2.4.1 - UNRELEASED
     MAPREDUCE-5843. Fixed TestMRKeyValueTextInputFormat to not leak files and
     thus avoid failing on Windows. (Varun Vasudev via vinodkv)
 
+    MAPREDUCE-5835. Killing Task might cause the job to go to ERROR state
+    (Ming Ma via jlowe)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1590168&r1=1590167&r2=1590168&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
 Fri Apr 25 22:26:24 2014
@@ -249,8 +249,16 @@ public abstract class TaskImpl implement
                    TaskEventType.T_ATTEMPT_SUCCEEDED))
 
     // Transitions from KILLED state
+    // There could be a race condition where TaskImpl might receive
+    // T_ATTEMPT_SUCCEEDED followed by T_ATTEMPT_KILLED for the same attempt.
+    // a. The task is in KILL_WAIT.
+    // b. Before TA transitions to SUCCEEDED state, Task sends TA_KILL event.
+    // c. TA transitions to SUCCEEDED state and thus sends T_ATTEMPT_SUCCEEDED
+    //    to the task. The task transitions to KILLED state.
+    // d. TA processes TA_KILL event and sends T_ATTEMPT_KILLED to the task.
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(TaskEventType.T_KILL,
+                   TaskEventType.T_ATTEMPT_KILLED,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // create the topology tables

Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1590168&r1=1590167&r2=1590168&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
 (original)
+++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
 Fri Apr 25 22:26:24 2014
@@ -214,6 +214,87 @@ public class TestKill {
     app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
   }
 
+  static class MyAsyncDispatch extends AsyncDispatcher {
+    private CountDownLatch latch;
+    private TaskAttemptEventType attemptEventTypeToWait;
+    MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType 
attemptEventTypeToWait) {
+      super();
+      this.latch = latch;
+      this.attemptEventTypeToWait = attemptEventTypeToWait;
+    }
+
+    @Override
+    protected void dispatch(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
+        TaskAttemptId attemptID = ((TaskAttemptEvent) 
event).getTaskAttemptID();
+        if (attemptEvent.getType() == this.attemptEventTypeToWait
+            && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
+          try {
+            latch.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+      super.dispatch(event);
+    }
+  }
+
+  // This is to test a race condition where JobEventType.JOB_KILL is generated
+  // right after TaskAttemptEventType.TA_DONE is generated.
+  // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
+  // and T_ATTEMPT_KILLED from the same attempt.
+  @Test
+  public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    final Dispatcher dispatcher = new MyAsyncDispatch(latch, 
TaskAttemptEventType.TA_DONE);
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = 
reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // The order in the dispatch event queue, from the oldest to the newest
+    // TA_DONE
+    // JOB_KILL
+    // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling )
+    // T_KILL ( from JOB_KILL's handling )
+    // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling )
+    // TA_KILL ( from T_KILL's handling )
+    // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
+    // T_ATTEMPT_KILLED ( from TA_KILL's handling )
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+        .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    //unblock
+    latch.countDown();
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
   @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);


Reply via email to