YARN-4280. CapacityScheduler reservations may not prevent indefinite 
postponement on a busy cluster. Contributed by Kuhu Shukla


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d92aefd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d92aefd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d92aefd

Branch: refs/heads/YARN-2915
Commit: 4d92aefd35d4517d9435d81bafdec0d77905a7a0
Parents: 580a833
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Aug 3 18:53:14 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Aug 3 18:53:14 2016 +0000

----------------------------------------------------------------------
 .../scheduler/capacity/CSAssignment.java        |  31 +-
 .../scheduler/capacity/LeafQueue.java           |   7 +-
 .../scheduler/capacity/ParentQueue.java         |  43 ++-
 .../allocator/AbstractContainerAllocator.java   |  11 +-
 .../allocator/IncreaseContainerAllocator.java   |  10 +-
 .../capacity/TestCapacityScheduler.java         | 288 ++++++++++++++++++-
 6 files changed, 362 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d92aefd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 6406efe..7bea9af 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -34,36 +34,47 @@ public class CSAssignment {
   public static final CSAssignment NULL_ASSIGNMENT =
       new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
 
-  public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+  public static final CSAssignment SKIP_ASSIGNMENT =
+      new CSAssignment(SkippedType.OTHER);
 
   private Resource resource;
   private NodeType type;
   private RMContainer excessReservation;
   private FiCaSchedulerApp application;
-  private final boolean skipped;
+  private SkippedType skipped;
+
+  /**
+   * Reason for the queue to get skipped.
+   */
+  public enum SkippedType {
+    NONE,
+    QUEUE_LIMIT,
+    OTHER
+  }
+
   private boolean fulfilledReservation;
   private final AssignmentInformation assignmentInformation;
   private boolean increaseAllocation;
   private List<RMContainer> containersToKill;
 
   public CSAssignment(Resource resource, NodeType type) {
-    this(resource, type, null, null, false, false);
+    this(resource, type, null, null, SkippedType.NONE, false);
   }
 
   public CSAssignment(FiCaSchedulerApp application,
       RMContainer excessReservation) {
     this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL,
-      excessReservation, application, false, false);
+      excessReservation, application, SkippedType.NONE, false);
   }
 
