Repository: hadoop Updated Branches: refs/heads/trunk 40d222e86 -> 6eaca2e36
YARN-4105. Capacity Scheduler headroom for DRF is wrong. Contributed by Chang Li Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6eaca2e3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6eaca2e3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6eaca2e3 Branch: refs/heads/trunk Commit: 6eaca2e3634a88dc55689e8960352d6248c424d9 Parents: 40d222e Author: Jason Lowe <jl...@apache.org> Authored: Fri Sep 4 15:30:53 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Fri Sep 4 15:30:53 2015 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/LeafQueue.java | 2 +- .../capacity/TestCapacityScheduler.java | 112 ++++++++++++++++++- 3 files changed, 115 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6eaca2e3/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 29eabb5..662106b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -880,6 +880,9 @@ Release 2.7.2 - UNRELEASED YARN-4103. RM WebServices missing scheme for appattempts logLinks. (Jonathan Eagles via vvasudeb) + YARN-4105. 
Capacity Scheduler headroom for DRF is wrong (Chang Li via + jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/6eaca2e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 658eae1..b43f658 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -878,7 +878,7 @@ public class LeafQueue extends AbstractCSQueue { * */ Resource headroom = - Resources.min(resourceCalculator, clusterResource, + Resources.componentwiseMin( Resources.subtract(userLimit, user.getUsed()), Resources.subtract(currentResourceLimit, queueUsage.getUsed()) ); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6eaca2e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 279299e..44773be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -2916,7 +2916,117 @@ public class TestCapacityScheduler { rm.stop(); } - + + @Test + public void testHeadRoomCalculationWithDRC() throws Exception { + // test with total cluster resource of 20GB memory and 20 vcores. + // the queue where the two apps run has a user limit factor of 0.8 + // allocate 10GB memory and 1 vcore to app 1. + // app 1 should have headroom + // 20GB*0.8 - 10GB = 6GB memory available and 15 vcores. + // allocate 1GB memory and 1 vcore to app2. + // app 2 should have headroom 20GB - 10GB - 1GB = 9GB memory, + // and 20*0.8 - 1 = 15 vcores. 
+ + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setResourceComparator(DominantResourceCalculator.class); + + YarnConfiguration conf = new YarnConfiguration(csconf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue qb = (LeafQueue)cs.getQueue("default"); + qb.setUserLimitFactor((float)0.8); + + // add app 1 + ApplicationId appId = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + + RMAppAttemptMetrics attemptMetric = + new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); + RMAppImpl app = mock(RMAppImpl.class); + when(app.getApplicationId()).thenReturn(appId); + RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); + when(attempt.getAppAttemptId()).thenReturn(appAttemptId); + when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); + when(app.getCurrentAppAttempt()).thenReturn(attempt); + + rm.getRMContext().getRMApps().put(appId, app); + + SchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appId, "default", "user1"); + cs.handle(addAppEvent); + SchedulerEvent addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + cs.handle(addAttemptEvent); + + // add app 2 + ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); + ApplicationAttemptId appAttemptId2 = + BuilderUtils.newApplicationAttemptId(appId2, 1); + + RMAppAttemptMetrics attemptMetric2 = + new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext()); + RMAppImpl app2 = mock(RMAppImpl.class); + when(app2.getApplicationId()).thenReturn(appId2); + RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); + when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); + when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); + 
when(app2.getCurrentAppAttempt()).thenReturn(attempt2); + + rm.getRMContext().getRMApps().put(appId2, app2); + addAppEvent = + new AppAddedSchedulerEvent(appId2, "default", "user2"); + cs.handle(addAppEvent); + addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId2, false); + cs.handle(addAttemptEvent); + + // add nodes to cluster, so cluster have 20GB and 20 vcores + Resource newResource = Resource.newInstance(10 * GB, 10); + RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1"); + cs.handle(new NodeAddedSchedulerEvent(node)); + + Resource newResource2 = Resource.newInstance(10 * GB, 10); + RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2"); + cs.handle(new NodeAddedSchedulerEvent(node2)); + + FiCaSchedulerApp fiCaApp1 = + cs.getSchedulerApplications().get(app.getApplicationId()) + .getCurrentAppAttempt(); + + FiCaSchedulerApp fiCaApp2 = + cs.getSchedulerApplications().get(app2.getApplicationId()) + .getCurrentAppAttempt(); + Priority u0Priority = TestUtils.createMockPriority(1); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + // allocate container for app1 with 10GB memory and 1 vcore + fiCaApp1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 10*GB, 1, true, + u0Priority, recordFactory))); + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + assertEquals(6*GB, fiCaApp1.getHeadroom().getMemory()); + assertEquals(15, fiCaApp1.getHeadroom().getVirtualCores()); + + // allocate container for app2 with 1GB memory and 1 vcore + fiCaApp2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + u0Priority, recordFactory))); + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + assertEquals(9*GB, fiCaApp2.getHeadroom().getMemory()); + assertEquals(15, 
fiCaApp2.getHeadroom().getVirtualCores()); + } + @Test public void testDefaultNodeLabelExpressionQueueConfig() throws Exception { CapacityScheduler cs = new CapacityScheduler();