YARN-5555. Scheduler UI: "% of Queue" is inaccurate if leaf queue is hierarchically nested. Contributed by Eric Payne.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05f5c0f6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05f5c0f6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05f5c0f6 Branch: refs/heads/HADOOP-12756 Commit: 05f5c0f631680cffc36a79550c351620615445db Parents: 0690f09 Author: Varun Vasudev <vvasu...@apache.org> Authored: Fri Sep 2 16:02:01 2016 +0530 Committer: Varun Vasudev <vvasu...@apache.org> Committed: Fri Sep 2 16:02:01 2016 +0530 ---------------------------------------------------------------------- .../scheduler/common/fica/FiCaSchedulerApp.java | 27 ++++++ .../scheduler/capacity/TestLeafQueue.java | 87 ++++++++++++++++++++ 2 files changed, 114 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/05f5c0f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 33dee80..9c84a23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -29,6 +29,7 @@ import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; @@ -617,4 +619,29 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString()); } } + + /** + * Recalculates the per-app, percent of queue metric, specific to the + * Capacity Scheduler. 
+ */ + @Override + public synchronized ApplicationResourceUsageReport getResourceUsageReport() { + ApplicationResourceUsageReport report = super.getResourceUsageReport(); + Resource cluster = rmContext.getScheduler().getClusterResource(); + Resource totalPartitionRes = + rmContext.getNodeLabelManager() + .getResourceByLabel(getAppAMNodePartitionName(), cluster); + ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator(); + if (!calc.isInvalidDivisor(totalPartitionRes)) { + float queueAbsMaxCapPerPartition = + ((AbstractCSQueue)getQueue()).getQueueCapacities() + .getAbsoluteCapacity(getAppAMNodePartitionName()); + float queueUsagePerc = + calc.divide(totalPartitionRes, report.getUsedResources(), + Resources.multiply(totalPartitionRes, + queueAbsMaxCapPerPartition)) * 100; + report.setQueueUsagePercentage(queueUsagePerc); + } + return report; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/05f5c0f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index b2c53da..9134889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -51,6 +51,8 
@@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; @@ -3312,6 +3315,90 @@ public class TestLeafQueue { return csContext; } + /** + * Checks that the per-app "% of queue" metric reported by + * {@link FiCaSchedulerApp#getResourceUsageReport()} is measured against the + * queue's absolute capacity, including for a leaf queue nested under a + * non-root parent, while the "% of cluster" metric stays the same. + */ + @Test + public void testApplicationQueuePercent() + throws Exception { + Resource res = Resource.newInstance(10 * 1024, 10); + CapacityScheduler scheduler = mock(CapacityScheduler.class); + when(scheduler.getClusterResource()).thenReturn(res); + when(scheduler.getResourceCalculator()) + .thenReturn(new DefaultResourceCalculator()); + + ApplicationAttemptId appAttId = createAppAttemptId(0, 0); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getEpoch()).thenReturn(3L); + when(rmContext.getScheduler()).thenReturn(scheduler); + when(rmContext.getRMApps()) + .thenReturn(new ConcurrentHashMap<ApplicationId, RMApp>()); + // Every node partition (label) resolves to the full cluster resource. + RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class); + when(nlm.getResourceByLabel(any(), any())).thenReturn(res); +
when(rmContext.getNodeLabelManager()).thenReturn(nlm); + + // Queue "test" is configured with 100% of the cluster, so its capacity and + // absolute capacity are both 1.0f. + Queue queue = createQueue("test", null, 1.0f, 1.0f); + final String user = "user1"; + FiCaSchedulerApp app = + new FiCaSchedulerApp(appAttId, user, queue, + queue.getActiveUsersManager(), rmContext); + + // Charge the requested resource to the attempt as used resources. + Resource requestedResource = Resource.newInstance(1536, 2); + app.getAppAttemptResourceUsage().incUsed(requestedResource); + // In "test" queue, 1536 used is 15% of both the queue and the cluster + assertEquals(15.0f, app.getResourceUsageReport().getQueueUsagePercentage(), + 0.01f); + assertEquals(15.0f, + app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f); + + // Queue "test2" is a child of root and its capacity is 50% of root. As a + // child of root, its absolute capacity is also 50%. + queue = createQueue("test2", null, 0.5f, 0.5f); + app = new FiCaSchedulerApp(appAttId, user, queue, + queue.getActiveUsersManager(), rmContext); + app.getAppAttemptResourceUsage().incUsed(requestedResource); + // In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster. + assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(), + 0.01f); + assertEquals(15.0f, + app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f); + + // Queue "test2.1" is 50% of queue "test2", which is 50% of the cluster. + // Therefore, "test2.1" capacity is 50% and absolute capacity is 25%. + AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f); + app = new FiCaSchedulerApp(appAttId, user, qChild, + qChild.getActiveUsersManager(), rmContext); + app.getAppAttemptResourceUsage().incUsed(requestedResource); + // In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
+ assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(), + 0.01f); + assertEquals(15.0f, + app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f); + } + + /** + * Builds the id of attempt {@code attemptId} of application {@code appId}, + * using 0 as the cluster timestamp. + */ + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { + ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); + ApplicationAttemptId attId = + ApplicationAttemptId.newInstance(appIdImpl, attemptId); + return attId; + } + + /** + * Creates a Mockito mock of {@link AbstractCSQueue} backed by real + * {@link CSQueueMetrics} and a real {@link ActiveUsersManager}. Uses + * {@code cs.getConf()}, so the {@code cs} fixture must already be set up. + * + * @param name queue name, also used for its metrics + * @param parent parent queue passed to the metrics, or {@code null} + * @param capacity capacity reported by {@code getQueueInfo(false, false)} + * @param absCap absolute capacity reported for every partition by + * {@code getQueueCapacities().getAbsoluteCapacity(...)} + * @return the stubbed queue mock + */ + private AbstractCSQueue createQueue(String name, Queue parent, float capacity, + float absCap) { + CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf()); + QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null, + null, QueueState.RUNNING, null, "", null, false); + ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics); + AbstractCSQueue queue = mock(AbstractCSQueue.class); + when(queue.getMetrics()).thenReturn(metrics); + when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); + when(queue.getQueueInfo(false, false)).thenReturn(queueInfo); + QueueCapacities qCaps = mock(QueueCapacities.class); + when(qCaps.getAbsoluteCapacity(any())).thenReturn(absCap); + when(queue.getQueueCapacities()).thenReturn(qCaps); + return queue; + } + @After public void tearDown() throws Exception { if (cs != null) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org