Repository: hadoop
Updated Branches:
  refs/heads/branch-2 4ecbe2e9b -> d1aa3e0f8


MAPREDUCE-6689. MapReduce job can infinitely increase number of reducer 
resource requests. Contributed by Wangda Tan
(cherry picked from commit c9bb96fa81fc925e33ccc0b02c98cc2d929df120)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1aa3e0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1aa3e0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1aa3e0f

Branch: refs/heads/branch-2
Commit: d1aa3e0f895e2996274c19d2b5e0b1b96c4eff21
Parents: 4ecbe2e
Author: Jason Lowe <jl...@apache.org>
Authored: Fri May 6 22:25:47 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri May 6 22:27:24 2016 +0000

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         |  36 +++---
 .../v2/app/rm/TestRMContainerAllocator.java     | 127 +++++++++++++++++++
 2 files changed, 147 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1aa3e0f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index b82cd91..1d63fb6 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -275,14 +275,18 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     }
 
     if (recalculateReduceSchedule) {
-      preemptReducesIfNeeded();
-      scheduleReduces(
-          getJob().getTotalMaps(), completedMaps,
-          scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
-          assignedRequests.maps.size(), assignedRequests.reduces.size(),
-          mapResourceRequest, reduceResourceRequest,
-          pendingReduces.size(), 
-          maxReduceRampupLimit, reduceSlowStart);
+      boolean reducerPreempted = preemptReducesIfNeeded();
+
+      if (!reducerPreempted) {
+        // Only schedule new reducers if no reducer preemption happens for
+        // this heartbeat
+        scheduleReduces(getJob().getTotalMaps(), completedMaps,
+            scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
+            assignedRequests.maps.size(), assignedRequests.reduces.size(),
+            mapResourceRequest, reduceResourceRequest, pendingReduces.size(),
+            maxReduceRampupLimit, reduceSlowStart);
+      }
+
       recalculateReduceSchedule = false;
     }
 
@@ -465,30 +469,30 @@ public class RMContainerAllocator extends 
RMContainerRequestor
 
   @Private
   @VisibleForTesting
-  void preemptReducesIfNeeded() {
+  boolean preemptReducesIfNeeded() {
     if (reduceResourceRequest.equals(Resources.none())) {
-      return; // no reduces
+      return false; // no reduces
     }
 
     if (assignedRequests.maps.size() > 0) {
       // there are assigned mappers
-      return;
+      return false;
     }
 
     if (scheduledRequests.maps.size() <= 0) {
       // there are no pending requests for mappers
-      return;
+      return false;
     }
+
     // At this point:
     // we have pending mappers and all assigned resources are taken by reducers
-
     if (reducerUnconditionalPreemptionDelayMs >= 0) {
       // Unconditional preemption is enabled.
       // If mappers are pending for longer than the configured threshold,
       // preempt reducers irrespective of what the headroom is.
       if (preemptReducersForHangingMapRequests(
           reducerUnconditionalPreemptionDelayMs)) {
-        return;
+        return true;
       }
     }
 
@@ -498,12 +502,12 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     if 
(ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
         mapResourceRequest, getSchedulerResourceTypes()) > 0) {
       // the available headroom is enough to run a mapper
-      return;
+      return false;
     }
 
     // Available headroom is not enough to run mapper. See if we should hold
     // off before preempting reducers and preempt if okay.
-    preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
+    return 
preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
   }
 
   private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1aa3e0f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 9802300..bc90190 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -3011,6 +3011,133 @@ public class TestRMContainerAllocator {
     }
   }
 
+
+  @Test
+  public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()
+      throws Exception {
+    LOG.info("Running 
testAvoidAskMoreReducersWhenReducerPreemptionIsRequired");
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+    // Use a controlled clock to advance time for test.
+    ControlledClock clock = (ControlledClock)allocator.getContext().getClock();
+    clock.setTime(System.currentTimeMillis());
+
+    // Register nodes to RM.
+    MockNM nodeManager = rm.registerNode("h1:1234", 1024);
+    dispatcher.await();
+
+    // Request 2 maps and 1 reducer (some on nodes which are not registered).
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 1024, new String[] { "h2" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
+    allocator.sendRequest(event3);
+
+    // This will tell the scheduler about the requests but there will be no
+    // allocations as nodes are not added.
+    allocator.schedule();
+    dispatcher.await();
+
+    // Advance clock so that maps can be considered as hanging.
+    clock.setTime(System.currentTimeMillis() + 500000L);
+
+    // Request for another reducer on h3 which has not registered.
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+    allocator.sendRequest(event4);
+
+    allocator.schedule();
+    dispatcher.await();
+
+    // Update resources in scheduler through node heartbeat from h1.
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
+    allocator.schedule();
+    dispatcher.await();
+
+    // One map is assigned.
+    Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
+    // Send deallocate request for map so that no maps are assigned after this.
+    ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, 
false);
+    allocator.sendDeallocate(deallocate);
+    // Now one reducer should be scheduled and one should be pending.
+    Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(1, allocator.getNumOfPendingReduces());
+    // No map should be assigned and one should be scheduled.
+    Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
+    Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
+
+    Assert.assertEquals(6, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      boolean isReduce =
+          req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
+      if (isReduce) {
+        // 1 reducer each asked on h2, * and default-rack
+        Assert.assertTrue((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack") ||
+            req.getResourceName().equals("h2")) && req.getNumContainers() == 
1);
+      } else { //map
+        // 0 mappers asked on h1 and 1 each on * and default-rack
+        Assert.assertTrue(((req.getResourceName().equals("*") ||
+            req.getResourceName().equals("/default-rack")) &&
+            req.getNumContainers() == 1) || (req.getResourceName().equals("h1")
+            && req.getNumContainers() == 0));
+      }
+    }
+
+    clock.setTime(System.currentTimeMillis() + 500000L + 10 * 60 * 1000);
+
+    // On next allocate request to scheduler, headroom reported will be 2048.
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0));
+    allocator.schedule();
+    dispatcher.await();
+    // After allocate response from scheduler, all scheduled reduces are ramped
+    // down and moved to pending. 3 asks are also updated with 0 containers to
+    // indicate ramping down of reduces to scheduler.
+    Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
+    Assert.assertEquals(2, allocator.getNumOfPendingReduces());
+    Assert.assertEquals(3, allocator.getAsk().size());
+    for (ResourceRequest req : allocator.getAsk()) {
+      Assert.assertEquals(
+          RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
+      Assert.assertTrue(req.getResourceName().equals("*") ||
+          req.getResourceName().equals("/default-rack") ||
+          req.getResourceName().equals("h2"));
+      Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability());
+      Assert.assertEquals(0, req.getNumContainers());
+    }
+  }
+
   private static class MockScheduler implements ApplicationMasterProtocol {
     ApplicationAttemptId attemptId;
     long nextContainerId = 10;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to