This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 041fe5f  YARN-9894. CapacitySchedulerPerf test for measuring hundreds of apps in a large number of queues. Contributed by Eric Payne
041fe5f is described below

commit 041fe5fb570651a20aed054f7625a5c78eceb089
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Dec 18 13:18:11 2019 -0800

    YARN-9894. CapacitySchedulerPerf test for measuring hundreds of apps in a large number of queues. Contributed by Eric Payne

    (cherry picked from commit 7b93575b92e8bad889c1ef15e0baaade6de6de4d)
    (cherry picked from commit 0707d0a0ae36456f3467cbb408c3a9a0073c70f7)
    (cherry picked from commit 750fb4c3212e7c197f418ea2df711be4467ee27a)
---
 .../capacity/TestCapacitySchedulerPerf.java        | 176 ++++++++++++++++-----
 1 file changed, 136 insertions(+), 40 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
index 0837fd7..c37ce69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
@@ -60,6 +61,9 @@ import java.util.Map;
 import java.util.PriorityQueue;
 
 import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -70,9 +74,22 @@ public class TestCapacitySchedulerPerf {
     return "resource-" + idx;
   }
 
+  // This test is run only when -DRunCapacitySchedulerPerfTests=true is set
+  // on the command line. In addition, this test has tunables for the following:
+  // Number of queues: -DNumberOfQueues (default=100)
+  // Number of total apps: -DNumberOfApplications (default=200)
+  // Percentage of queues with apps: -DPercentActiveQueues (default=100)
+  // E.G.:
+  // mvn test -Dtest=TestCapacitySchedulerPerf -Dsurefire.fork.timeout=1800 \
+  //   -DRunCapacitySchedulerPerfTests=true -DNumberOfQueues=50 \
+  //   -DNumberOfApplications=200 -DPercentActiveQueues=100
+  // Note that the surefire.fork.timeout flag is added because these tests could
+  // take longer than the surefire timeout.
   private void testUserLimitThroughputWithNumberOfResourceTypes(
-      int numOfResourceTypes)
+      int numOfResourceTypes, int numQueues, int pctActiveQueues, int appCount)
       throws Exception {
+    Assume.assumeTrue(Boolean.valueOf(
+        System.getProperty("RunCapacitySchedulerPerfTests")));
     if (numOfResourceTypes > 2) {
       // Initialize resource map
       Map<String, ResourceInformation> riMap = new HashMap<>();
@@ -91,22 +108,16 @@ public class TestCapacitySchedulerPerf {
       ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
     }
 
-    // Since this is more of a performance unit test, only run if
-    // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
-    Assume.assumeTrue(Boolean.valueOf(
-        System.getProperty("RunCapacitySchedulerPerfTests")));
+    final int activeQueues = (int) (numQueues * (pctActiveQueues/100f));
+    final int totalApps = appCount + activeQueues;
+    // extra apps to get started with user limit
 
     CapacitySchedulerConfiguration csconf =
-        new CapacitySchedulerConfiguration();
-    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
-    csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
-    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
-        100.0f);
-    csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
-    csconf.setResourceComparator(DominantResourceCalculator.class);
+        createCSConfWithManyQueues(numQueues);
 
     YarnConfiguration conf = new YarnConfiguration(csconf);
-    // Don't reset resource types since we have already configured resource types
+    // Don't reset resource types since we have already configured resource
+    // types
     conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
@@ -115,11 +126,16 @@ public class TestCapacitySchedulerPerf {
     rm.start();
 
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
 
-    LeafQueue qb = (LeafQueue)cs.getQueue("default");
-    // For now make user limit large so we can activate all applications
-    qb.setUserLimitFactor((float)100.0);
-    qb.setupConfigurableCapacities();
+    LeafQueue[] lqs = new LeafQueue[numQueues];
+    for (int i = 0; i < numQueues; i++) {
+      String queueName = String.format("%03d", i);
+      LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
+      // For now make user limit large so we can activate all applications
+      qb.setUserLimitFactor((float)100.0);
+      qb.setupConfigurableCapacities();
+      lqs[i] = qb;
+    }
 
     SchedulerEvent addAppEvent;
     SchedulerEvent addAttemptEvent;
@@ -127,13 +143,12 @@ public class TestCapacitySchedulerPerf {
     ApplicationSubmissionContext submissionContext =
         mock(ApplicationSubmissionContext.class);
 
-    final int appCount = 100;
-    ApplicationId[] appids = new ApplicationId[appCount];
-    RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
-    ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
-    RMAppImpl[] apps = new RMAppImpl[appCount];
-    RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
-    for (int i=0; i<appCount; i++) {
+    ApplicationId[] appids = new ApplicationId[totalApps];
+    RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[totalApps];
+    ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[totalApps];
+    RMAppImpl[] apps = new RMAppImpl[totalApps];
+    RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[totalApps];
+    for (int i=0; i<totalApps; i++) {
       appids[i] = BuilderUtils.newApplicationId(100, i);
       appAttemptIds[i] =
           BuilderUtils.newApplicationAttemptId(appids[i], 1);
@@ -150,34 +165,34 @@ public class TestCapacitySchedulerPerf {
       when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
 
       rm.getRMContext().getRMApps().put(appids[i], apps[i]);
 
+      String queueName = lqs[i % activeQueues].getQueueName();
       addAppEvent =
-          new AppAddedSchedulerEvent(appids[i], "default", "user1");
+          new AppAddedSchedulerEvent(appids[i], queueName, "user1");
       cs.handle(addAppEvent);
       addAttemptEvent =
           new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
       cs.handle(addAttemptEvent);
     }
 
-    // add nodes to cluster, so cluster has 20GB and 20 vcores
-    Resource nodeResource = Resource.newInstance(10 * GB, 10);
+    // add nodes to cluster with enough resources to satisfy all apps
+    Resource newResource = Resource.newInstance(totalApps * GB, totalApps);
     if (numOfResourceTypes > 2) {
       for (int i = 2; i < numOfResourceTypes; i++) {
-        nodeResource.setResourceValue(getResourceName(i), 10);
+        newResource.setResourceValue(getResourceName(i), totalApps);
       }
     }
-
-    RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
     cs.handle(new NodeAddedSchedulerEvent(node));
-    RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
+    RMNode node2 = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.2");
     cs.handle(new NodeAddedSchedulerEvent(node2));
 
     Priority u0Priority = TestUtils.createMockPriority(1);
     RecordFactory recordFactory =
         RecordFactoryProvider.getRecordFactory(null);
 
-    FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
-    for (int i=0;i<appCount;i++) {
+    FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
+    for (int i=0;i<totalApps;i++) {
       fiCaApps[i] =
           cs.getSchedulerApplications().get(apps[i].getApplicationId())
              .getCurrentAppAttempt();
@@ -195,8 +210,30 @@ public class TestCapacitySchedulerPerf {
       fiCaApps[i].updateResourceRequests(
           Collections.singletonList(resourceRequest));
     }
 
-    // Now force everything to be over user limit
-    qb.setUserLimitFactor((float)0.0);
+    // Now force everything to be at user limit
+    for (int i = 0; i < numQueues; i++) {
+      lqs[i].setUserLimitFactor((float)0.0);
+    }
+
+    // allocate one container for each extra app since
+    // LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
+    cs.handle(new NodeUpdateSchedulerEvent(node));
+    cs.handle(new NodeUpdateSchedulerEvent(node2));
+
+    // make sure only the extra apps have allocated containers
+    for (int i=0;i<totalApps;i++) {
+      boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+      if (i < activeQueues) {
+        assertFalse(pending);
+        assertEquals(0,
+            fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+      } else {
+        assertTrue(pending);
+        assertEquals(1*GB,
+            fiCaApps[i].getTotalPendingRequestsPerPartition()
+                .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+      }
+    }
 
     // Quiet the loggers while measuring throughput
     for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
@@ -239,27 +276,86 @@ public class TestCapacitySchedulerPerf {
     }
     System.out.println(
         "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
-        + ": " + numerator / (timespent / entries));
+        + ": " + numerator / (timespent / entries) + " ops/sec of "
+        + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+        + " queues.");
+
+    // make sure only the extra apps have allocated containers
+    for (int i=0;i<totalApps;i++) {
+      boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+      if (i < activeQueues) {
+        assertFalse(pending);
+        assertEquals(0,
+            fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+      } else {
+        assertTrue(pending);
+        assertEquals(1*GB,
+            fiCaApps[i].getTotalPendingRequestsPerPartition()
+                .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+      }
+    }
+
+    rm.close();
     rm.stop();
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForTwoResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(2);
+    testUserLimitThroughputWithNumberOfResourceTypes(2, 1, 100, 100);
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForThreeResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(3);
+    testUserLimitThroughputWithNumberOfResourceTypes(3, 1, 100, 100);
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForFourResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(4);
+    testUserLimitThroughputWithNumberOfResourceTypes(4, 1, 100, 100);
   }
 
   @Test(timeout = 300000)
   public void testUserLimitThroughputForFiveResources() throws Exception {
-    testUserLimitThroughputWithNumberOfResourceTypes(5);
+    testUserLimitThroughputWithNumberOfResourceTypes(5, 1, 100, 100);
+  }
+
+  @Test(timeout = 1800000)
+  public void testUserLimitThroughputWithManyQueues() throws Exception {
+
+    int numQueues = Integer.getInteger("NumberOfQueues", 40);
+    int pctActiveQueues = Integer.getInteger("PercentActiveQueues", 100);
+    int appCount = Integer.getInteger("NumberOfApplications", 100);
+
+    testUserLimitThroughputWithNumberOfResourceTypes(
+        2, numQueues, pctActiveQueues, appCount);
+  }
+
+  CapacitySchedulerConfiguration createCSConfWithManyQueues(int numQueues)
+      throws Exception {
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
+    csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
+    csconf.setCapacity("root.default", 0.0f);
+    csconf.setOffSwitchPerHeartbeatLimit(numQueues);
+
+    float capacity = 100.0f / numQueues;
+    String[] subQueues = new String[numQueues];
+    for (int i = 0; i < numQueues; i++) {
+      String queueName = String.format("%03d", i);
+      String queuePath = "root." + queueName;
+      subQueues[i] = queueName;
+      csconf.setMaximumApplicationMasterResourcePerQueuePercent(
+          queuePath, 100.0f);
+      csconf.setMaximumAMResourcePercentPerPartition(queuePath, "", 100.0f);
+      csconf.setCapacity(queuePath, capacity);
+      csconf.setUserLimitFactor(queuePath, 100.0f);
+      csconf.setMaximumCapacity(queuePath, 100.0f);
+    }
+
+    csconf.setQueues("root", subQueues);
+
+    return csconf;
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org