This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 041fe5f  YARN-9894. CapacitySchedulerPerf test for measuring hundreds 
of apps in a large number of queues. Contributed by Eric Payne
041fe5f is described below

commit 041fe5fb570651a20aed054f7625a5c78eceb089
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Dec 18 13:18:11 2019 -0800

    YARN-9894. CapacitySchedulerPerf test for measuring hundreds of apps in a 
large number of queues. Contributed by Eric Payne
    
    (cherry picked from commit 7b93575b92e8bad889c1ef15e0baaade6de6de4d)
    (cherry picked from commit 0707d0a0ae36456f3467cbb408c3a9a0073c70f7)
    (cherry picked from commit 750fb4c3212e7c197f418ea2df711be4467ee27a)
---
 .../capacity/TestCapacitySchedulerPerf.java        | 176 ++++++++++++++++-----
 1 file changed, 136 insertions(+), 40 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
index 0837fd7..c37ce69 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
@@ -60,6 +61,9 @@ import java.util.Map;
 import java.util.PriorityQueue;
 
 import static 
org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -70,9 +74,22 @@ public class TestCapacitySchedulerPerf {
     return "resource-" + idx;
   }
 
+  // This test is run only when -DRunCapacitySchedulerPerfTests=true is 
set
+  // on the command line. In addition, this test has tunables for the 
following:
+  //   Number of queues: -DNumberOfQueues (default=40)
+  //   Number of total apps: -DNumberOfApplications (default=100)
+  //   Percentage of queues with apps: -DPercentActiveQueues (default=100)
+  // E.G.:
+  // mvn test -Dtest=TestCapacitySchedulerPerf -Dsurefire.fork.timeout=1800 \
+  //    -DRunCapacitySchedulerPerfTests=true -DNumberOfQueues=50 \
+  //    -DNumberOfApplications=200 -DPercentActiveQueues=100
+  // Note that the surefire.fork.timeout flag is added because these tests 
could
+  // take longer than the surefire timeout.
   private void testUserLimitThroughputWithNumberOfResourceTypes(
-      int numOfResourceTypes)
+      int numOfResourceTypes, int numQueues, int pctActiveQueues, int appCount)
       throws Exception {
+    Assume.assumeTrue(Boolean.valueOf(
+        System.getProperty("RunCapacitySchedulerPerfTests")));
     if (numOfResourceTypes > 2) {
       // Initialize resource map
       Map<String, ResourceInformation> riMap = new HashMap<>();
@@ -91,22 +108,16 @@ public class TestCapacitySchedulerPerf {
       ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
     }
 
-    // Since this is more of a performance unit test, only run if
-    // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
-    Assume.assumeTrue(Boolean.valueOf(
-        System.getProperty("RunCapacitySchedulerPerfTests")));
+    final int activeQueues = (int) (numQueues * (pctActiveQueues/100f));
+    final int totalApps = appCount + activeQueues;
+    // extra apps to get started with user limit
 
     CapacitySchedulerConfiguration csconf =
-        new CapacitySchedulerConfiguration();
-    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
-    csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
-    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
-        100.0f);
-    csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
-    csconf.setResourceComparator(DominantResourceCalculator.class);
+        createCSConfWithManyQueues(numQueues);
 
     YarnConfiguration conf = new YarnConfiguration(csconf);
-    // Don't reset resource types since we have already configured resource 
types
+    // Don't reset resource types since we have already configured resource
+    // types
     conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
@@ -115,11 +126,16 @@ public class TestCapacitySchedulerPerf {
     rm.start();
 
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-    LeafQueue qb = (LeafQueue)cs.getQueue("default");
 
-    // For now make user limit large so we can activate all applications
-    qb.setUserLimitFactor((float)100.0);
-    qb.setupConfigurableCapacities();
+    LeafQueue[] lqs = new LeafQueue[numQueues];
+    for (int i = 0; i < numQueues; i++) {
+      String queueName = String.format("%03d", i);
+      LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
+      // For now make user limit large so we can activate all applications
+      qb.setUserLimitFactor((float)100.0);
+      qb.setupConfigurableCapacities();
+      lqs[i] = qb;
+    }
 
     SchedulerEvent addAppEvent;
     SchedulerEvent addAttemptEvent;
@@ -127,13 +143,12 @@ public class TestCapacitySchedulerPerf {
     ApplicationSubmissionContext submissionContext =
         mock(ApplicationSubmissionContext.class);
 
-    final int appCount = 100;
-    ApplicationId[] appids = new ApplicationId[appCount];
-    RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
-    ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
-    RMAppImpl[] apps = new RMAppImpl[appCount];
-    RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
-    for (int i=0; i<appCount; i++) {
+    ApplicationId[] appids = new ApplicationId[totalApps];
+    RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[totalApps];
+    ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[totalApps];
+    RMAppImpl[] apps = new RMAppImpl[totalApps];
+    RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[totalApps];
+    for (int i=0; i<totalApps; i++) {
       appids[i] = BuilderUtils.newApplicationId(100, i);
       appAttemptIds[i] =
           BuilderUtils.newApplicationAttemptId(appids[i], 1);
@@ -150,34 +165,34 @@ public class TestCapacitySchedulerPerf {
       when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
 
       rm.getRMContext().getRMApps().put(appids[i], apps[i]);
+      String queueName = lqs[i % activeQueues].getQueueName();
       addAppEvent =
-          new AppAddedSchedulerEvent(appids[i], "default", "user1");
+          new AppAddedSchedulerEvent(appids[i], queueName, "user1");
       cs.handle(addAppEvent);
       addAttemptEvent =
           new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
       cs.handle(addAttemptEvent);
     }
 
-    // add nodes  to cluster, so cluster has 20GB and 20 vcores
-    Resource nodeResource = Resource.newInstance(10 * GB, 10);
+    // add nodes to cluster with enough resources to satisfy all apps
+    Resource newResource = Resource.newInstance(totalApps * GB, totalApps);
     if (numOfResourceTypes > 2) {
       for (int i = 2; i < numOfResourceTypes; i++) {
-        nodeResource.setResourceValue(getResourceName(i), 10);
+        newResource.setResourceValue(getResourceName(i), totalApps);
       }
     }
-
-    RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
     cs.handle(new NodeAddedSchedulerEvent(node));
 
-    RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
+    RMNode node2 = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.2");
     cs.handle(new NodeAddedSchedulerEvent(node2));
 
     Priority u0Priority = TestUtils.createMockPriority(1);
     RecordFactory recordFactory =
         RecordFactoryProvider.getRecordFactory(null);
 
-    FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
-    for (int i=0;i<appCount;i++) {
+    FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
+    for (int i=0;i<totalApps;i++) {
       fiCaApps[i] =
           cs.getSchedulerApplications().get(apps[i].getApplicationId())
               .getCurrentAppAttempt();
@@ -195,8 +210,30 @@ public class TestCapacitySchedulerPerf {
       fiCaApps[i].updateResourceRequests(
           Collections.singletonList(resourceRequest));
     }
-    // Now force everything to be over user limit
-    qb.setUserLimitFactor((float)0.0);
+    // Now force everything to be at user limit
+    for (int i = 0; i < numQueues; i++) {
+      lqs[i].setUserLimitFactor((float)0.0);
+    }
+
+    // allocate one container for each of the extra apps, since
+    // LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
+    cs.handle(new NodeUpdateSchedulerEvent(node));
+    cs.handle(new NodeUpdateSchedulerEvent(node2));
+
+    // make sure only the extra apps have allocated containers
+    for (int i=0;i<totalApps;i++) {
+      boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+      if (i < activeQueues) {
+        assertFalse(pending);
+        assertEquals(0,
+            fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+      } else {
+        assertTrue(pending);
+        assertEquals(1*GB,
+            fiCaApps[i].getTotalPendingRequestsPerPartition()
+                .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+      }
+    }
 
     // Quiet the loggers while measuring throughput
     for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
@@ -239,27 +276,86 @@ public class TestCapacitySchedulerPerf {
     }
     System.out.println(
         "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + 
entries
-            + ": " + numerator / (timespent / entries));
+            + ": " + numerator / (timespent / entries) + " ops/sec of "
+            + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+            + " queues.");
+
+    // make sure only the extra apps have allocated containers
+    for (int i=0;i<totalApps;i++) {
+      boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+      if (i < activeQueues) {
+        assertFalse(pending);
+        assertEquals(0,
+            fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+      } else {
+        assertTrue(pending);
+        assertEquals(1*GB,
+            fiCaApps[i].getTotalPendingRequestsPerPartition()
+                .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+      }
+    }
+
+    rm.close();
     rm.stop();
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForTwoResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(2);
+    testUserLimitThroughputWithNumberOfResourceTypes(2, 1, 100, 100);
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForThreeResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(3);
+    testUserLimitThroughputWithNumberOfResourceTypes(3, 1, 100, 100);
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForFourResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(4);
+    testUserLimitThroughputWithNumberOfResourceTypes(4, 1, 100, 100);
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForFiveResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(5);
+    testUserLimitThroughputWithNumberOfResourceTypes(5, 1, 100, 100);
+  }
+
+  @Test(timeout = 1800000)
+  public void testUserLimitThroughputWithManyQueues() throws Exception {
+
+    int numQueues = Integer.getInteger("NumberOfQueues", 40);
+    int pctActiveQueues = Integer.getInteger("PercentActiveQueues", 100);
+    int appCount = Integer.getInteger("NumberOfApplications", 100);
+
+    testUserLimitThroughputWithNumberOfResourceTypes(
+        2, numQueues, pctActiveQueues, appCount);
+  }
+
+  CapacitySchedulerConfiguration createCSConfWithManyQueues(int numQueues)
+      throws Exception {
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
+    csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
+    csconf.setCapacity("root.default", 0.0f);
+    csconf.setOffSwitchPerHeartbeatLimit(numQueues);
+
+    float capacity = 100.0f / numQueues;
+    String[] subQueues = new String[numQueues];
+    for (int i = 0; i < numQueues; i++) {
+      String queueName = String.format("%03d", i);
+      String queuePath = "root." + queueName;
+      subQueues[i] = queueName;
+      csconf.setMaximumApplicationMasterResourcePerQueuePercent(
+          queuePath, 100.0f);
+      csconf.setMaximumAMResourcePercentPerPartition(queuePath, "", 100.0f);
+      csconf.setCapacity(queuePath, capacity);
+      csconf.setUserLimitFactor(queuePath, 100.0f);
+      csconf.setMaximumCapacity(queuePath, 100.0f);
+    }
+
+    csconf.setQueues("root", subQueues);
+
+    return csconf;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to