Author: zjshen
Date: Thu Jul  3 01:45:53 2014
New Revision: 1607513

URL: http://svn.apache.org/r1607513
Log:
MAPREDUCE-5900. Changed to the interpret container preemption exit code as a 
task attempt killing event. Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1607512 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1607513&r1=1607512&r2=1607513&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu 
Jul  3 01:45:53 2014
@@ -131,6 +131,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5939. StartTime showing up as the epoch time in JHS UI after
     upgrade (Chen He via jlowe)
 
+    MAPREDUCE-5900. Changed to the interpret container preemption exit code as 
a
+    task attempt killing event. (Mayank Bansal via zjshen)
+
 Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1607513&r1=1607512&r2=1607513&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 Thu Jul  3 01:45:53 2014
@@ -699,7 +699,8 @@ public class RMContainerAllocator extend
   @VisibleForTesting
   public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
       TaskAttemptId attemptID) {
-    if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
+    if (cont.getExitStatus() == ContainerExitStatus.ABORTED
+        || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
       // killed by framework
       return new TaskAttemptEvent(attemptID,
           TaskAttemptEventType.TA_KILL);

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1607513&r1=1607512&r2=1607513&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
 Thu Jul  3 01:45:53 2014
@@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -795,6 +796,178 @@ public class TestTaskAttempt{
                finishTime, Long.valueOf(taImpl.getFinishTime()));  
   }
   
+  @Test
+  public void testContainerKillAfterAssigned() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    assertEquals("Task attempt is not in assinged state",
+        taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileRunning() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileCommitPending() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_COMMIT_PENDING));
+    assertEquals("Task should be in COMMIT_PENDING state",
+        TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
 

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1607513&r1=1607512&r2=1607513&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 Thu Jul  3 01:45:53 2014
@@ -1954,6 +1954,22 @@ public class TestRMContainerAllocator {
     TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
         abortedStatus, attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+    
+    ContainerId containerId2 = ContainerId.newInstance(applicationAttemptId, 
2);
+    ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
+        ContainerState.RUNNING, "", 0);
+
+    ContainerStatus preemptedStatus = ContainerStatus.newInstance(containerId2,
+        ContainerState.RUNNING, "", ContainerExitStatus.PREEMPTED);
+
+    TaskAttemptEvent event2 = allocator.createContainerFinishedEvent(status2,
+        attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        event2.getType());
+
+    TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent(
+        preemptedStatus, attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
   }
   
   @Test


Reply via email to