Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 187bb77dd -> 7d24d4751


MAPREDUCE-6697. Concurrent task limits should only be applied when necessary. 
Contributed by Nathan Roberts.

(cherry picked from commit a5c0476a990ec1e7eb34ce2462a45aa52cc1350d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7d24d475
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7d24d475
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7d24d475

Branch: refs/heads/branch-2.7
Commit: 7d24d4751e6becdb35377c3e836b1e95e663688f
Parents: 187bb77
Author: Akira Ajisaka <aajis...@apache.org>
Authored: Wed Jun 28 10:50:09 2017 +0900
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Thu Jul 13 14:20:17 2017 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../v2/app/rm/RMContainerAllocator.java         |  6 +-
 .../v2/app/rm/TestRMContainerAllocator.java     | 73 ++++++++++++++++++--
 3 files changed, 76 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d24d475/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt 
b/hadoop-mapreduce-project/CHANGES.txt
index 68c5310..7230590 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -41,6 +41,9 @@ Release 2.7.4 - UNRELEASED
     MAPREDUCE-6433. launchTime may be negative.
     (zxu) backported by Chris Douglas.
 
+    MAPREDUCE-6697. Concurrent task limits should only be applied when
+    necessary. (Nathan Roberts via shv).
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d24d475/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 0df58b7..5426dc1 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -818,7 +818,8 @@ public class RMContainerAllocator extends 
RMContainerRequestor
 
   private void applyConcurrentTaskLimits() {
     int numScheduledMaps = scheduledRequests.maps.size();
-    if (maxRunningMaps > 0 && numScheduledMaps > 0) {
+    if (maxRunningMaps > 0 && numScheduledMaps > 0 &&
+        getJob().getTotalMaps() > maxRunningMaps) {
       int maxRequestedMaps = Math.max(0,
           maxRunningMaps - assignedRequests.maps.size());
       int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
@@ -833,7 +834,8 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     }
 
     int numScheduledReduces = scheduledRequests.reduces.size();
-    if (maxRunningReduces > 0 && numScheduledReduces > 0) {
+    if (maxRunningReduces > 0 && numScheduledReduces > 0 &&
+        getJob().getTotalReduces() > maxRunningReduces) {
       int maxRequestedReduces = Math.max(0,
           maxRunningReduces - assignedRequests.reduces.size());
       int reduceRequestLimit = Math.min(maxRequestedReduces,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d24d475/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index da1fbfb..4275fcc 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -2618,14 +2618,77 @@ public class TestRMContainerAllocator {
   }
 
   @Test
+  public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception {
+    final int MAP_COUNT = 1;
+    final int REDUCE_COUNT = 1;
+    final int MAP_LIMIT = 1;
+    final int REDUCE_LIMIT = 1;
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+    conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
+    when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(null, conf, appAttemptId, mockJob,
+            new SystemClock()) {
+          @Override
+          protected void register() {
+          }
+
+          @Override
+          protected ApplicationMasterProtocol createSchedulerProxy() {
+            return mockScheduler;
+          }
+
+          @Override
+          protected void setRequestLimit(Priority priority,
+              Resource capability, int limit) {
+            Assert.fail("setRequestLimit() should not be invoked");
+          }
+        };
+
+    // create some map requests
+    ContainerRequestEvent[] reqMapEvents = new 
ContainerRequestEvent[MAP_COUNT];
+    for (int i = 0; i < reqMapEvents.length; ++i) {
+      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+    }
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+    // create some reduce requests
+    ContainerRequestEvent[] reqReduceEvents =
+        new ContainerRequestEvent[REDUCE_COUNT];
+    for (int i = 0; i < reqReduceEvents.length; ++i) {
+      reqReduceEvents[i] =
+          createReq(jobId, i, 1024, new String[] {}, false, true);
+    }
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+    allocator.schedule();
+    allocator.schedule();
+    allocator.close();
+  }
+
+  @Test
   public void testConcurrentTaskLimits() throws Exception {
+    final int MAP_COUNT = 5;
+    final int REDUCE_COUNT = 2;
     final int MAP_LIMIT = 3;
     final int REDUCE_LIMIT = 1;
     LOG.info("Running testConcurrentTaskLimits");
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
     conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
-    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
     ApplicationId appId = ApplicationId.newInstance(1, 1);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 1);
@@ -2634,6 +2697,9 @@ public class TestRMContainerAllocator {
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
+    when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
+
     final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
     MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
         appAttemptId, mockJob, new SystemClock()) {
@@ -2648,14 +2714,13 @@ public class TestRMContainerAllocator {
     };
 
     // create some map requests
-    ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
+    ContainerRequestEvent[] reqMapEvents = new 
ContainerRequestEvent[MAP_COUNT];
     for (int i = 0; i < reqMapEvents.length; ++i) {
       reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
     }
     allocator.sendRequests(Arrays.asList(reqMapEvents));
-
     // create some reduce requests
-    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
+    ContainerRequestEvent[] reqReduceEvents = new 
ContainerRequestEvent[REDUCE_COUNT];
     for (int i = 0; i < reqReduceEvents.length; ++i) {
       reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
           false, true);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to