-  public CSAssignment(boolean skipped) {
+  public CSAssignment(SkippedType skipped) {
     this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped,
       false);
   }
 
   public CSAssignment(Resource resource, NodeType type,
       RMContainer excessReservation, FiCaSchedulerApp application,
-      boolean skipped, boolean fulfilledReservation) {
+      SkippedType skipped, boolean fulfilledReservation) {
     this.resource = resource;
     this.type = type;
     this.excessReservation = excessReservation;
@@ -105,10 +116,14 @@ public class CSAssignment {
     excessReservation = rmContainer;
   }
 
-  public boolean getSkipped() {
+  public SkippedType getSkippedType() {
     return skipped;
   }
-  
+
+  public void setSkippedType(SkippedType skippedType) {
+    this.skipped = skippedType;
+  }
+
   @Override
   public String toString() {
     String ret = "resource:" + resource.toString();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d92aefd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9aae909..a243e93 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -60,6 +60,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -972,8 +973,12 @@ public class LeafQueue extends AbstractCSQueue {
 
         // Done
         return assignment;
-      } else if (assignment.getSkipped()) {
+      } else if (assignment.getSkippedType()
+          == CSAssignment.SkippedType.OTHER) {
         application.updateNodeInfoForAMDiagnostics(node);
+      } else if(assignment.getSkippedType()
+          == CSAssignment.SkippedType.QUEUE_LIMIT) {
+        return assignment;
       } else {
         // If we don't allocate anything, and it is not skipped by application,
         // we will return to respect FIFO of applications

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d92aefd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 6fcd6c1..9ae35ee 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
 import org.apache.hadoop.yarn.security.AccessType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -474,6 +473,7 @@ public class ParentQueue extends AbstractCSQueue {
             " cluster=" + clusterResource);
 
       } else {
+        assignment.setSkippedType(assignedToChild.getSkippedType());
         break;
       }
 
@@ -511,14 +511,14 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   private ResourceLimits getResourceLimitsOfChild(CSQueue child,
-      Resource clusterResource, ResourceLimits parentLimits,
+      Resource clusterResource, Resource parentLimits,
       String nodePartition) {
     // Set resource-limit of a given child, child.limit =
     // min(my.limit - my.used + child.used, child.max)
 
     // Parent available resource = parent-limit - parent-used-resource
     Resource parentMaxAvailableResource = Resources.subtract(
-        parentLimits.getLimit(), queueUsage.getUsed(nodePartition));
+        parentLimits, queueUsage.getUsed(nodePartition));
     // Deduct killable from used
     Resources.addTo(parentMaxAvailableResource,
         getTotalKillableResource(nodePartition));
@@ -568,9 +568,9 @@ public class ParentQueue extends AbstractCSQueue {
   private synchronized CSAssignment assignContainersToChildQueues(
       Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
       SchedulingMode schedulingMode) {
-    CSAssignment assignment = 
-        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-    
+    CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
+
+    Resource parentLimits = limits.getLimit();
     printChildQueues();
 
     // Try to assign to most 'under-served' sub-queue
@@ -584,20 +584,21 @@ public class ParentQueue extends AbstractCSQueue {
 
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
-          getResourceLimitsOfChild(childQueue, cluster, limits, 
node.getPartition());
+          getResourceLimitsOfChild(childQueue, cluster, parentLimits,
+              node.getPartition());
       
-      assignment = childQueue.assignContainers(cluster, node, 
+      CSAssignment childAssignment = childQueue.assignContainers(cluster, node,
           childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
-          " stats: " + childQueue + " --> " + 
-          assignment.getResource() + ", " + assignment.getType());
+            " stats: " + childQueue + " --> " +
+            childAssignment.getResource() + ", " + childAssignment.getType());
       }
 
       // If we do assign, remove the queue and re-insert in-order to re-sort
       if (Resources.greaterThan(
               resourceCalculator, cluster, 
-              assignment.getResource(), Resources.none())) {
+              childAssignment.getResource(), Resources.none())) {
         // Only update childQueues when we doing non-partitioned node
         // allocation.
         if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
@@ -610,7 +611,24 @@ public class ParentQueue extends AbstractCSQueue {
             printChildQueues();
           }
         }
+        assignment = childAssignment;
         break;
+      } else if (childAssignment.getSkippedType() ==
+          CSAssignment.SkippedType.QUEUE_LIMIT) {
+        if (assignment.getSkippedType() !=
+            CSAssignment.SkippedType.QUEUE_LIMIT) {
+          assignment = childAssignment;
+        }
+        Resource resourceToSubtract = Resources.max(resourceCalculator,
+            cluster, childLimits.getHeadroom(), Resources.none());
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Decrease parentLimits " + parentLimits +
+              " for " + this.getQueueName() + " by " +
+              resourceToSubtract + " as childQueue=" +
+              childQueue.getQueueName() + " is blocked");
+        }
+        parentLimits = Resources.subtract(parentLimits,
+            resourceToSubtract);
       }
     }
     
@@ -731,7 +749,8 @@ public class ParentQueue extends AbstractCSQueue {
     for (CSQueue childQueue : childQueues) {
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
-          clusterResource, resourceLimits, RMNodeLabelsManager.NO_LABEL);
+          clusterResource, resourceLimits.getLimit(),
+          RMNodeLabelsManager.NO_LABEL);
       childQueue.updateClusterResource(clusterResource, childLimits);
     }
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d92aefd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index afac235..4d5a7dc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -55,8 +55,10 @@ public abstract class AbstractContainerAllocator {
       Resource clusterResource, ContainerAllocation result,
       RMContainer rmContainer) {
     // Handle skipped
-    boolean skipped =
-        (result.getAllocationState() == AllocationState.APP_SKIPPED);
+    CSAssignment.SkippedType skipped =
+        (result.getAllocationState() == AllocationState.APP_SKIPPED) ?
+        CSAssignment.SkippedType.OTHER :
+        CSAssignment.SkippedType.NONE;
     CSAssignment assignment = new CSAssignment(skipped);
     assignment.setApplication(application);
     
@@ -110,6 +112,11 @@ public abstract class AbstractContainerAllocator {
       }
 
       assignment.setContainersToKill(result.getToKillContainers());
+    } else {
+      if (result.getAllocationState() == AllocationState.QUEUE_SKIPPED) {
+        assignment.setSkippedType(
+            CSAssignment.SkippedType.QUEUE_LIMIT);
+      }
     }
     
     return assignment;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d92aefd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
