YARN-3216. Max-AM-Resource-Percentage should respect node labels. (Sunil G via 
wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56e4f623
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56e4f623
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56e4f623

Branch: refs/heads/HDFS-8966
Commit: 56e4f6237ae8b1852e82b186e08db3934f79a9db
Parents: 6f60621
Author: Wangda Tan <wan...@apache.org>
Authored: Mon Oct 26 16:44:39 2015 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Mon Oct 26 16:44:39 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/SchedulerApplicationAttempt.java  |  21 +-
 .../scheduler/capacity/CSQueueUtils.java        |   5 +
 .../CapacitySchedulerConfiguration.java         |  19 +-
 .../scheduler/capacity/LeafQueue.java           | 242 ++++++---
 .../scheduler/capacity/QueueCapacities.java     |  15 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |  46 +-
 .../yarn/server/resourcemanager/MockRM.java     |  36 +-
 .../capacity/TestApplicationLimits.java         |   5 +
 .../TestApplicationLimitsByPartition.java       | 511 +++++++++++++++++++
 .../TestCapacitySchedulerNodeLabelUpdate.java   | 103 +++-
 .../scheduler/capacity/TestUtils.java           |   6 +-
 12 files changed, 904 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b51d89e..5f40669 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -546,6 +546,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4285. Display resource usage as percentage of queue and cluster in the
     RM UI (Varun Vasudev via wangda)
 
+    YARN-3216. Max-AM-Resource-Percentage should respect node labels. 
+    (Sunil G via wangda)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 23f00e0..c5f8def 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
@@ -146,7 +147,9 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
 
   protected Queue queue;
   protected boolean isStopped = false;
-  
+
+  private String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
+
   protected final RMContext rmContext;
   
   public SchedulerApplicationAttempt(ApplicationAttemptId 
applicationAttemptId, 
@@ -247,10 +250,18 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     return attemptResourceUsage.getAMUsed();
   }
 
+  public Resource getAMResource(String label) {
+    return attemptResourceUsage.getAMUsed(label);
+  }
+
   public void setAMResource(Resource amResource) {
     attemptResourceUsage.setAMUsed(amResource);
   }
 
+  public void setAMResource(String label, Resource amResource) {
+    attemptResourceUsage.setAMUsed(label, amResource);
+  }
+
   public boolean isAmRunning() {
     return amRunning;
   }
@@ -886,4 +897,12 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
       SchedContainerChangeRequest increaseRequest) {
     changeContainerResource(increaseRequest, true);
   }
+
+  public void setAppAMNodePartitionName(String partitionName) {
+    this.appAMNodePartitionName = partitionName;
+  }
+
+  public String getAppAMNodePartitionName() {
+    return appAMNodePartitionName;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 1206026..2f981a7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -138,11 +138,16 @@ class CSQueueUtils {
             csConf.getNonLabeledQueueCapacity(queuePath) / 100);
         queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL,
             csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100);
