Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 187bb77dd -> 7d24d4751
MAPREDUCE-6697. Concurrent task limits should only be applied when necessary. Contributed by Nathan Roberts.

(cherry picked from commit a5c0476a990ec1e7eb34ce2462a45aa52cc1350d)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7d24d475
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7d24d475
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7d24d475

Branch: refs/heads/branch-2.7
Commit: 7d24d4751e6becdb35377c3e836b1e95e663688f
Parents: 187bb77
Author: Akira Ajisaka <aajis...@apache.org>
Authored: Wed Jun 28 10:50:09 2017 +0900
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Thu Jul 13 14:20:17 2017 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../v2/app/rm/RMContainerAllocator.java         |  6 +-
 .../v2/app/rm/TestRMContainerAllocator.java     | 73 ++++++++++++++++++--
 3 files changed, 76 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d24d475/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 68c5310..7230590 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -41,6 +41,9 @@ Release 2.7.4 - UNRELEASED
     MAPREDUCE-6433. launchTime may be negative. (zxu)
     backported by Chris Douglas.
 
+    MAPREDUCE-6697. Concurrent task limits should only be applied when
+    necessary. (Nathan Roberts via shv).
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d24d475/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 0df58b7..5426dc1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -818,7 +818,8 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   private void applyConcurrentTaskLimits() {
     int numScheduledMaps = scheduledRequests.maps.size();
-    if (maxRunningMaps > 0 && numScheduledMaps > 0) {
+    if (maxRunningMaps > 0 && numScheduledMaps > 0 &&
+        getJob().getTotalMaps() > maxRunningMaps) {
       int maxRequestedMaps = Math.max(0,
           maxRunningMaps - assignedRequests.maps.size());
       int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
@@ -833,7 +834,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
     int numScheduledReduces = scheduledRequests.reduces.size();
-    if (maxRunningReduces > 0 && numScheduledReduces > 0) {
+    if (maxRunningReduces > 0 && numScheduledReduces > 0 &&
+        getJob().getTotalReduces() > maxRunningReduces) {
       int maxRequestedReduces = Math.max(0,
           maxRunningReduces - assignedRequests.reduces.size());
       int reduceRequestLimit = Math.min(maxRequestedReduces,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d24d475/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index da1fbfb..4275fcc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -2618,14 +2618,77 @@ public class TestRMContainerAllocator {
   }
 
   @Test
+  public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception {
+    final int MAP_COUNT = 1;
+    final int REDUCE_COUNT = 1;
+    final int MAP_LIMIT = 1;
+    final int REDUCE_LIMIT = 1;
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+    conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
+    when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(null, conf, appAttemptId, mockJob,
+            new SystemClock()) {
+          @Override
+          protected void register() {
+          }
+
+          @Override
+          protected ApplicationMasterProtocol createSchedulerProxy() {
+            return mockScheduler;
+          }
+
+          @Override
+          protected void setRequestLimit(Priority priority,
+              Resource capability, int limit) {
+            Assert.fail("setRequestLimit() should not be invoked");
+          }
+        };
+
+    // create some map requests
+    ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
+    for (int i = 0; i < reqMapEvents.length; ++i) {
+      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+    }
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+    // create some reduce requests
+    ContainerRequestEvent[] reqReduceEvents =
+        new ContainerRequestEvent[REDUCE_COUNT];
+    for (int i = 0; i < reqReduceEvents.length; ++i) {
+      reqReduceEvents[i] =
+          createReq(jobId, i, 1024, new String[] {}, false, true);
+    }
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+    allocator.schedule();
+    allocator.schedule();
+    allocator.close();
+  }
+
+  @Test
   public void testConcurrentTaskLimits() throws Exception {
+    final int MAP_COUNT = 5;
+    final int REDUCE_COUNT = 2;
     final int MAP_LIMIT = 3;
     final int REDUCE_LIMIT = 1;
     LOG.info("Running testConcurrentTaskLimits");
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
     conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
-    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
     ApplicationId appId = ApplicationId.newInstance(1, 1);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 1);
@@ -2634,6 +2697,9 @@ public class TestRMContainerAllocator {
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
+    when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
+
     final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
     MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
         appAttemptId, mockJob, new SystemClock()) {
@@ -2648,14 +2714,13 @@ public class TestRMContainerAllocator {
     };
 
     // create some map requests
-    ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
+    ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
     for (int i = 0; i < reqMapEvents.length; ++i) {
       reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
     }
     allocator.sendRequests(Arrays.asList(reqMapEvents));
 
-    // create some reduce requests
-    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
+    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
     for (int i = 0; i < reqReduceEvents.length; ++i) {
       reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
           false, true);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org