YARN-4606. CapacityScheduler: applications could get starved because 
computation of #activeUsers considers pending apps. Contributed by Manikandan R

(cherry picked from commit 9485c9aee6e9bb935c3e6ae4da81d70b621781de)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/830ef12a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/830ef12a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/830ef12a

Branch: refs/remotes/origin/branch-3.1
Commit: 830ef12af830de5d54e51d2b1d16c56f5eb78e43
Parents: d2212c2
Author: Eric E Payne <er...@oath.com>
Authored: Wed Jul 25 16:22:04 2018 +0000
Committer: Eric E Payne <er...@oath.com>
Committed: Wed Jul 25 16:30:30 2018 +0000

----------------------------------------------------------------------
 .../scheduler/capacity/UsersManager.java        |  27 +++-
 .../capacity/TestCapacityScheduler.java         | 128 +++++++++++++++++++
 .../capacity/TestContainerAllocation.java       |  43 +++++++
 3 files changed, 197 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/830ef12a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 747a488..83ee6c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -85,6 +85,7 @@ public class UsersManager implements AbstractUsersManager {
 
   private final QueueMetrics metrics;
   private AtomicInteger activeUsers = new AtomicInteger(0);
+  private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0);
   private Map<String, Set<ApplicationId>> usersApplications =
       new HashMap<String, Set<ApplicationId>>();
 
@@ -671,9 +672,23 @@ public class UsersManager implements AbstractUsersManager {
     // update in local storage
     userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
 
+    computeNumActiveUsersWithOnlyPendingApps();
+
     return userLimitPerSchedulingMode;
   }
 
+  // This method is called within the lock.
+  private void computeNumActiveUsersWithOnlyPendingApps() {
+    int numPendingUsers = 0;
+    for (User user : users.values()) {
+      if ((user.getPendingApplications() > 0)
+          && (user.getActiveApplications() <= 0)) {
+        numPendingUsers++;
+      }
+    }
+    activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers);
+  }
+
   private Resource computeUserLimit(String userName, Resource clusterResource,
       String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -839,6 +854,11 @@ public class UsersManager implements AbstractUsersManager {
     try {
       this.writeLock.lock();
 
+      User userDesc = getUser(user);
+      if (userDesc != null && userDesc.getActiveApplications() <= 0) {
+        return;
+      }
+
       Set<ApplicationId> userApps = usersApplications.get(user);
       if (userApps == null) {
         userApps = new HashSet<ApplicationId>();
@@ -893,7 +913,7 @@ public class UsersManager implements AbstractUsersManager {
 
   @Override
   public int getNumActiveUsers() {
-    return activeUsers.get();
+    return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
   }
 
   float sumActiveUsersTimesWeights() {
@@ -1090,4 +1110,9 @@ public class UsersManager implements AbstractUsersManager {
       this.writeLock.unlock();
     }
   }
+
+  @VisibleForTesting
+  public int getNumActiveUsersWithOnlyPendingApps() {
+    return activeUsersWithOnlyPendingApps.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/830ef12a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 79cdcfe..8d948b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -4978,4 +4978,132 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     Assert.assertEquals(AllocationState.QUEUE_SKIPPED,
         ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
   }
+
+  @Test
+  public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+
+    // Define top-level queues
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+
+    newConf.setCapacity(A, 50);
+    newConf.setCapacity(B, 50);
+
+    // Define 2nd-level queues
+    newConf.setQueues(A, new String[] { "a1" });
+    newConf.setCapacity(A1, 100);
+    newConf.setUserLimitFactor(A1, 2.0f);
+    newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
+
+    newConf.setQueues(B, new String[] { "b1" });
+    newConf.setCapacity(B1, 100);
+    newConf.setUserLimitFactor(B1, 2.0f);
+
+    LOG.info("Setup top-level queues a and b");
+
+    MockRM rm = new MockRM(newConf);
+    rm.start();
+
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+    RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1");
+
+    RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1");
+
+    // Each application asks 50 * 1GB containers
+    am1.allocate("*", 1 * GB, 50, null);
+    am2.allocate("*", 1 * GB, 50, null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(4, appsInA1.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertEquals("a1", queue);
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(4, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(4, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    UsersManager um =
+        (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
+
+    assertEquals(4, um.getNumActiveUsers());
+    assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+
+    // now move the app
+    scheduler.moveAllApps("a1", "b1");
+
+    //Triggering this event so that user limit computation can
+    //happen again
+    for (int i = 0; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(500);
+    }
+
+    // check postconditions
+    appsInB1 = scheduler.getAppsInQueue("b1");
+
+    assertEquals(4, appsInB1.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertEquals("b1", queue);
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.contains(appAttemptId));
+    assertEquals(4, appsInB.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(4, appsInRoot.size());
+
+    List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(0, oldAppsInA1.size());
+
+    UsersManager um_b1 =
+        (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
+
+    assertEquals(2, um_b1.getNumActiveUsers());
+    assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(4, appsInB1.size());
+    rm.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/830ef12a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 25e535a..b9bfc2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -941,4 +941,47 @@ public class TestContainerAllocation {
 
     rm1.close();
   }
+
+  @Test
+  public void testActiveUsersWithOnlyPendingApps() throws Exception {
+
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.setMaximumAMResourcePercentPerPartition(
+        CapacitySchedulerConfiguration.ROOT + ".default", "", 0.2f);
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "u1", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "u2", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "u3", null, "default");
+
+    RMApp app4 = rm1.submitApp(1 * GB, "app", "u4", null, "default");
+
+    // Each application asks 50 * 1GB containers
+    am1.allocate("*", 1 * GB, 50, null);
+    am2.allocate("*", 1 * GB, 50, null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    for (int i = 0; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(1000);
+    }
+    LeafQueue lq = (LeafQueue) cs.getQueue("default");
+    UsersManager um = (UsersManager) lq.getAbstractUsersManager();
+
+    Assert.assertEquals(4, um.getNumActiveUsers());
+    Assert.assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+    Assert.assertEquals(2, lq.getMetrics().getAppsPending());
+    rm1.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to