Repository: hadoop
Updated Branches:
  refs/heads/branch-2 39f896e1a -> c52ad9ee8


MAPREDUCE-6771. RMContainerAllocator sends container diagnostics event after 
corresponding completion event. Contributed by Haibo Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c52ad9ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c52ad9ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c52ad9ee

Branch: refs/heads/branch-2
Commit: c52ad9ee86f5033caca02a7af6aeccfc5c87a99e
Parents: 39f896e
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 29 15:50:50 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 29 15:50:50 2016 +0000

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         | 47 ++++++++++++--------
 .../v2/app/rm/TestRMContainerAllocator.java     | 45 +++++++++++++++++++
 2 files changed, 74 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52ad9ee/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 93905b6..ba3d6a4 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -147,7 +147,7 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     new LinkedList<ContainerRequest>();
 
   //holds information about the assigned containers to task attempts
-  private final AssignedRequests assignedRequests = new AssignedRequests();
+  private final AssignedRequests assignedRequests;
   
   //holds scheduled requests to be fulfilled by RM
   private final ScheduledRequests scheduledRequests = new ScheduledRequests();
@@ -193,6 +193,11 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     super(clientService, context);
     this.stopped = new AtomicBoolean(false);
     this.clock = context.getClock();
+    this.assignedRequests = createAssignedRequests();
+  }
+
+  protected AssignedRequests createAssignedRequests() {
+    return new AssignedRequests();
   }
 
   @Override
@@ -805,27 +810,33 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     handleJobPriorityChange(response);
 
     for (ContainerStatus cont : finishedContainers) {
-      LOG.info("Received completed container " + cont.getContainerId());
-      TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
-      if (attemptID == null) {
-        LOG.error("Container complete event for unknown container id "
-            + cont.getContainerId());
-      } else {
-        pendingRelease.remove(cont.getContainerId());
-        assignedRequests.remove(attemptID);
-        
-        // send the container completed event to Task attempt
-        eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
-        
-        // Send the diagnostics
-        String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
-        eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
-            diagnostics));
-      }      
+      processFinishedContainer(cont);
     }
     return newContainers;
   }
 
+  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  void processFinishedContainer(ContainerStatus container) {
+    LOG.info("Received completed container " + container.getContainerId());
+    TaskAttemptId attemptID = assignedRequests.get(container.getContainerId());
+    if (attemptID == null) {
+      LOG.error("Container complete event for unknown container "
+          + container.getContainerId());
+    } else {
+      pendingRelease.remove(container.getContainerId());
+      assignedRequests.remove(attemptID);
+
+      // Send the diagnostics
+      String diagnostic = 
StringInterner.weakIntern(container.getDiagnostics());
+      eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
+          diagnostic));
+
+      // send the container completed event to Task attempt
+      eventHandler.handle(createContainerFinishedEvent(container, attemptID));
+    }
+  }
+
   private void applyConcurrentTaskLimits() {
     int numScheduledMaps = scheduledRequests.maps.size();
     if (maxRunningMaps > 0 && numScheduledMaps > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52ad9ee/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 98f606e..ba3b698 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -59,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
@@ -69,6 +71,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
@@ -142,6 +145,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Supplier;
+import org.mockito.InOrder;
 
 @SuppressWarnings("unchecked")
 public class TestRMContainerAllocator {
@@ -3011,6 +3015,47 @@ public class TestRMContainerAllocator {
     }
   }
 
+  /**
+   * MAPREDUCE-6771. Test if RMContainerAllocator generates the events in the
+   * right order while processing finished containers.
+   */
+  @Test
+  public void testHandlingFinishedContainers() {
+    EventHandler eventHandler = mock(EventHandler.class);
+
+    AppContext context = mock(MRAppMaster.RunningAppContext.class);
+    when(context.getClock()).thenReturn(new ControlledClock());
+    when(context.getClusterInfo()).thenReturn(
+        new ClusterInfo(Resource.newInstance(10240, 1)));
+    when(context.getEventHandler()).thenReturn(eventHandler);
+    RMContainerAllocator containerAllocator =
+        new RMContainerAllocatorForFinishedContainer(null, context);
+
+    ContainerStatus finishedContainer = ContainerStatus.newInstance(
+        mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
+    containerAllocator.processFinishedContainer(finishedContainer);
+
+    InOrder inOrder = inOrder(eventHandler);
+    inOrder.verify(eventHandler).handle(
+        isA(TaskAttemptDiagnosticsUpdateEvent.class));
+    inOrder.verify(eventHandler).handle(isA(TaskAttemptEvent.class));
+    inOrder.verifyNoMoreInteractions();
+  }
+
+  private static class RMContainerAllocatorForFinishedContainer
+      extends RMContainerAllocator {
+    public RMContainerAllocatorForFinishedContainer(ClientService 
clientService,
+        AppContext context) {
+      super(clientService, context);
+    }
+    @Override
+    protected AssignedRequests createAssignedRequests() {
+      AssignedRequests assignedReqs = mock(AssignedRequests.class);
+      TaskAttemptId taskAttempt = mock(TaskAttemptId.class);
+      when(assignedReqs.get(any(ContainerId.class))).thenReturn(taskAttempt);
+      return assignedReqs;
+    }
+  }
 
   @Test
   public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to