+        queueCapacities.setMaxAMResourcePercentage(
+            CommonNodeLabelsManager.NO_LABEL,
+            csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
       } else {
         queueCapacities.setCapacity(label,
             csConf.getLabeledQueueCapacity(queuePath, label) / 100);
         queueCapacities.setMaximumCapacity(label,
             csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100);
+        queueCapacities.setMaxAMResourcePercentage(label,
+            csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index b1461c1..a99122d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -85,7 +85,7 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
   @Private
   public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
     PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX;
-  
+
   @Private
   public static final String QUEUES = "queues";
   
@@ -138,7 +138,7 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
   @Private
   public static final float 
   DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
-  
+
   @Private
   public static final float UNDEFINED = -1;
   
@@ -514,6 +514,21 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
   }
 
+  public float getMaximumAMResourcePercentPerPartition(String queue,
+      String label) {
+    // If per-partition max-am-resource-percent is not configured,
+    // use default value as max-am-resource-percent for this queue.
+    return getFloat(getNodeLabelPrefix(queue, label)
+        + MAXIMUM_AM_RESOURCE_SUFFIX,
+        getMaximumApplicationMasterResourcePerQueuePercent(queue));
+  }
+
+  public void setMaximumAMResourcePercentPerPartition(String queue,
+      String label, float percent) {
+    setFloat(getNodeLabelPrefix(queue, label)
+        + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
+  }
+
   /*
    * Returns whether we should continue to look at all heart beating nodes even
    * after the reservation limit was hit. The node heart beating in could

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a993ece..d92a821 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -84,7 +84,7 @@ public class LeafQueue extends AbstractCSQueue {
   protected int maxApplicationsPerUser;
   
   private float maxAMResourcePerQueuePercent;
-  
+
   private volatile int nodeLocalityDelay;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@@ -122,8 +122,8 @@ public class LeafQueue extends AbstractCSQueue {
   // preemption, key is the partition of the RMContainer allocated on
   private Map<String, TreeSet<RMContainer>> 
ignorePartitionExclusivityRMContainers =
       new HashMap<>();
-  
-  public LeafQueue(CapacitySchedulerContext cs, 
+
+  public LeafQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
@@ -538,79 +538,120 @@ public class LeafQueue extends AbstractCSQueue {
   }
   
   public synchronized Resource getAMResourceLimit() {
-     /* 
-      * The limit to the amount of resources which can be consumed by
-      * application masters for applications running in the queue
-      * is calculated by taking the greater of the max resources currently
-      * available to the queue (see absoluteMaxAvailCapacity) and the absolute
-      * resources guaranteed for the queue and multiplying it by the am
-      * resource percent.
-      *
-      * This is to allow a queue to grow its (proportional) application 
-      * master resource use up to its max capacity when other queues are 
-      * idle but to scale back down to it's guaranteed capacity as they 
-      * become busy.
-      *
-      */
-     Resource queueCurrentLimit;
-     synchronized (queueResourceLimitsInfo) {
-       queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
-     }
-     Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
-       absoluteCapacityResource, queueCurrentLimit);
-    Resource amResouceLimit =
-        Resources.multiplyAndNormalizeUp(resourceCalculator, queueCap,
-            maxAMResourcePerQueuePercent, minimumAllocation);
+    return getAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
+  }
+
+  public synchronized Resource getUserAMResourceLimit() {
+     return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
+  }
+
+  public synchronized Resource getUserAMResourceLimitPerPartition(
+      String nodePartition) {
+    /*
+     * The user am resource limit is based on the same approach as the user
+     * limit (as it should represent a subset of that). This means that it uses
+     * the absolute queue capacity (per partition) instead of the max and is
+     * modified by the userlimit and the userlimit factor as is the userlimit
+     */
+    float effectiveUserLimit = Math.max(userLimit / 100.0f,
+        1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+
+    Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
+        resourceCalculator,
+        labelManager.getResourceByLabel(nodePartition, lastClusterResource),
+        queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
+
+    return Resources.multiplyAndNormalizeUp(resourceCalculator,
+        queuePartitionResource,
+        queueCapacities.getMaxAMResourcePercentage(nodePartition)
+            * effectiveUserLimit * userLimitFactor, minimumAllocation);
+  }
+
+  public synchronized Resource getAMResourceLimitPerPartition(
+      String nodePartition) {
+    /*
+     * For non-labeled partition, get the max value from resources currently
+     * available to the queue and the absolute resources guaranteed for the
+     * partition in the queue. For labeled partition, consider only the 
absolute
+     * resources guaranteed. Multiply this value (based on labeled/
+     * non-labeled), * with per-partition am-resource-percent to get the max am
+     * resource limit for this queue and partition.
+     */
+    Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
+        resourceCalculator,
+        labelManager.getResourceByLabel(nodePartition, lastClusterResource),
+        queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
+
+    Resource queueCurrentLimit = Resources.none();
+    // For non-labeled partition, we need to consider the current queue
+    // usage limit.
+    if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      synchronized (queueResourceLimitsInfo) {
+        queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
+      }
+    }
+
+    float amResourcePercent = queueCapacities
+        .getMaxAMResourcePercentage(nodePartition);
+
+    // Current usable resource for this queue and partition is the max of
+    // queueCurrentLimit and queuePartitionResource.
+    Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
+        lastClusterResource, queueCurrentLimit, queuePartitionResource);
+
+    Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
+        resourceCalculator, queuePartitionUsableResource, amResourcePercent,
+        minimumAllocation);
 
     metrics.setAMResouceLimit(amResouceLimit);
     return amResouceLimit;
   }