index 4a2ae18..509dfba 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
@@ -71,7 +71,7 @@ public class IncreaseContainerAllocator extends 
AbstractContainerAllocator {
       SchedContainerChangeRequest request) {
     CSAssignment assignment =
         new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
-            application, false, false);
+            application, CSAssignment.SkippedType.NONE, false);
     Resources.addTo(assignment.getAssignmentInformation().getReserved(),
         request.getDeltaCapacity());
     assignment.getAssignmentInformation().incrReservations();
@@ -88,7 +88,7 @@ public class IncreaseContainerAllocator extends 
AbstractContainerAllocator {
       SchedContainerChangeRequest request, boolean fromReservation) {
     CSAssignment assignment =
         new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
-            application, false, fromReservation);
+            application, CSAssignment.SkippedType.NONE, fromReservation);
     Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
         request.getDeltaCapacity());
     assignment.getAssignmentInformation().incrAllocations();
@@ -311,7 +311,8 @@ public class IncreaseContainerAllocator extends 
AbstractContainerAllocator {
           // Try to allocate the increase request
           assigned =
               allocateIncreaseRequest(node, clusterResource, increaseRequest);
-          if (!assigned.getSkipped()) {
+          if (assigned.getSkippedType()
+              == CSAssignment.SkippedType.NONE) {
             // When we don't skip this request, which means we either allocated
             // OR reserved this request. We will break
             break;
@@ -328,7 +329,8 @@ public class IncreaseContainerAllocator extends 
AbstractContainerAllocator {
         }
 
         // We may have allocated something
-        if (assigned != null && !assigned.getSkipped()) {
+        if (assigned != null && assigned.getSkippedType()
+            == CSAssignment.SkippedType.NONE) {
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d92aefd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index d3567f5..2da7adb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -113,6 +113,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
+    ContainerExpiredSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -166,6 +168,12 @@ public class TestCapacityScheduler {
   private static final String B3 = B + ".b3";
   private static float A_CAPACITY = 10.5f;
   private static float B_CAPACITY = 89.5f;
+  private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
+  private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
+  private static final String X1 = P1 + ".x1";
+  private static final String X2 = P1 + ".x2";
+  private static final String Y1 = P2 + ".y1";
+  private static final String Y2 = P2 + ".y2";
   private static float A1_CAPACITY = 30;
   private static float A2_CAPACITY = 70;
   private static float B1_CAPACITY = 79.2f;
@@ -411,7 +419,52 @@ public class TestCapacityScheduler {
     LOG.info("Setup top-level queues a and b");
     return conf;
   }
-  
+
+  private CapacitySchedulerConfiguration setupBlockedQueueConfiguration(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"a", "b"});
+
+    conf.setCapacity(A, 80f);
+    conf.setCapacity(B, 20f);
+    conf.setUserLimitFactor(A, 100);
+    conf.setUserLimitFactor(B, 100);
+    conf.setMaximumCapacity(A, 100);
+    conf.setMaximumCapacity(B, 100);
+    LOG.info("Setup top-level queues a and b");
+    return conf;
+  }
+
+  private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"p1", "p2"});
+
+    conf.setCapacity(P1, 50f);
+    conf.setMaximumCapacity(P1, 50f);
+    conf.setCapacity(P2, 50f);
+    conf.setMaximumCapacity(P2, 100f);
+    // Define 2nd-level queues
+    conf.setQueues(P1, new String[] {"x1", "x2"});
+    conf.setCapacity(X1, 80f);
+    conf.setMaximumCapacity(X1, 100f);
+    conf.setUserLimitFactor(X1, 2f);
+    conf.setCapacity(X2, 20f);
+    conf.setMaximumCapacity(X2, 100f);
+    conf.setUserLimitFactor(X2, 2f);
+
+    conf.setQueues(P2, new String[]{"y1", "y2"});
+    conf.setCapacity(Y1, 80f);
+    conf.setUserLimitFactor(Y1, 2f);
+    conf.setCapacity(Y2, 20f);
+    conf.setUserLimitFactor(Y2, 2f);
+    return conf;
+  }
+
   @Test
   public void testMaximumCapacitySetup() {
     float delta = 0.0000001f;
@@ -3415,4 +3468,237 @@ public class TestCapacityScheduler {
     scheduler.handle(appRemovedEvent1);
     rm.stop();
   }
+
+  @Test
+  public void testCSReservationWithRootUnblocked() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setResourceComparator(DominantResourceCalculator.class);
+    setupOtherBlockedQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    ParentQueue q = (ParentQueue) cs.getQueue("p1");
+
+    Assert.assertNotNull(q);
+    String host = "127.0.0.1";
+    String host1 = "test";
+    RMNode node =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
+    RMNode node1 =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
+    cs.handle(new NodeAddedSchedulerEvent(node));
+    cs.handle(new NodeAddedSchedulerEvent(node1));
+    ApplicationAttemptId appAttemptId1 =
+        appHelper(rm, cs, 100, 1, "x1", "userX1");
+    ApplicationAttemptId appAttemptId2 =
+        appHelper(rm, cs, 100, 2, "x2", "userX2");
+    ApplicationAttemptId appAttemptId3 =
+        appHelper(rm, cs, 100, 3, "y1", "userY1");
+    RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(null);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    ResourceRequest y1Req = null;
+    ResourceRequest x1Req = null;
+    ResourceRequest x2Req = null;
+    for(int i=0; i < 4; i++) {
+      y1Req = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId3,
+          Collections.<ResourceRequest>singletonList(y1Req),
+          Collections.<ContainerId>emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("Y1 Used Resource should be 4 GB", 4 * GB,
+        cs.getQueue("y1").getUsedResources().getMemorySize());
+    assertEquals("P2 Used Resource should be 4 GB", 4 * GB,
+        cs.getQueue("p2").getUsedResources().getMemorySize());
+
+    for(int i=0; i < 7; i++) {
+      x1Req = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId1,
+          Collections.<ResourceRequest>singletonList(x1Req),
+          Collections.<ContainerId>emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("x1").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+
+    x2Req = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
+    cs.allocate(appAttemptId2,
+        Collections.<ResourceRequest>singletonList(x2Req),
+        Collections.<ContainerId>emptyList(),
+        null, null, null, null);
+    CapacityScheduler.schedule(cs);
+    assertEquals("X2 Used Resource should be 0", 0,
+        cs.getQueue("x2").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+    //this assign should fail
+    x1Req = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+    cs.allocate(appAttemptId1,
+        Collections.<ResourceRequest>singletonList(x1Req),
+        Collections.<ContainerId>emptyList(),
+        null, null, null, null);
+    CapacityScheduler.schedule(cs);
+    assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("x1").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+
+    //this should get thru
+    for (int i=0; i < 4; i++) {
+      y1Req = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId3,
+          Collections.<ResourceRequest>singletonList(y1Req),
+          Collections.<ContainerId>emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
+        cs.getQueue("p2").getUsedResources().getMemorySize());
+
+    //Free a container from X1
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2);
+    cs.handle(new ContainerExpiredSchedulerEvent(containerId));
+
+    //Schedule pending request
+    CapacityScheduler.schedule(cs);
+    assertEquals("X2 Used Resource should be 2 GB", 2 * GB,
+        cs.getQueue("x2").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 8 GB", 8 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+    assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
+        cs.getQueue("p2").getUsedResources().getMemorySize());
+    assertEquals("Root Used Resource should be 16 GB", 16 * GB,
+        cs.getRootQueue().getUsedResources().getMemorySize());
+    rm.stop();
+  }
+
+  @Test
+  public void testCSQueueBlocked() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupBlockedQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue q = (LeafQueue) cs.getQueue("a");
+
+    Assert.assertNotNull(q);
+    String host = "127.0.0.1";
+    String host1 = "test";
+    RMNode node =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
+    RMNode node1 =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
+    cs.handle(new NodeAddedSchedulerEvent(node));
+    cs.handle(new NodeAddedSchedulerEvent(node1));
+    //add app begin
+    ApplicationAttemptId appAttemptId1 =
+        appHelper(rm, cs, 100, 1, "a", "user1");
+    ApplicationAttemptId appAttemptId2 =
+        appHelper(rm, cs, 100, 2, "b", "user2");
+    //add app end
+
+    RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(null);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    ResourceRequest r1 = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
+    //This will allocate for app1
+    cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
+        Collections.<ContainerId>emptyList(),
+        null, null, null, null).getContainers().size();
+    CapacityScheduler.schedule(cs);
+    ResourceRequest r2 = null;
+    for (int i =0; i < 13; i++) {
+      r2 = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId2,
+          Collections.<ResourceRequest>singletonList(r2),
+          Collections.<ContainerId>emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("A Used Resource should be 2 GB", 2 * GB,
+        cs.getQueue("a").getUsedResources().getMemorySize());
+    assertEquals("B Used Resource should be 2 GB", 13 * GB,
+        cs.getQueue("b").getUsedResources().getMemorySize());
+    r1 = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
+    r2 = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+    cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
+        Collections.<ContainerId>emptyList(),
+        null, null, null, null).getContainers().size();
+    CapacityScheduler.schedule(cs);
+
+    cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2),
+        Collections.<ContainerId>emptyList(), null, null, null, null);
+    CapacityScheduler.schedule(cs);
+    //Check blocked Resource
+    assertEquals("A Used Resource should be 2 GB", 2 * GB,
+        cs.getQueue("a").getUsedResources().getMemorySize());
+    assertEquals("B Used Resource should be 13 GB", 13 * GB,
+        cs.getQueue("b").getUsedResources().getMemorySize());
+
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10);
+    ContainerId containerId2 =ContainerId.newContainerId(appAttemptId2, 11);
+
+    cs.handle(new ContainerExpiredSchedulerEvent(containerId1));
+    cs.handle(new ContainerExpiredSchedulerEvent(containerId2));
+    CapacityScheduler.schedule(cs);
+    rm.drainEvents();
+    assertEquals("A Used Resource should be 2 GB", 4 * GB,
+        cs.getQueue("a").getUsedResources().getMemorySize());
+    assertEquals("B Used Resource should be 12 GB", 12 * GB,
+        cs.getQueue("b").getUsedResources().getMemorySize());
+    assertEquals("Used Resource on Root should be 16 GB", 16 * GB,
+        cs.getRootQueue().getUsedResources().getMemorySize());
+    rm.stop();
+  }
+
+  private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
+                                         int clusterTs, int appId, String 
queue,
+                                         String user) {
+    ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
+    ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
+        appId1, appId);
+
+    RMAppAttemptMetrics attemptMetric1 =
+        new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
+    RMAppImpl app1 = mock(RMAppImpl.class);
+    when(app1.getApplicationId()).thenReturn(appId1);
+    RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt1.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext = mock(
+        ApplicationSubmissionContext.class);
+    when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
+    when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
+    when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
+    rm.getRMContext().getRMApps().put(appId1, app1);
+
+    SchedulerEvent addAppEvent1 =
+        new AppAddedSchedulerEvent(appId1, queue, user);
+    cs.handle(addAppEvent1);
+    SchedulerEvent addAttemptEvent1 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
+    cs.handle(addAttemptEvent1);
+    return appAttemptId1;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to