YARN-3388. Allocation in LeafQueue could get stuck because DRF calculator isn't 
well supported when computing user-limit. (Nathan Roberts via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/444b2ea7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/444b2ea7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/444b2ea7

Branch: refs/heads/HDFS-9806
Commit: 444b2ea7afebf9f6c3d356154b71abfd0ea95b23
Parents: 3d93745
Author: Wangda Tan <wan...@apache.org>
Authored: Fri Aug 19 16:28:32 2016 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Fri Aug 19 16:28:32 2016 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/LeafQueue.java           | 206 +++++++++++++++++--
 .../scheduler/capacity/TestLeafQueue.java       | 198 +++++++++++++++++-
 .../scheduler/capacity/TestUtils.java           |  24 ++-
 3 files changed, 396 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/444b2ea7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 636762f..1ca69be 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -20,6 +20,9 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -70,9 +73,11 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPo
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 @Private
 @Unstable
@@ -111,7 +116,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   // cache last cluster resource to compute actual capacity
   private Resource lastClusterResource = Resources.none();
-  
+
   private final QueueResourceLimitsInfo queueResourceLimitsInfo =
       new QueueResourceLimitsInfo();
 
@@ -119,6 +124,10 @@ public class LeafQueue extends AbstractCSQueue {
 
   private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
+  // Summation of consumed ratios for all users in queue
+  private float totalUserConsumedRatio = 0;
+  private UsageRatios qUsageRatios;
+
   // record all ignore partition exclusivityRMContainer, this will be used to 
do
   // preemption, key is the partition of the RMContainer allocated on
   private Map<String, TreeSet<RMContainer>> 
ignorePartitionExclusivityRMContainers =
@@ -135,6 +144,8 @@ public class LeafQueue extends AbstractCSQueue {
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
 
+    qUsageRatios = new UsageRatios();
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("LeafQueue:" + " name=" + queueName
         + ", fullname=" + getQueuePath());
@@ -159,7 +170,7 @@ public class LeafQueue extends AbstractCSQueue {
     setQueueResourceLimitsInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
-    
+
     
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
 
     userLimit = conf.getUserLimit(getQueuePath());
@@ -1149,6 +1160,9 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource computeUserLimit(FiCaSchedulerApp application,
       Resource clusterResource, User user,
       String nodePartition, SchedulingMode schedulingMode) {
+    Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
+
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
     //   we're running below capacity. The 'max' ensures that jobs in queues
@@ -1157,7 +1171,7 @@ public class LeafQueue extends AbstractCSQueue {
     //   (usedResources + required) (which extra resources we are allocating)
     Resource queueCapacity =
         Resources.multiplyAndNormalizeUp(resourceCalculator,
-            labelManager.getResourceByLabel(nodePartition, clusterResource),
+            partitionResource,
             queueCapacities.getAbsoluteCapacity(nodePartition),
             minimumAllocation);
 
@@ -1169,15 +1183,30 @@ public class LeafQueue extends AbstractCSQueue {
     // Allow progress for queues with miniscule capacity
     queueCapacity =
         Resources.max(
-            resourceCalculator, clusterResource, 
+            resourceCalculator, partitionResource,
             queueCapacity, 
             required);
 
+
+    /* We want to base the userLimit calculation on
+     * max(queueCapacity, usedResources+required). However, we want
+     * usedResources to be based on the combined ratios of all the users in the
+     * queue so we use consumedRatio to calculate such.
+     * The calculation is dependent on how the resourceCalculator calculates 
the
+     * ratio between two Resources. DRF Example: If usedResources is
+     * greater than queueCapacity and users have the following [mem,cpu] 
usages:
+     * User1: [10%,20%] - Dominant resource is 20%
+     * User2: [30%,10%] - Dominant resource is 30%
+     * Then total consumedRatio is then 20+30=50%. Yes, this value can be
+     * larger than 100% but for the purposes of making sure all users are
+     * getting their fair share, it works.
+     */
+    Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
+        partitionResource, qUsageRatios.getUsageRatio(nodePartition),
+        minimumAllocation);
     Resource currentCapacity =
-        Resources.lessThan(resourceCalculator, clusterResource,
-            queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity
-            : Resources.add(queueUsage.getUsed(nodePartition), required);
-    
+        Resources.lessThan(resourceCalculator, partitionResource, consumed,
+            queueCapacity) ? queueCapacity : Resources.add(consumed, required);
     // Never allow a single user to take more than the 
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
@@ -1186,9 +1215,10 @@ public class LeafQueue extends AbstractCSQueue {
     final int activeUsers = activeUsersManager.getNumActiveUsers();
     
     // User limit resource is determined by:
-    // max{currentCapacity / #activeUsers, currentCapacity * 
user-limit-percentage%)
+    // max{currentCapacity / #activeUsers, currentCapacity *
+    // user-limit-percentage%)
     Resource userLimitResource = Resources.max(
-        resourceCalculator, clusterResource, 
+        resourceCalculator, partitionResource,
         Resources.divideAndCeil(
             resourceCalculator, currentCapacity, activeUsers),
         Resources.divideAndCeil(
@@ -1212,8 +1242,7 @@ public class LeafQueue extends AbstractCSQueue {
       maxUserLimit =
           Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
     } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-      maxUserLimit =
-          labelManager.getResourceByLabel(nodePartition, clusterResource);
+      maxUserLimit = partitionResource;
     }
     
     // Cap final user limit with maxUserLimit
@@ -1221,7 +1250,7 @@ public class LeafQueue extends AbstractCSQueue {
         Resources.roundUp(
             resourceCalculator, 
             Resources.min(
-                resourceCalculator, clusterResource,   
+                resourceCalculator, partitionResource,
                   userLimitResource,
                   maxUserLimit
                 ), 
@@ -1229,18 +1258,22 @@ public class LeafQueue extends AbstractCSQueue {
 
     if (LOG.isDebugEnabled()) {
       String userName = application.getUser();
-      LOG.debug("User limit computation for " + userName + 
+      LOG.debug("User limit computation for " + userName +
           " in queue " + getQueueName() +
           " userLimitPercent=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
-          " required: " + required + 
-          " consumed: " + user.getUsed() + 
+          " required: " + required +
+          " consumed: " + consumed +
           " user-limit-resource: " + userLimitResource +
-          " queueCapacity: " + queueCapacity + 
+          " queueCapacity: " + queueCapacity +
           " qconsumed: " + queueUsage.getUsed() +
+          " consumedRatio: " + totalUserConsumedRatio +
           " currentCapacity: " + currentCapacity +
           " activeUsers: " + activeUsers +
-          " clusterCapacity: " + clusterResource
+          " clusterCapacity: " + clusterResource +
+          " resourceByLabel: " + partitionResource +
+          " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
+          " Partition: " + nodePartition
       );
     }
     user.setUserResourceLimit(userLimitResource);
@@ -1347,6 +1380,42 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
+  private synchronized float calculateUserUsageRatio(Resource clusterResource,
+      String nodePartition) {
+    Resource resourceByLabel =
+        labelManager.getResourceByLabel(nodePartition, clusterResource);
+    float consumed = 0;
+    User user;
+    for (Map.Entry<String, User> entry : users.entrySet()) {
+      user = entry.getValue();
+      consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
+          resourceByLabel, nodePartition);
+    }
+    return consumed;
+  }
+
+  private synchronized void recalculateQueueUsageRatio(Resource 
clusterResource,
+      String nodePartition) {
+    ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
+
+    if (nodePartition == null) {
+      for (String partition : 
Sets.union(queueCapacities.getNodePartitionsSet(),
+          queueResourceUsage.getNodePartitionsSet())) {
+        qUsageRatios.setUsageRatio(partition,
+            calculateUserUsageRatio(clusterResource, partition));
+      }
+    } else {
+      qUsageRatios.setUsageRatio(nodePartition,
+          calculateUserUsageRatio(clusterResource, nodePartition));
+    }
+  }
+
+  private synchronized void updateQueueUsageRatio(String nodePartition,
+      float delta) {
+    qUsageRatios.incUsageRatio(nodePartition, delta);
+  }
+
+
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer 
rmContainer, 
@@ -1384,7 +1453,7 @@ public class LeafQueue extends AbstractCSQueue {
           removed =
               application.containerCompleted(rmContainer, containerStatus,
                   event, node.getPartition());
-          
+
           node.releaseContainer(container);
         }
 
@@ -1417,6 +1486,8 @@ public class LeafQueue extends AbstractCSQueue {
       boolean isIncreasedAllocation) {
     super.allocateResource(clusterResource, resource, nodePartition,
         isIncreasedAllocation);
+    Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
     
     // handle ignore exclusivity container
     if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1435,6 +1506,12 @@ public class LeafQueue extends AbstractCSQueue {
     String userName = application.getUser();
     User user = getUser(userName);
     user.assignContainer(resource, nodePartition);
+
+    // Update usage ratios
+    updateQueueUsageRatio(nodePartition,
+        user.updateUsageRatio(resourceCalculator, resourceByLabel,
+            nodePartition));
+
     // Note this is a bit unconventional since it gets the object and modifies
     // it here, rather then using set routine
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
@@ -1455,6 +1532,8 @@ public class LeafQueue extends AbstractCSQueue {
       RMContainer rmContainer, boolean isChangeResource) {
     super.releaseResource(clusterResource, resource, nodePartition,
         isChangeResource);
+    Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
     
     // handle ignore exclusivity container
     if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1474,6 +1553,12 @@ public class LeafQueue extends AbstractCSQueue {
     String userName = application.getUser();
     User user = getUser(userName);
     user.releaseContainer(resource, nodePartition);
+
+    // Update usage ratios
+    updateQueueUsageRatio(nodePartition,
+        user.updateUsageRatio(resourceCalculator, resourceByLabel,
+            nodePartition));
+
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
 
     if (LOG.isDebugEnabled()) {
@@ -1513,7 +1598,10 @@ public class LeafQueue extends AbstractCSQueue {
     // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
     // during allocation
     setQueueResourceLimitsInfo(clusterResource);
-    
+
+    // Update user consumedRatios
+    recalculateQueueUsageRatio(clusterResource, null);
+
     // Update metrics
     CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
         minimumAllocation, this, labelManager, null);
@@ -1565,17 +1653,93 @@ public class LeafQueue extends AbstractCSQueue {
     queueUsage.decAMUsed(nodeLabel, resourceToDec);
   }
 
+  /*
+   * Usage Ratio
+   */
+  static private class UsageRatios {
+    private Map<String, Float> usageRatios;
+    private ReadLock readLock;
+    private WriteLock writeLock;
+
+    public UsageRatios() {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      readLock = lock.readLock();
+      writeLock = lock.writeLock();
+      usageRatios = new HashMap<String, Float>();
+    }
+
+    private void incUsageRatio(String label, float delta) {
+      try {
+        writeLock.lock();
+        Float fl = usageRatios.get(label);
+        if (null == fl) {
+          fl = new Float(0.0);
+        }
+        fl += delta;
+        usageRatios.put(label, new Float(fl));
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    float getUsageRatio(String label) {
+      try {
+        readLock.lock();
+        Float f = usageRatios.get(label);
+        if (null == f) {
+          return 0.0f;
+        }
+        return f;
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private void setUsageRatio(String label, float ratio) {
+      try {
+        writeLock.lock();
+        usageRatios.put(label, new Float(ratio));
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public float getUsageRatio(String label) {
+    return qUsageRatios.getUsageRatio(label);
+  }
+
   @VisibleForTesting
   public static class User {
     ResourceUsage userResourceUsage = new ResourceUsage();
     volatile Resource userResourceLimit = Resource.newInstance(0, 0);
     int pendingApplications = 0;
     int activeApplications = 0;
+    private UsageRatios userUsageRatios = new UsageRatios();
 
     public ResourceUsage getResourceUsage() {
       return userResourceUsage;
     }
     
+    public synchronized float resetAndUpdateUsageRatio(
+        ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      userUsageRatios.setUsageRatio(nodePartition, 0);
+      return updateUsageRatio(resourceCalculator, resource, nodePartition);
+    }
+
+    public synchronized float updateUsageRatio(
+        ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      float delta;
+      float newRatio =
+          Resources.ratio(resourceCalculator, getUsed(nodePartition), 
resource);
+      delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
+      userUsageRatios.setUsageRatio(nodePartition, newRatio);
+      return delta;
+    }
+
     public Resource getUsed() {
       return userResourceUsage.getUsed();
     }
@@ -1713,7 +1877,7 @@ public class LeafQueue extends AbstractCSQueue {
         .getSchedulableEntities()) {
       apps.add(pendingApp.getApplicationAttemptId());
     }
-    for (FiCaSchedulerApp app : 
+    for (FiCaSchedulerApp app :
       orderingPolicy.getSchedulableEntities()) {
       apps.add(app.getApplicationAttemptId());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/444b2ea7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 274c063..b2c53da 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -72,6 +74,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK
 
 
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -83,6 +86,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
@@ -97,6 +101,7 @@ import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.T
 public class TestLeafQueue {  
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
+  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
 
   RMContext rmContext;
   RMContext spyRMContext;
@@ -106,16 +111,29 @@ public class TestLeafQueue {
   CapacitySchedulerContext csContext;
   
   CSQueue root;
-  Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+  Map<String, CSQueue> queues;
   
   final static int GB = 1024;
   final static String DEFAULT_RACK = "/default";
 
-  private final ResourceCalculator resourceCalculator = new 
DefaultResourceCalculator();
+  private final ResourceCalculator resourceCalculator =
+      new DefaultResourceCalculator();
   
+  private final ResourceCalculator dominantResourceCalculator =
+      new DominantResourceCalculator();
+
   @Before
   public void setUp() throws Exception {
+    setUpInternal(resourceCalculator);
+  }
+
+  private void setUpWithDominantResourceCalculator() throws Exception {
+    setUpInternal(dominantResourceCalculator);
+  }
+
+  private void setUpInternal(ResourceCalculator rC) throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
+    queues = new HashMap<String, CSQueue>();
     cs = spy(spyCs);
     rmContext = TestUtils.getMockRMContext();
     spyRMContext = spy(rmContext);
@@ -134,6 +152,8 @@ public class TestLeafQueue {
     csConf = 
         new CapacitySchedulerConfiguration();
     csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
+    csConf.setBoolean(
+        "yarn.scheduler.capacity.reservations-continue-look-all-nodes", false);
     final String newRoot = "root" + System.currentTimeMillis();
     setupQueueConfiguration(csConf, newRoot);
     YarnConfiguration conf = new YarnConfiguration();
@@ -153,6 +173,7 @@ public class TestLeafQueue {
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
+    when(csContext.getResourceCalculator()).thenReturn(rC);
     when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager =
         new RMContainerTokenSecretManager(conf);
@@ -179,7 +200,8 @@ public class TestLeafQueue {
         .thenReturn(new YarnConfiguration());
     when(cs.getNumClusterNodes()).thenReturn(3);
   }
-  
+
+
   private static final String A = "a";
   private static final String B = "b";
   private static final String C = "c";
@@ -608,14 +630,180 @@ public class TestLeafQueue {
     assertEquals((int)(a.getCapacity() * 
node_0.getTotalResource().getMemorySize()),
         a.getMetrics().getAvailableMB());
   }
-  
+  @Test
+  public void testDRFUsageRatioRounding() throws Exception {
+    CSAssignment assign;
+    setUpWithDominantResourceCalculator();
+    // Mock the queue
+    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(E));
+
+    // Users
+    final String user0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 =
+        new FiCaSchedulerApp(appAttemptId0, user0, b,
+            b.getActiveUsersManager(), spyRMContext);
+    b.submitApplicationAttempt(app0, user0);
+
+    // Setup some nodes
+    String host0 = "127.0.0.1";
+    FiCaSchedulerNode node0 =
+        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 80 * GB, 100);
+
+    // Make cluster relatively large so usageRatios are small
+    int numNodes = 1000;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (80 * GB), numNodes * 100);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Set user-limit. Need a small queue within a large cluster.
+    b.setUserLimit(50);
+    b.setUserLimitFactor(1000000);
+    b.setMaxCapacity(1.0f);
+    b.setAbsoluteCapacity(0.00001f);
+
+    // First allocation is larger than second but is still vcore dominant
+    // so usage ratio will be based on vcores. If consumedRatio doesn't round
+    // in our favor then new limit calculation will actually be less than
+    // what is currently consumed and we will fail to allocate
+    Priority priority = TestUtils.createMockPriority(1);
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 20 * GB, 29, 1, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+    assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 10 * GB, 29, 2, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+    assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertTrue("Still within limits, should assign",
+        assign.getResource().getMemorySize() > 0);
+  }
+
+  @Test
+  public void testDRFUserLimits() throws Exception {
+    setUpWithDominantResourceCalculator();
+
+    // Mock the queue
+    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
+    // unset maxCapacity
+    b.setMaxCapacity(1.0f);
+
+    // Users
+    final String user0 = "user_0";
+    final String user1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 =
+        new FiCaSchedulerApp(appAttemptId0, user0, b,
+            b.getActiveUsersManager(), spyRMContext);
+    b.submitApplicationAttempt(app0, user0);
+
+    final ApplicationAttemptId appAttemptId2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app2 =
+        new FiCaSchedulerApp(appAttemptId2, user1, b,
+            b.getActiveUsersManager(), spyRMContext);
+    b.submitApplicationAttempt(app2, user1);
+
+    // Setup some nodes
+    String host0 = "127.0.0.1";
+    FiCaSchedulerNode node0 =
+        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8 * GB, 100);
+    String host1 = "127.0.0.2";
+    FiCaSchedulerNode node1 =
+        TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100);
+
+    int numNodes = 2;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8 * GB), numNodes * 100);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests so that one application is memory dominant
+    // and other application is vcores dominant
+    Priority priority = TestUtils.createMockPriority(1);
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 40, 10, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+
+    app2.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 10, 10, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+
+    /**
+     * Start testing...
+     */
+
+    // Set user-limit
+    b.setUserLimit(50);
+    b.setUserLimitFactor(2);
+    User queueUser0 = b.getUser(user0);
+    User queueUser1 = b.getUser(user1);
+
+    assertEquals("There should 2 active users!", 2, b
+        .getActiveUsersManager().getNumActiveUsers());
+    // Fill both Nodes as far as we can
+    CSAssignment assign;
+    do {
+      assign =
+          b.assignContainers(clusterResource, node0, new ResourceLimits(
+              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      LOG.info(assign.toString());
+    } while (assign.getResource().getMemorySize() > 0 &&
+        assign.getAssignmentInformation().getNumReservations() == 0);
+    do {
+      assign =
+          b.assignContainers(clusterResource, node1, new ResourceLimits(
+              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    } while (assign.getResource().getMemorySize() > 0 &&
+        assign.getAssignmentInformation().getNumReservations() == 0);
+    //LOG.info("user_0: " + queueUser0.getUsed());
+    //LOG.info("user_1: " + queueUser1.getUsed());
+
+    assertTrue("Verify user_0 got resources ", queueUser0.getUsed()
+        .getMemorySize() > 0);
+    assertTrue("Verify user_1 got resources ", queueUser1.getUsed()
+        .getMemorySize() > 0);
+    assertTrue(
+        "Exepected AbsoluteUsedCapacity > 0.95, got: "
+            + b.getAbsoluteUsedCapacity(), b.getAbsoluteUsedCapacity() > 0.95);
+
+    // Verify consumedRatio is based on dominant resources
+    float expectedRatio =
+        queueUser0.getUsed().getVirtualCores()
+            / (numNodes * 100.0f)
+            + queueUser1.getUsed().getMemorySize()
+            / (numNodes * 8.0f * GB);
+    assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+    // Add another node and make sure consumedRatio is adjusted
+    // accordingly.
+    numNodes = 3;
+    clusterResource =
+        Resources.createResource(numNodes * (8 * GB), numNodes * 100);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
+    expectedRatio =
+        queueUser0.getUsed().getVirtualCores()
+            / (numNodes * 100.0f)
+            + queueUser1.getUsed().getMemorySize()
+            / (numNodes * 8.0f * GB);
+    assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+  }
+
   @Test
   public void testUserLimits() throws Exception {
     // Mock the queue
     LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
-    
+
     // Users
     final String user_0 = "user_0";
     final String user_1 = "user_1";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/444b2ea7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 66e833f..a60b7ed 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -161,10 +161,17 @@ public class TestUtils {
   public static ResourceRequest createResourceRequest(
       String resourceName, int memory, int numContainers, boolean 
relaxLocality,
       Priority priority, RecordFactory recordFactory, String labelExpression) {
-    ResourceRequest request = 
+    return createResourceRequest(resourceName, memory, 1, numContainers,
+        relaxLocality, priority, recordFactory, labelExpression);
+  }
+
+  public static ResourceRequest createResourceRequest(String resourceName,
+      int memory, int vcores, int numContainers, boolean relaxLocality,
+      Priority priority, RecordFactory recordFactory, String labelExpression) {
+    ResourceRequest request =
         recordFactory.newRecordInstance(ResourceRequest.class);
-    Resource capability = Resources.createResource(memory, 1);
-    
+    Resource capability = Resources.createResource(memory, vcores);
+
     request.setNumContainers(numContainers);
     request.setResourceName(resourceName);
     request.setCapability(capability);
@@ -192,13 +199,18 @@ public class TestUtils {
     return ApplicationAttemptId.newInstance(applicationId, attemptId);
   }
   
-  public static FiCaSchedulerNode getMockNode(
-      String host, String rack, int port, int capability) {
+  public static FiCaSchedulerNode getMockNode(String host, String rack,
+      int port, int memory) {
+    return getMockNode(host, rack, port, memory, 1);
+  }
+
+  public static FiCaSchedulerNode getMockNode(String host, String rack,
+      int port, int memory, int vcores) {
     NodeId nodeId = NodeId.newInstance(host, port);
     RMNode rmNode = mock(RMNode.class);
     when(rmNode.getNodeID()).thenReturn(nodeId);
     when(rmNode.getTotalCapability()).thenReturn(
-        Resources.createResource(capability, 1));
+        Resources.createResource(memory, vcores));
     when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
     when(rmNode.getHostName()).thenReturn(host);
     when(rmNode.getRackName()).thenReturn(rack);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to