-  
-  public synchronized Resource getUserAMResourceLimit() {
-     /*
-      * The user amresource limit is based on the same approach as the 
-      * user limit (as it should represent a subset of that).  This means that
-      * it uses the absolute queue capacity instead of the max and is modified
-      * by the userlimit and the userlimit factor as is the userlimit
-      *
-      */ 
-     float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f /    
-       Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
-     
-     return Resources.multiplyAndNormalizeUp( 
-          resourceCalculator,
-          absoluteCapacityResource, 
-          maxAMResourcePerQueuePercent * effectiveUserLimit  *
-            userLimitFactor, minimumAllocation);
-  }
 
   private synchronized void activateApplications() {
-    //limit of allowed resource usage for application masters
-    Resource amLimit = getAMResourceLimit();
-    Resource userAMLimit = getUserAMResourceLimit();
-        
+    // limit of allowed resource usage for application masters
+    Map<String, Resource> amPartitionLimit = new HashMap<String, Resource>();
+    Map<String, Resource> userAmPartitionLimit =
+        new HashMap<String, Resource>();
+
     for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
         .getAssignmentIterator(); i.hasNext();) {
       FiCaSchedulerApp application = i.next();
       ApplicationId applicationId = application.getApplicationId();
-      // Check am resource limit
-      Resource amIfStarted = 
-        Resources.add(application.getAMResource(), queueUsage.getAMUsed());
-      
+
+      // Get the am-node-partition associated with each application
+      // and calculate max-am resource limit for this partition.
+      String partitionName = application.getAppAMNodePartitionName();
+
+      Resource amLimit = amPartitionLimit.get(partitionName);
+      // Verify whether we already calculated am-limit for this label.
+      if (amLimit == null) {
+        amLimit = getAMResourceLimitPerPartition(partitionName);
+        amPartitionLimit.put(partitionName, amLimit);
+      }
+      // Check am resource limit.
+      Resource amIfStarted = Resources.add(
+          application.getAMResource(partitionName),
+          queueUsage.getAMUsed(partitionName));
+
       if (LOG.isDebugEnabled()) {
-        LOG.debug("application AMResource " + application.getAMResource() +
-          " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent +
-          " amLimit " + amLimit +
-          " lastClusterResource " + lastClusterResource +
-          " amIfStarted " + amIfStarted);
+        LOG.debug("application AMResource "
+            + application.getAMResource(partitionName)
+            + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
+            + " amLimit " + amLimit + " lastClusterResource "
+            + lastClusterResource + " amIfStarted " + amIfStarted
+            + " AM node-partition name " + partitionName);
       }
-      
-      if (!Resources.lessThanOrEqual(
-        resourceCalculator, lastClusterResource, amIfStarted, amLimit)) {
-        if (getNumActiveApplications() < 1) {
-          LOG.warn("maximum-am-resource-percent is insufficient to start a" +
-            " single application in queue, it is likely set too low." +
-            " skipping enforcement to allow at least one application to 
start"); 
+
+      if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+          amIfStarted, amLimit)) {
+        if (getNumActiveApplications() < 1
+            || (Resources.lessThanOrEqual(resourceCalculator,
+                lastClusterResource, queueUsage.getAMUsed(partitionName),
+                Resources.none()))) {
+          LOG.warn("maximum-am-resource-percent is insufficient to start a"
+              + " single application in queue, it is likely set too low."
+              + " skipping enforcement to allow at least one application"
+              + " to start");
         } else {
           LOG.info("Not activating application " + applicationId
               + " as  amIfStarted: " + amIfStarted + " exceeds amLimit: "
@@ -618,22 +659,31 @@ public class LeafQueue extends AbstractCSQueue {
           continue;
         }
       }
-      
+
       // Check user am resource limit
-      
       User user = getUser(application.getUser());
-      
-      Resource userAmIfStarted = 
-        Resources.add(application.getAMResource(),
-          user.getConsumedAMResources());
-        
-      if (!Resources.lessThanOrEqual(
-          resourceCalculator, lastClusterResource, userAmIfStarted, 
-          userAMLimit)) {
-        if (getNumActiveApplications() < 1) {
-          LOG.warn("maximum-am-resource-percent is insufficient to start a" +
-            " single application in queue for user, it is likely set too low." 
+
-            " skipping enforcement to allow at least one application to 
start"); 
+      Resource userAMLimit = userAmPartitionLimit.get(partitionName);
+
+      // Verify whether we already calculated user-am-limit for this label.
+      if (userAMLimit == null) {
+        userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
+        userAmPartitionLimit.put(partitionName, userAMLimit);
+      }
+
+      Resource userAmIfStarted = Resources.add(
+          application.getAMResource(partitionName),
+          user.getConsumedAMResources(partitionName));
+
+      if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+          userAmIfStarted, userAMLimit)) {
+        if (getNumActiveApplications() < 1
+            || (Resources.lessThanOrEqual(resourceCalculator,
+                lastClusterResource, queueUsage.getAMUsed(partitionName),
+                Resources.none()))) {
+          LOG.warn("maximum-am-resource-percent is insufficient to start a"
+              + " single application in queue for user, it is likely set too"
+              + " low. skipping enforcement to allow at least one application"
+              + " to start");
         } else {
           LOG.info("Not activating application " + applicationId
               + " for user: " + user + " as userAmIfStarted: "
@@ -643,9 +693,12 @@ public class LeafQueue extends AbstractCSQueue {
       }
       user.activateApplication();
       orderingPolicy.addSchedulableEntity(application);
-      queueUsage.incAMUsed(application.getAMResource());
-      user.getResourceUsage().incAMUsed(application.getAMResource());
-      metrics.incAMUsed(application.getUser(), application.getAMResource());
+      queueUsage.incAMUsed(partitionName,
+          application.getAMResource(partitionName));
+      user.getResourceUsage().incAMUsed(partitionName,
+          application.getAMResource(partitionName));
+      metrics.incAMUsed(application.getUser(),
+          application.getAMResource(partitionName));
       metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
       i.remove();
       LOG.info("Application " + applicationId + " from user: "
@@ -693,13 +746,16 @@ public class LeafQueue extends AbstractCSQueue {
 
   public synchronized void removeApplicationAttempt(
       FiCaSchedulerApp application, User user) {
+    String partitionName = application.getAppAMNodePartitionName();
     boolean wasActive =
       orderingPolicy.removeSchedulableEntity(application);
     if (!wasActive) {
       pendingOrderingPolicy.removeSchedulableEntity(application);
     } else {
-      queueUsage.decAMUsed(application.getAMResource());
-      user.getResourceUsage().decAMUsed(application.getAMResource());
+      queueUsage.decAMUsed(partitionName,
+          application.getAMResource(partitionName));
+      user.getResourceUsage().decAMUsed(partitionName,
+          application.getAMResource(partitionName));
       metrics.decAMUsed(application.getUser(), application.getAMResource());
     }
     applicationAttemptMap.remove(application.getApplicationAttemptId());
@@ -1328,6 +1384,22 @@ public class LeafQueue extends AbstractCSQueue {
     super.decUsedResource(nodeLabel, resourceToDec, application);
   }
 
+  public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
+      SchedulerApplicationAttempt application) {
+    getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel,
+        resourceToInc);
+    // ResourceUsage has its own lock, no addition lock needs here.
+    queueUsage.incAMUsed(nodeLabel, resourceToInc);
+  }
+
+  public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
+      SchedulerApplicationAttempt application) {
+    getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel,
+        resourceToDec);
+    // ResourceUsage has its own lock, no addition lock needs here.
+    queueUsage.decAMUsed(nodeLabel, resourceToDec);
+  }
+
   @VisibleForTesting
   public static class User {
     ResourceUsage userResourceUsage = new ResourceUsage();
@@ -1363,6 +1435,10 @@ public class LeafQueue extends AbstractCSQueue {
       return userResourceUsage.getAMUsed();
     }
 
+    public Resource getConsumedAMResources(String label) {
+      return userResourceUsage.getAMUsed(label);
+    }
+
     public int getTotalApplications() {
       return getPendingApplications() + getActiveApplications();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
index d0a26d6..65100f6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
@@ -49,7 +49,8 @@ public class QueueCapacities {
   
   // Usage enum here to make implement cleaner
   private enum CapacityType {
-    USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), 
ABS_CAP(5);
+    USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), 
ABS_CAP(5),
+      MAX_AM_PERC(6);
 
     private int idx;
 
@@ -74,6 +75,7 @@ public class QueueCapacities {
       sb.append("abs_max_cap=" + capacitiesArr[3] + "%, ");
       sb.append("cap=" + capacitiesArr[4] + "%, ");
       sb.append("abs_cap=" + capacitiesArr[5] + "%}");
+      sb.append("max_am_perc=" + capacitiesArr[6] + "%}");
       return sb.toString();
     }
   }
@@ -213,7 +215,16 @@ public class QueueCapacities {
   public void setAbsoluteMaximumCapacity(String label, float value) {
     _set(label, CapacityType.ABS_MAX_CAP, value);
   }
-  
+
+  /* Absolute Maximum AM resource percentage Getter and Setter */
+  public float getMaxAMResourcePercentage(String label) {
+    return _get(label, CapacityType.MAX_AM_PERC);
+  }
+
+  public void setMaxAMResourcePercentage(String label, float value) {
+    _set(label, CapacityType.MAX_AM_PERC, value);
+  }
+
   /**
    * Clear configurable fields, like
    * (absolute)capacity/(absolute)maximum-capacity, this will be used by queue

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index e97da24..6f4bfe5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -99,18 +100,26 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
     
     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
-    
+
     Resource amResource;
+    String partition;
+
     if (rmApp == null || rmApp.getAMResourceRequest() == null) {
-      //the rmApp may be undefined (the resource manager checks for this too)
-      //and unmanaged applications do not provide an amResource request
-      //in these cases, provide a default using the scheduler
+      // the rmApp may be undefined (the resource manager checks for this too)
+      // and unmanaged applications do not provide an amResource request
+      // in these cases, provide a default using the scheduler
       amResource = rmContext.getScheduler().getMinimumResourceCapability();
+      partition = CommonNodeLabelsManager.NO_LABEL;
     } else {
       amResource = rmApp.getAMResourceRequest().getCapability();
+      partition =
+          (rmApp.getAMResourceRequest().getNodeLabelExpression() == null)
+          ? CommonNodeLabelsManager.NO_LABEL
+          : rmApp.getAMResourceRequest().getNodeLabelExpression();
     }
-    
-    setAMResource(amResource);
+
+    setAppAMNodePartitionName(partition);
+    setAMResource(partition, amResource);
     setPriority(appPriority);
 
     scheduler = rmContext.getScheduler();
@@ -143,8 +152,8 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
 
     containersToPreempt.remove(rmContainer.getContainerId());
 
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 
+    RMAuditLogger.logSuccess(getUser(),
+        AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
         getApplicationId(), containerId);
     
     // Update usage metrics 
@@ -185,10 +194,10 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
         type, node, priority, request, container);
-    attemptResourceUsage.incUsed(node.getPartition(),
-        container.getResource());
-    
-    // Update resource requests related to "request" and store in RMContainer 
+
+    attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
+
+    // Update resource requests related to "request" and store in RMContainer
     ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
 
     // Inform the container
@@ -201,8 +210,8 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
           + " container=" + container.getId() + " host="
           + container.getNodeId().getHost() + " type=" + type);
     }
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.ALLOC_CONTAINER, "SchedulerApp", 
+    RMAuditLogger.logSuccess(getUser(),
+        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
         getApplicationId(), container.getId());
     
     return rmContainer;
@@ -483,5 +492,14 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
     this.attemptResourceUsage.incUsed(newPartition, containerResource);
     getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
     getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
+
+    // Update new partition name if container is AM and also update AM resource
+    if (rmContainer.isAMContainer()) {
+      setAppAMNodePartitionName(newPartition);
+      this.attemptResourceUsage.decAMUsed(oldPartition, containerResource);
+      this.attemptResourceUsage.incAMUsed(newPartition, containerResource);
+      getCSLeafQueue().decAMUsedResource(oldPartition, containerResource, 
this);
+      getCSLeafQueue().incAMUsedResource(newPartition, containerResource, 
this);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6923dd2..0372cd7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -354,7 +355,19 @@ public class MockRM extends ResourceManager {
       super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
   }
-  
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, String queue, String amLabel)
+      throws Exception {
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setMemory(masterMemory);
+    Priority priority = Priority.newInstance(0);
+    return submitApp(resource, name, user, acls, false, queue,
+      super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
+      false, null, 0, null, true, priority, amLabel);
+  }
+
   public RMApp submitApp(Resource resource, String name, String user,
       Map<ApplicationAccessType, String> acls, String queue) throws Exception {
     return submitApp(resource, name, user, acls, false, queue,
@@ -449,7 +462,20 @@ public class MockRM extends ResourceManager {
       boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
       ApplicationId applicationId, long attemptFailuresValidityInterval,
       LogAggregationContext logAggregationContext,
-      boolean cancelTokensWhenComplete, Priority priority)
+      boolean cancelTokensWhenComplete, Priority priority) throws Exception {
+    return submitApp(capability, name, user, acls, unmanaged, queue,
+      maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+      isAppIdProvided, applicationId, attemptFailuresValidityInterval,
+      logAggregationContext, cancelTokensWhenComplete, priority, "");
+  }
+
+  public RMApp submitApp(Resource capability, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+      int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+      ApplicationId applicationId, long attemptFailuresValidityInterval,
+      LogAggregationContext logAggregationContext,
+      boolean cancelTokensWhenComplete, Priority priority, String amLabel)
       throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
@@ -492,6 +518,12 @@ public class MockRM extends ResourceManager {
       sub.setLogAggregationContext(logAggregationContext);
     }
     sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
+    if (amLabel != null && !amLabel.isEmpty()) {
+      ResourceRequest amResourceRequest = ResourceRequest.newInstance(
+          Priority.newInstance(0), ResourceRequest.ANY, capability, 1);
+      amResourceRequest.setNodeLabelExpression(amLabel.trim());
+      sub.setAMContainerResourceRequest(amResourceRequest);
+    }
     req.setApplicationSubmissionContext(sub);
     UserGroupInformation fakeUser =
       UserGroupInformation.createUserForTesting(user, new String[] 
{"someGroup"});

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 8c4ffd4..9f4b9f5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -154,6 +155,10 @@ public class TestApplicationLimits {
     doReturn(user).when(application).getUser();
     doReturn(amResource).when(application).getAMResource();
     doReturn(Priority.newInstance(0)).when(application).getPriority();
+    doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
+        .getAppAMNodePartitionName();
+    doReturn(amResource).when(application).getAMResource(
+        CommonNodeLabelsManager.NO_LABEL);
     
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
     return application;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
new file mode 100644
index 0000000..267abaf
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class TestApplicationLimitsByPartition {
+  final static int GB = 1024;
+
+  LeafQueue queue;
+  RMNodeLabelsManager mgr;
+  private YarnConfiguration conf;
+
+  RMContext rmContext = null;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  private void simpleNodeLabelMappingToManager() throws IOException {
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        TestUtils.toSet("x"), NodeId.newInstance("h2", 0),
+        TestUtils.toSet("y")));
+  }
+
+  private void complexNodeLabelMappingToManager() throws IOException {
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
+        "z"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        TestUtils.toSet("x"), NodeId.newInstance("h2", 0),
+        TestUtils.toSet("y"), NodeId.newInstance("h3", 0),
+        TestUtils.toSet("y"), NodeId.newInstance("h4", 0),
+        TestUtils.toSet("z"), NodeId.newInstance("h5", 0),
+        RMNodeLabelsManager.EMPTY_STRING_SET));
+  }
+
+  @Test(timeout = 120000)
+  public void testAMResourceLimitWithLabels() throws Exception {
+    /*
+     * Test Case:
+     * Verify AM resource limit per partition level and per queue level. So
+     * we use 2 queues to verify this case.
+     * Queue a1 supports labels (x,y). Configure am-resource-limit as 0.2 (x)
+     * Queue c1 supports default label. Configure am-resource-limit as 0.2
+     *
+     * Queue A1 for label X can only support 2Gb AM resource.
+     * Queue C1 (empty label) can support 2Gb AM resource.
+     *
+     * Verify atleast one AM is launched, and AM resources should not go more
+     * than 2GB in each queue.
+     */
+
+    simpleNodeLabelMappingToManager();
+    CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
+        TestUtils.getConfigurationWithQueueLabels(conf);
+
+    // After getting queue conf, configure AM resource percent for Queue A1
+    // as 0.2 (Label X) and for Queue C1 as 0.2 (Empty Label)
+    final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
+    final String C1 = CapacitySchedulerConfiguration.ROOT + ".c" + ".c1";
+    config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.2f);
+    config.setMaximumApplicationMasterResourcePerQueuePercent(C1, 0.2f);
+
+    // Now inject node label manager with this updated config
+    MockRM rm1 = new MockRM(config) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    rm1.registerNode("h2:1234", 10 * GB); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
+
+    // Submit app1 with 1Gb AM resource to Queue A1 for label X
+    RMApp app1 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
+    MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Submit app2 with 1Gb AM resource to Queue A1 for label X
+    RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
+    MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // Submit 3rd app to Queue A1 for label X, and this will be pending as
+    // AM limit is already crossed for label X. (2GB)
+    rm1.submitApp(GB, "app", "user", null, "a1", "x");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+    Assert.assertNotNull(leafQueue);
+
+    // Only one AM will be activated here and second AM will be still
+    // pending.
+    Assert.assertEquals(2, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+
+    // Now verify the same test case in Queue C1 where label is not configured.
+    // Submit an app to Queue C1 with empty label
+    RMApp app3 = rm1.submitApp(GB, "app", "user", null, "c1");
+    MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // Submit next app to Queue C1 with empty label
+    RMApp app4 = rm1.submitApp(GB, "app", "user", null, "c1");
+    MockRM.launchAndRegisterAM(app4, rm1, nm3);
+
+    // Submit 3rd app to Queue C1. This will be pending as Queue's am-limit
+    // is reached.
+    rm1.submitApp(GB, "app", "user", null, "c1");
+
+    leafQueue = (LeafQueue) cs.getQueue("c1");
+    Assert.assertNotNull(leafQueue);
+
+    // 2 apps will be activated, third one will be pending as am-limit
+    // is reached.
+    Assert.assertEquals(2, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+
+    rm1.killApp(app3.getApplicationId());
+    Thread.sleep(1000);
+
+    // After killing one running app, pending app will also get activated.
+    Assert.assertEquals(2, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(0, leafQueue.getNumPendingApplications());
+    rm1.close();
+  }
+
+  @Test(timeout = 120000)
+  public void testAtleastOneAMRunPerPartition() throws Exception {
+    /*
+     * Test Case:
+     * Even though am-resource-limit per queue/partition may cross if we
+     * activate an app (high am resource demand), we have to activate it
+     * since no other apps are running in that Queue/Partition. Here also
+     * we run one test case for partition level and one in queue level to
+     * ensure no breakage in existing functionality.
+     *
+     * Queue a1 supports labels (x,y). Configure am-resource-limit as 0.15 (x)
+     * Queue c1 supports default label. Configure am-resource-limit as 0.15
+     *
+     * Queue A1 for label X can only support 1.5Gb AM resource.
+     * Queue C1 (empty label) can support 1.5Gb AM resource.
+     *
+     * Verify atleast one AM is launched in each Queue.
+     */
+    simpleNodeLabelMappingToManager();
+    CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
+        TestUtils.getConfigurationWithQueueLabels(conf);
+
+    // After getting queue conf, configure AM resource percent for Queue A1
+    // as 0.15 (Label X) and for Queue C1 as 0.15 (Empty Label)
+    final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
+    final String C1 = CapacitySchedulerConfiguration.ROOT + ".c" + ".c1";
+    config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.15f);
+    config.setMaximumApplicationMasterResourcePerQueuePercent(C1, 0.15f);
+    // inject node label manager
+    MockRM rm1 = new MockRM(config) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    rm1.registerNode("h2:1234", 10 * GB); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
+
+    // Submit app1 (2 GB) to Queue A1 and label X
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x");
+    // This app must be activated eventhough the am-resource per-partition
+    // limit is only for 1.5GB.
+    MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Submit 2nd app to label "X" with one GB and it must be pending since
+    // am-resource per-partition limit is crossed (1.5 GB was the limit).
+    rm1.submitApp(GB, "app", "user", null, "a1", "x");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+    Assert.assertNotNull(leafQueue);
+
+    // Only 1 app will be activated as am-limit for partition "x" is 0.15
+    Assert.assertEquals(1, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+
+    // Now verify the same test case in Queue C1 which takes default label
+    // to see queue level am-resource-limit is still working as expected.
+
+    // Submit an app to Queue C1 with empty label (2 GB)
+    RMApp app3 = rm1.submitApp(2 * GB, "app", "user", null, "c1");
+    // This app must be activated even though the am-resource per-queue
+    // limit is only for 1.5GB
+    MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // Submit 2nd app to C1 (Default label, hence am-limit per-queue will be
+    // considered).
+    rm1.submitApp(GB, "app", "user", null, "c1");
+
+    leafQueue = (LeafQueue) cs.getQueue("c1");
+    Assert.assertNotNull(leafQueue);
+
+    // 1 app will be activated (and it has AM resource more than queue limit)
+    Assert.assertEquals(1, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+    rm1.close();
+  }
+
+  @Test(timeout = 120000)
+  public void testDefaultAMLimitFromQueueForPartition() throws Exception {
+    /*
+     * Test Case:
+     * Configure AM resource limit per queue level. If partition level config
+     * is not found, we will be considering per-queue level am-limit. Ensure
+     * this is working as expected.
+     *
+     * Queue A1 am-resource limit to be configured as 0.2 (not for partition x)
+     *
+     * Eventhough per-partition level config is not done, CS should consider
+     * the configuration done for queue level.
+     */
+    simpleNodeLabelMappingToManager();
+    CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
+        TestUtils.getConfigurationWithQueueLabels(conf);
+
+    // After getting queue conf, configure AM resource percent for Queue A1
+    // as 0.2 (not for partition, rather in queue level)
+    final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
+    config.setMaximumApplicationMasterResourcePerQueuePercent(A1, 0.2f);
+    // inject node label manager
+    MockRM rm1 = new MockRM(config) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    rm1.registerNode("h2:1234", 10 * GB); // label = y
+    rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
+
+    // Submit app1 (2 GB) to Queue A1 and label X
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x");
+    MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Submit 2nd app to label "X" with one GB. Since queue am-limit is 2GB,
+    // 2nd app will be pending and first one will get activated.
+    rm1.submitApp(GB, "app", "user", null, "a1", "x");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+    Assert.assertNotNull(leafQueue);
+
+    // Only 1 app will be activated as am-limit for queue is 0.2 and same is
+    // used for partition "x" also.
+    Assert.assertEquals(1, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+    rm1.close();
+  }
+
+  @Test(timeout = 120000)
+  public void testUserAMResourceLimitWithLabels() throws Exception {
+    /*
+     * Test Case:
+     * Verify user level AM resource limit. This test case is ran with two
+     * users. And per-partition level am-resource-limit will be 0.4, which
+     * internally will be 4GB. Hence 2GB will be available for each
+     * user for its AM resource.
+     *
+     * Now this test case will create a scenario where AM resource limit per
+     * partition is not met, but user level am-resource limit is reached.
+     * Hence app will be pending.
+     */
+
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    simpleNodeLabelMappingToManager();
+    CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
+        TestUtils.getConfigurationWithQueueLabels(conf);
+
+    // After getting queue conf, configure AM resource percent for Queue A1
+    // as 0.4 (Label X). Also set userlimit as 50% for this queue. So when we
+    // have two users submitting applications, each user will get 50%  of AM
+    // resource which is available in this partition.
+    final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
+    config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.4f);
+    config.setUserLimit(A1, 50);
+
+    // Now inject node label manager with this updated config
+    MockRM rm1 = new MockRM(config) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    rm1.registerNode("h2:1234", 10 * GB); // label = y
+    rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
+
+    // Submit app1 with 1Gb AM resource to Queue A1 for label X for user0
+    RMApp app1 = rm1.submitApp(GB, "app", user_0, null, "a1", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Place few allocate requests to make it an active application
+    am1.allocate("*", 1 * GB, 15, new ArrayList<ContainerId>(), "");
+
+    // Now submit 2nd app to Queue A1 for label X for user1
+    RMApp app2 = rm1.submitApp(GB, "app", user_1, null, "a1", "x");
+    MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+    Assert.assertNotNull(leafQueue);
+
+    // Verify active applications count in this queue.
+    Assert.assertEquals(2, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumActiveApplications(user_0));
+    Assert.assertEquals(0, leafQueue.getNumPendingApplications());
+
+    // Submit 3rd app to Queue A1 for label X for user1. Now user1 will have
+    // 2 applications (2 GB resource) and user0 will have one app (1GB).
+    RMApp app3 = rm1.submitApp(GB, "app", user_1, null, "a1", "x");
+    MockAM am2 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+
+    // Place few allocate requests to make it an active application. This is
+    // to ensure that user1 and user0 are active users.
+    am2.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>(), "");
+
+    // Submit final app to Queue A1 for label X. Since we are trying to submit
+    // for user1, we need 3Gb resource for AMs.
+    // 4Gb -> 40% of label "X" in queue A1
+    // Since we have 2 users, 50% of 4Gb will be max for each user. Here user1
+    // has already crossed this 2GB limit, hence this app will be pending.
+    rm1.submitApp(GB, "app", user_1, null, "a1", "x");
+
+    // Verify active applications count per user and also in queue level.
+    Assert.assertEquals(3, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumActiveApplications(user_0));
+    Assert.assertEquals(2, leafQueue.getNumActiveApplications(user_1));
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications(user_1));
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+    rm1.close();
+  }
+
+  @Test
+  public void testAMResourceLimitForMultipleApplications() throws Exception {
+    /*
+     * Test Case:
+     * In a complex node label setup, verify am-resource-percentage calculation
+     * and check whether applications can get activated as per expectation.
+     */
+    complexNodeLabelMappingToManager();
+    CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
+        TestUtils.getComplexConfigurationWithQueueLabels(conf);
+
+    /*
+     * Queue structure:
+     *                      root (*)
+     *                  ________________
+     *                 /                \
+     *               a x(100%), y(50%)   b y(50%), z(100%)
+     *               ________________    ______________
+     *              /                   /              \
+     *             a1 (x,y)         b1(no)              b2(y,z)
+     *               100%                          y = 100%, z = 100%
+     *
+     * Node structure:
+     * h1 : x
+     * h2 : y
+     * h3 : y
+     * h4 : z
+     * h5 : NO
+     *
+     * Total resource:
+     * x: 10G
+     * y: 20G
+     * z: 10G
+     * *: 10G
+     *
+     * AM resource percentage config:
+     * A1  : 0.25
+     * B2  : 0.15
+     */
+    final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
+    final String B1 = CapacitySchedulerConfiguration.ROOT + ".b" + ".b1";
+    config.setMaximumAMResourcePercentPerPartition(A1, "y", 0.25f);
+    config.setMaximumApplicationMasterResourcePerQueuePercent(B1, 0.15f);
+
+    // Now inject node label manager with this updated config
+    MockRM rm1 = new MockRM(config) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = y
+    rm1.registerNode("h4:1234", 10 * GB); // label = z
+    MockNM nm5 = rm1.registerNode("h5:1234", 10 * GB); // label = <empty>
+
+    // Submit app1 with 2Gb AM resource to Queue A1 for label Y
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "y");
+    MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // Submit app2 with 1Gb AM resource to Queue A1 for label Y
+    RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "y");
+    MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+    // Submit another app with 1Gb AM resource to Queue A1 for label Y
+    rm1.submitApp(GB, "app", "user", null, "a1", "y");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+    Assert.assertNotNull(leafQueue);
+
+    /*
+     *  capacity of queue A  -> 50% for label Y
+     *  capacity of queue A1 -> 100% for label Y
+     *
+     *  Total resources available for label Y -> 20GB (nm2 and nm3)
+     *  Hence in queue A1, max resource for label Y is 10GB.
+     *
+     *  AM resource percent config for queue A1 -> 0.25
+     *        ==> 2.5Gb (3 Gb) is max-am-resource-limit
+     */
+    Assert.assertEquals(2, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+
+    // Submit app3 with 1Gb AM resource to Queue B1 (no_label)
+    RMApp app3 = rm1.submitApp(GB, "app", "user", null, "b1");
+    MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+    // Submit another app with 1Gb AM resource to Queue B1 (no_label)
+    rm1.submitApp(GB, "app", "user", null, "b1");
+
+    leafQueue = (LeafQueue) cs.getQueue("b1");
+    Assert.assertNotNull(leafQueue);
+
+    /*
+     *  capacity of queue B  -> 90% for queue
+     *                       -> and 100% for no-label
+     *  capacity of queue B1 -> 50% for no-label/queue
+     *
+     *  Total resources available for no-label -> 10GB (nm5)
+     *  Hence in queue B1, max resource for no-label is 5GB.
+     *
+     *  AM resource percent config for queue B1 -> 0.15
+     *        ==> 1Gb is max-am-resource-limit
+     *
+     *  Only one app will be activated and all othe will be pending.
+     */
+    Assert.assertEquals(1, leafQueue.getNumActiveApplications());
+    Assert.assertEquals(1, leafQueue.getNumPendingApplications());
+
+    rm1.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 94af4e0..fe24b2d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -95,7 +95,11 @@ public class TestCapacitySchedulerNodeLabelUpdate {
   private void checkUsedResource(MockRM rm, String queueName, int memory) {
     checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
   }
-  
+
+  private void checkAMUsedResource(MockRM rm, String queueName, int memory) {
+    checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
+  }
+
   private void checkUsedResource(MockRM rm, String queueName, int memory,
       String label) {
     CapacityScheduler scheduler = (CapacityScheduler) 
rm.getResourceScheduler();
@@ -104,6 +108,14 @@ public class TestCapacitySchedulerNodeLabelUpdate {
         .getMemory());
   }
 
+  private void checkAMUsedResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler scheduler = (CapacityScheduler) 
rm.getResourceScheduler();
+    CSQueue queue = scheduler.getQueue(queueName);
+    Assert.assertEquals(memory, queue.getQueueResourceUsage().getAMUsed(label)
+        .getMemory());
+  }
+
   private void checkUserUsedResource(MockRM rm, String queueName,
       String userName, String partition, int memory) {
     CapacityScheduler scheduler = (CapacityScheduler) 
rm.getResourceScheduler();
@@ -424,4 +436,93 @@ public class TestCapacitySchedulerNodeLabelUpdate {
 
     rm.close();
   }
+
+  @Test (timeout = 60000)
+  public void testAMResourceUsageWhenNodeUpdatesPartition()
+      throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", 
"z"));
+
+    // set mapping:
+    // h1 -> x
+    // h2 -> y
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), 
toSet("x")));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), 
toSet("y")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 8000);
+    rm.registerNode("h2:1234", 8000);
+    rm.registerNode("h3:1234", 8000);
+
+    ContainerId containerId2;
+
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // request a container.
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+    ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 
2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId2,
+        RMContainerState.ALLOCATED, 10 * 1000));
+
+    // check used resource:
+    // queue-a used x=2G
+    checkUsedResource(rm, "a", 2048, "x");
+    checkAMUsedResource(rm, "a", 1024, "x");
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerApp app = 
cs.getApplicationAttempt(am1.getApplicationAttemptId());
+
+    // change h1's label to z
+    cs.handle(new 
NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+        toSet("z"))));
+
+    // Now the resources also should change from x to z. Verify AM and normal
+    // used resource are successfully changed.
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 2048, "z");
+    checkAMUsedResource(rm, "a", 0, "x");
+    checkAMUsedResource(rm, "a", 1024, "z");
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 2048);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getAMUsed("x").getMemory());
+    Assert.assertEquals(1024,
+        app.getAppAttemptResourceUsage().getAMUsed("z").getMemory());
+
+    // change h1's label to no label
+    Set<String> emptyLabels = new HashSet<>();
+    Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
+        emptyLabels);
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "z");
+    checkUsedResource(rm, "a", 2048);
+    checkAMUsedResource(rm, "a", 0, "x");
+    checkAMUsedResource(rm, "a", 0, "z");
+    checkAMUsedResource(rm, "a", 1024);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 0);
+    checkUserUsedResource(rm, "a", "user", "", 2048);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getAMUsed("x").getMemory());
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getAMUsed("z").getMemory());
+    Assert.assertEquals(1024,
+        app.getAppAttemptResourceUsage().getAMUsed("").getMemory());
+
+    rm.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56e4f623/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 5ffcaad..489ef77 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -221,13 +221,13 @@ public class TestUtils {
     when(container.getPriority()).thenReturn(priority);
     return container;
   }
-  
+
   @SuppressWarnings("unchecked")
-  private static <E> Set<E> toSet(E... elements) {
+  public static <E> Set<E> toSet(E... elements) {
     Set<E> set = Sets.newHashSet(elements);
     return set;
   }
-  
+
   /**
    * Get a queue structure:
    * <pre>

Reply via email to