YARN-4484. Available Resource calculation for a queue is not correct when used 
with labels. (Sunil G via wangda)

(cherry picked from commit 24db9167f16ba643a186624b33a6b9b80020f476)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e34e1aa4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e34e1aa4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e34e1aa4

Branch: refs/heads/YARN-5355-branch-2
Commit: e34e1aa4fe0a0826439227175fc3321f840dddd4
Parents: 51a3131
Author: Wangda Tan <wan...@apache.org>
Authored: Fri Jul 15 11:40:12 2016 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Fri Jul 15 11:41:35 2016 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/CSQueueUtils.java        |  55 ++---
 .../TestNodeLabelContainerAllocation.java       | 212 +++++++++++++++++++
 2 files changed, 242 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e34e1aa4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 0166d83..d5cdb32 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -227,24 +227,34 @@ class CSQueueUtils {
         .setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity);
   }
   
-  private static Resource getNonPartitionedMaxAvailableResourceToQueue(
-      final ResourceCalculator rc, Resource totalNonPartitionedResource,
-      CSQueue queue) {
-    Resource queueLimit = Resources.none();
-    Resource usedResources = queue.getUsedResources();
+  private static Resource getMaxAvailableResourceToQueue(
+      final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue,
+      Resource cluster) {
+    Set<String> nodeLabels = queue.getNodeLabelsForQueue();
+    Resource totalAvailableResource = Resources.createResource(0, 0);
 
-    if (Resources.greaterThan(rc, totalNonPartitionedResource,
-        totalNonPartitionedResource, Resources.none())) {
-      queueLimit =
-          Resources.multiply(totalNonPartitionedResource,
-              queue.getAbsoluteCapacity());
-    }
+    for (String partition : nodeLabels) {
+      // Calculate guaranteed resource for a label in a queue by below logic.
+      // (total label resource) * (absolute capacity of label in that queue)
+      Resource queueGuranteedResource = Resources.multiply(nlm
+          .getResourceByLabel(partition, cluster), queue.getQueueCapacities()
+          .getAbsoluteCapacity(partition));
 
-    Resource available = Resources.subtract(queueLimit, usedResources);
-    return Resources.max(rc, totalNonPartitionedResource, available,
-        Resources.none());
+      // Available resource in queue for a specific label will be calculated as
+      // {(guaranteed resource for a label in a queue) -
+      // (resource usage of that label in the queue)}
+      // Finally accumulate this available resource to get total.
+      Resource available = (Resources.greaterThan(rc, cluster,
+          queueGuranteedResource,
+          queue.getQueueResourceUsage().getUsed(partition))) ? Resources
+          .componentwiseMax(Resources.subtractFrom(queueGuranteedResource,
+              queue.getQueueResourceUsage().getUsed(partition)), Resources
+              .none()) : Resources.none();
+      Resources.addTo(totalAvailableResource, available);
+    }
+    return totalAvailableResource;
   }
-  
+
   /**
    * <p>
    * Update Queue Statistics:
@@ -277,15 +287,10 @@ class CSQueueUtils {
       updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
           minimumAllocation, queueResourceUsage, queueCapacities, 
nodePartition);
     }
-    
-    // Now in QueueMetrics, we only store available-resource-to-queue for
-    // default partition.
-    if (nodePartition == null
-        || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      childQueue.getMetrics().setAvailableResourcesToQueue(
-          getNonPartitionedMaxAvailableResourceToQueue(rc,
-              nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster),
-              childQueue));
-    }
+
+    // Update queue metrics w.r.t node labels. In a generic way, we can
+    // calculate available resource from all labels in cluster.
+    childQueue.getMetrics().setAvailableResourcesToQueue(
+        getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
    }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e34e1aa4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index cff1514..47fd534 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,6 +49,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -1863,4 +1866,213 @@ public class TestNodeLabelContainerAllocation {
     checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
         cs.getApplicationAttempt(am2.getApplicationAttemptId()));
   }
+
+  @Test
+  public void testQueueMetricsWithLabels() throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     *
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * </pre>
+     *
+     * a/b can access x, both of them have max-capacity-on-x = 50
+     *
+     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
+     * resource.
+     */
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 25);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(queueB, 75);
+    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
+    csConf.setCapacityByLabel(queueB, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueB, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", false)));
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // app1 asks for 5 partition=x containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+    // NM1 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets all resource in partition=x
+    Assert.assertEquals(5, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(5 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(5 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(10 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+    assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+    rm1.close();
+  }
+
+  @Test
+  public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     *
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * </pre>
+     *
+     * a/b can access x, both of them have max-capacity-on-x = 50
+     *
+     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
+     * resource.
+     */
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 25);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(queueB, 75);
+    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
+    csConf.setCapacityByLabel(queueB, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueB, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // app1 asks for 3 containers in the default (empty) partition
+    am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
+
+    // NM1 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets all resource in partition=x (non-exclusive)
+    Assert.assertEquals(3, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(7 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(9 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+    double delta = 0.0001;
+    // 3GB is used from label x quota. 1.5 GB is remaining from default label.
+    // 2GB is remaining from label x.
+    assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB());
+
+    // app1 asks for 5 more default-partition containers (1 GB each)
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+
+    // NM2 do couple of heartbeats
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // app1 gets all resource in default partition
+    Assert.assertEquals(2, schedulerNode2.getNumContainers());
+
+    // 3GB is used from label x quota. 2GB used from default label.
+    // So total 2.5 GB is remaining.
+    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+
+    rm1.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to