YARN-4606. CapacityScheduler: applications could get starved because computation of #activeUsers considers pending apps. Contributed by Manikandan R.
(cherry picked from commit 9485c9aee6e9bb935c3e6ae4da81d70b621781de) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/830ef12a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/830ef12a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/830ef12a Branch: refs/remotes/origin/branch-3.1 Commit: 830ef12af830de5d54e51d2b1d16c56f5eb78e43 Parents: d2212c2 Author: Eric E Payne <er...@oath.com> Authored: Wed Jul 25 16:22:04 2018 +0000 Committer: Eric E Payne <er...@oath.com> Committed: Wed Jul 25 16:30:30 2018 +0000 ---------------------------------------------------------------------- .../scheduler/capacity/UsersManager.java | 27 +++- .../capacity/TestCapacityScheduler.java | 128 +++++++++++++++++++ .../capacity/TestContainerAllocation.java | 43 +++++++ 3 files changed, 197 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/830ef12a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 747a488..83ee6c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -85,6 +85,7 @@ public class UsersManager implements AbstractUsersManager { private final QueueMetrics metrics; private AtomicInteger activeUsers = new AtomicInteger(0); + private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0); private Map<String, Set<ApplicationId>> usersApplications = new HashMap<String, Set<ApplicationId>>(); @@ -671,9 +672,23 @@ public class UsersManager implements AbstractUsersManager { // update in local storage userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit); + computeNumActiveUsersWithOnlyPendingApps(); + return userLimitPerSchedulingMode; } + // This method is called within the lock. + private void computeNumActiveUsersWithOnlyPendingApps() { + int numPendingUsers = 0; + for (User user : users.values()) { + if ((user.getPendingApplications() > 0) + && (user.getActiveApplications() <= 0)) { + numPendingUsers++; + } + } + activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers); + } + private Resource computeUserLimit(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode, boolean activeUser) { Resource partitionResource = labelManager.getResourceByLabel(nodePartition, @@ -839,6 +854,11 @@ public class UsersManager implements AbstractUsersManager { try { this.writeLock.lock(); + User userDesc = getUser(user); + if (userDesc != null && userDesc.getActiveApplications() <= 0) { + return; + } + Set<ApplicationId> userApps = usersApplications.get(user); if (userApps == null) { userApps = new HashSet<ApplicationId>(); @@ -893,7 +913,7 @@ public class UsersManager implements AbstractUsersManager { @Override public int getNumActiveUsers() { - return activeUsers.get(); + return activeUsers.get() + activeUsersWithOnlyPendingApps.get(); } float sumActiveUsersTimesWeights() { @@ 
-1090,4 +1110,9 @@ public class UsersManager implements AbstractUsersManager { this.writeLock.unlock(); } } + + @VisibleForTesting + public int getNumActiveUsersWithOnlyPendingApps() { + return activeUsersWithOnlyPendingApps.get(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/830ef12a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 79cdcfe..8d948b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -4978,4 +4978,132 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { Assert.assertEquals(AllocationState.QUEUE_SKIPPED, ContainerAllocation.QUEUE_SKIPPED.getAllocationState()); } + + @Test + public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception { + + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + + // Define top-level queues + 
newConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + + newConf.setCapacity(A, 50); + newConf.setCapacity(B, 50); + + // Define 2nd-level queues + newConf.setQueues(A, new String[] { "a1" }); + newConf.setCapacity(A1, 100); + newConf.setUserLimitFactor(A1, 2.0f); + newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f); + + newConf.setQueues(B, new String[] { "b1" }); + newConf.setCapacity(B1, 100); + newConf.setUserLimitFactor(B1, 2.0f); + + LOG.info("Setup top-level queues a and b"); + + MockRM rm = new MockRM(newConf); + rm.start(); + + CapacityScheduler scheduler = + (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = rm.registerNode("h1:1234", 16 * GB); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1); + + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1"); + + RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1"); + + // Each application asks 50 * 1GB containers + am1.allocate("*", 1 * GB, 50, null); + am2.allocate("*", 1 * GB, 50, null); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(4, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("a1", queue); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(4, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = 
scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + UsersManager um = + (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager(); + + assertEquals(4, um.getNumActiveUsers()); + assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps()); + + // now move the app + scheduler.moveAllApps("a1", "b1"); + + //Triggering this event so that user limit computation can + //happen again + for (int i = 0; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(500); + } + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + + assertEquals(4, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("b1", queue); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(4, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(0, oldAppsInA1.size()); + + UsersManager um_b1 = + (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager(); + + assertEquals(2, um_b1.getNumActiveUsers()); + assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(4, appsInB1.size()); + rm.close(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/830ef12a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 25e535a..b9bfc2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -941,4 +941,47 @@ public class TestContainerAllocation { rm1.close(); } + + @Test + public void testActiveUsersWithOnlyPendingApps() throws Exception { + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + newConf.setMaximumAMResourcePercentPerPartition( + CapacitySchedulerConfiguration.ROOT + ".default", "", 0.2f); + MockRM rm1 = new MockRM(newConf); + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + + RMApp app1 = rm1.submitApp(1 * GB, "app", "u1", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + RMApp app2 = rm1.submitApp(1 * GB, "app", "u2", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + RMApp app3 = rm1.submitApp(1 * GB, "app", "u3", null, "default"); + + RMApp app4 = rm1.submitApp(1 * GB, "app", "u4", null, "default"); + + // Each application asks 50 * 1GB containers + am1.allocate("*", 1 * GB, 50, null); + am2.allocate("*", 1 * GB, 50, null); + + CapacityScheduler cs = (CapacityScheduler) 
rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + for (int i = 0; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(1000); + } + LeafQueue lq = (LeafQueue) cs.getQueue("default"); + UsersManager um = (UsersManager) lq.getAbstractUsersManager(); + + Assert.assertEquals(4, um.getNumActiveUsers()); + Assert.assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps()); + Assert.assertEquals(2, lq.getMetrics().getAppsPending()); + rm1.close(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org