Author: jlowe
Date: Fri Apr 25 22:26:24 2014
New Revision: 1590168

URL: http://svn.apache.org/r1590168
Log:
MAPREDUCE-5835. Killing Task might cause the job to go to ERROR state. Contributed by Ming Ma
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1590168&r1=1590167&r2=1590168&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Apr 25 22:26:24 2014
@@ -236,6 +236,9 @@ Release 2.4.1 - UNRELEASED
     MAPREDUCE-5843. Fixed TestMRKeyValueTextInputFormat to not leak files and
     thus avoid failing on Windows. (Varun Vasudev via vinodkv)
 
+    MAPREDUCE-5835. Killing Task might cause the job to go to ERROR state
+    (Ming Ma via jlowe)
+
 Release 2.4.0 - 2014-04-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1590168&r1=1590167&r2=1590168&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Apr 25 22:26:24 2014
@@ -249,8 +249,16 @@ public abstract class TaskImpl implement
             TaskEventType.T_ATTEMPT_SUCCEEDED))
 
     // Transitions from KILLED state
+    // There could be a race condition where TaskImpl might receive
+    // T_ATTEMPT_SUCCEEDED followed by T_ATTEMPT_KILLED for the same attempt.
+    // a. The task is in KILL_WAIT.
+    // b. Before TA transitions to SUCCEEDED state, Task sends TA_KILL event.
+    // c. TA transitions to SUCCEEDED state and thus sends T_ATTEMPT_SUCCEEDED
+    //    to the task. The task transitions to KILLED state.
+    // d. TA processes TA_KILL event and sends T_ATTEMPT_KILLED to the task.
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(TaskEventType.T_KILL,
+            TaskEventType.T_ATTEMPT_KILLED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // create the topology tables

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1590168&r1=1590167&r2=1590168&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Fri Apr 25 22:26:24 2014
@@ -214,6 +214,87 @@ public class TestKill {
     app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
   }
 
+  static class MyAsyncDispatch extends AsyncDispatcher {
+    private CountDownLatch latch;
+    private TaskAttemptEventType attemptEventTypeToWait;
+    MyAsyncDispatch(CountDownLatch latch,
+        TaskAttemptEventType attemptEventTypeToWait) {
+      super();
+      this.latch = latch;
+      this.attemptEventTypeToWait = attemptEventTypeToWait;
+    }
+
+    @Override
+    protected void dispatch(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
+        TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
+        if (attemptEvent.getType() == this.attemptEventTypeToWait
+            && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
+          try {
+            latch.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+      super.dispatch(event);
+    }
+  }
+
+  // This is to test a race condition where JobEventType.JOB_KILL is generated
+  // right after TaskAttemptEventType.TA_DONE is generated.
+  // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
+  // and T_ATTEMPT_KILLED from the same attempt.
+  @Test
+  public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    final Dispatcher dispatcher = new MyAsyncDispatch(latch,
+        TaskAttemptEventType.TA_DONE);
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // The order in the dispatch event queue, from the oldest to the newest
+    // TA_DONE
+    // JOB_KILL
+    // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling )
+    // T_KILL ( from JOB_KILL's handling )
+    // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling )
+    // TA_KILL ( from T_KILL's handling )
+    // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
+    // T_ATTEMPT_KILLED ( from TA_KILL's handling )
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+        .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    // unblock
+    latch.countDown();
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
   @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);
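
For readers who don't work in the MR app master's state machines, the sketch below is a standalone, simplified model of the race described in the TaskImpl comment; it is not Hadoop code, the class name is hypothetical, and it deliberately ignores StateMachineFactory. The idea it illustrates: once the task is KILLED, a late T_ATTEMPT_KILLED has no registered transition, and the AM treats the resulting invalid transition as an internal error, which is how the job ends up in ERROR (the symptom in the JIRA title). Listing the event as ignorable in the KILLED state keeps the task terminal.

// Standalone sketch (not Hadoop code): a toy task state machine showing why a
// late T_ATTEMPT_KILLED must be tolerated in the KILLED state. Event and state
// names mirror the patch; the class and its ERROR shortcut are hypothetical.
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.Queue;

public class KilledTaskRaceSketch {
  enum TaskState { KILL_WAIT, KILLED, ERROR }
  enum TaskEvent { T_ATTEMPT_SUCCEEDED, T_ATTEMPT_KILLED }

  // Events the KILLED state silently ignores; the patch adds T_ATTEMPT_KILLED
  // to the analogous set in TaskImpl.
  static final EnumSet<TaskEvent> IGNORED_IN_KILLED =
      EnumSet.of(TaskEvent.T_ATTEMPT_KILLED);

  static TaskState handle(TaskState current, TaskEvent event) {
    switch (current) {
      case KILL_WAIT:
        // Either terminal report from the attempt moves the task to KILLED.
        return TaskState.KILLED;
      case KILLED:
        // An unexpected event in a terminal state would otherwise be an
        // invalid transition, modeled here as going straight to ERROR.
        return IGNORED_IN_KILLED.contains(event) ? TaskState.KILLED
                                                 : TaskState.ERROR;
      default:
        return current;
    }
  }

  public static void main(String[] args) {
    // The interleaving from the TaskImpl comment: the attempt's success report
    // arrives first, then its kill acknowledgement after the task is KILLED.
    Queue<TaskEvent> events = new LinkedList<>();
    events.add(TaskEvent.T_ATTEMPT_SUCCEEDED);
    events.add(TaskEvent.T_ATTEMPT_KILLED);

    TaskState state = TaskState.KILL_WAIT;
    for (TaskEvent e : events) {
      state = handle(state, e);
      System.out.println(e + " -> " + state);
    }
    // With T_ATTEMPT_KILLED in the ignore set the task ends in KILLED, not ERROR.
  }
}

Running the sketch prints T_ATTEMPT_SUCCEEDED -> KILLED followed by T_ATTEMPT_KILLED -> KILLED; removing T_ATTEMPT_KILLED from the ignore set reproduces the ERROR outcome, which is the behavior the new testKillTaskWaitKillJobAfterTA_DONE test guards against in the real state machine.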