Repository: hadoop
Updated Branches:
  refs/heads/trunk c4980a2f3 -> 69c8a7f45


YARN-1582. Capacity Scheduler: add a maximum-allocation-mb setting per queue. 
Contributed by Thomas Graves


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69c8a7f4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69c8a7f4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69c8a7f4

Branch: refs/heads/trunk
Commit: 69c8a7f45be5c0aa6787b07f328d74f1e2ba5628
Parents: c4980a2
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Feb 5 19:28:49 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Feb 5 19:28:49 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../ApplicationMasterService.java               |   2 +-
 .../scheduler/AbstractYarnScheduler.java        |  23 ++
 .../scheduler/YarnScheduler.java                |  11 +-
 .../scheduler/capacity/AbstractCSQueue.java     |   8 +-
 .../scheduler/capacity/CapacityScheduler.java   |  17 ++
 .../CapacitySchedulerConfiguration.java         |  49 +++
 .../capacity/CapacitySchedulerContext.java      |   2 +
 .../scheduler/capacity/LeafQueue.java           |  36 ++-
 .../scheduler/capacity/ParentQueue.java         |   2 +-
 .../capacity/TestCapacityScheduler.java         | 302 ++++++++++++++++++-
 .../src/site/apt/CapacityScheduler.apt.vm       |  12 +
 12 files changed, 455 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 936cdf4..f41e5d6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -245,6 +245,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3123. Made YARN CLI show a single completed container even if the app
     is running. (Naganarasimha G R via zjshen)
 
+    YARN-1582. Capacity Scheduler: add a maximum-allocation-mb setting per
+    queue (Thomas Graves via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index d0b199f..0cdc1e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -285,7 +285,7 @@ public class ApplicationMasterService extends AbstractService implements
       RegisterApplicationMasterResponse response = recordFactory
           .newRecordInstance(RegisterApplicationMasterResponse.class);
       response.setMaximumResourceCapability(rScheduler
-          .getMaximumResourceCapability());
+          .getMaximumResourceCapability(app.getQueue()));
       response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
           .getSubmissionContext().getAMContainerSpec().getApplicationACLs());
       response.setQueue(app.getQueue());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 753259c..04b3452 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -180,6 +180,11 @@ public abstract class AbstractYarnScheduler
     return maxResource;
   }
 
+  @Override
+  public Resource getMaximumResourceCapability(String queueName) {
+    return getMaximumResourceCapability();
+  }
+
   protected void initMaximumResourceCapability(Resource maximumAllocation) {
     maxAllocWriteLock.lock();
     try {
@@ -635,4 +640,22 @@ public abstract class AbstractYarnScheduler
       maxAllocWriteLock.unlock();
     }
   }
+
+  protected void refreshMaximumAllocation(Resource newMaxAlloc) {
+    maxAllocWriteLock.lock();
+    try {
+      configuredMaximumAllocation = Resources.clone(newMaxAlloc);
+      int maxMemory = newMaxAlloc.getMemory();
+      if (maxNodeMemory != -1) {
+        maxMemory = Math.min(maxMemory, maxNodeMemory);
+      }
+      int maxVcores = newMaxAlloc.getVirtualCores();
+      if (maxNodeVCores != -1) {
+        maxVcores = Math.min(maxVcores, maxNodeVCores);
+      }
+      maximumAllocation = Resources.createResource(maxMemory, maxVcores);
+    } finally {
+      maxAllocWriteLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 4a3a35c..b99b217 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -92,13 +92,22 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   public Resource getMinimumResourceCapability();
   
   /**
-   * Get maximum allocatable {@link Resource}.
+   * Get maximum allocatable {@link Resource} at the cluster level.
    * @return maximum allocatable resource
    */
   @Public
   @Stable
   public Resource getMaximumResourceCapability();
 
+  /**
+   * Get maximum allocatable {@link Resource} for the queue specified.
+   * @param queueName queue name
+   * @return maximum allocatable resource
+   */
+  @Public
+  @Stable
+  public Resource getMaximumResourceCapability(String queueName);
+
   @LimitedPrivate("yarn")
   @Evolving
   ResourceCalculator getResourceCalculator();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index c4651da..e4c2665 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -57,7 +57,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   volatile int numContainers;
   
   final Resource minimumAllocation;
-  final Resource maximumAllocation;
+  Resource maximumAllocation;
   QueueState state;
   final QueueMetrics metrics;
   
@@ -255,7 +255,7 @@ public abstract class AbstractCSQueue implements CSQueue {
       Set<String> labels, String defaultLabelExpression,
       Map<String, Float> nodeLabelCapacities,
       Map<String, Float> maximumNodeLabelCapacities,
-      boolean reservationContinueLooking)
+      boolean reservationContinueLooking, Resource maxAllocation)
       throws IOException {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@@ -326,6 +326,8 @@ public abstract class AbstractCSQueue implements CSQueue {
     this.reservationsContinueLooking = reservationContinueLooking;
 
     this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
+
+    this.maximumAllocation = maxAllocation;
   }
   
   protected QueueInfo getQueueInfo() {
@@ -341,7 +343,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   @Private
-  public Resource getMaximumAllocation() {
+  public synchronized Resource getMaximumAllocation() {
     return maximumAllocation;
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e622d3a..916a4db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -356,9 +357,11 @@ public class CapacityScheduler extends
     validateConf(this.conf);
     try {
       LOG.info("Re-initializing queues...");
+      refreshMaximumAllocation(this.conf.getMaximumAllocation());
       reinitializeQueues(this.conf);
     } catch (Throwable t) {
       this.conf = oldConf;
+      refreshMaximumAllocation(this.conf.getMaximumAllocation());
       throw new IOException("Failed to re-init queues", t);
     }
   }
@@ -1580,6 +1583,20 @@ public class CapacityScheduler extends
       .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
   }
   
+  @Override
+  public Resource getMaximumResourceCapability(String queueName) {
+    CSQueue queue = getQueue(queueName);
+    if (queue == null) {
+      LOG.error("Unknown queue: " + queueName);
+      return getMaximumResourceCapability();
+    }
+    if (!(queue instanceof LeafQueue)) {
+      LOG.error("queue " + queueName + " is not an leaf queue");
+      return getMaximumResourceCapability();
+    }
+    return ((LeafQueue)queue).getMaximumAllocation();
+  }
+
   private String handleMoveToPlanQueue(String targetQueueName) {
     CSQueue dest = getQueue(targetQueueName);
     if (dest != null && dest instanceof PlanQueue) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 55c6c0c..268cc6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -109,6 +109,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
 
   @Private
+  public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb";
+
+  @Private
+  public static final String MAXIMUM_ALLOCATION_VCORES =
+      "maximum-allocation-vcores";
+
+  @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
   
   @Private
@@ -580,6 +587,48 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return Resources.createResource(maximumMemory, maximumCores);
   }
 
+  /**
+   * Get the per queue setting for the maximum limit to allocate to
+   * each container request.
+   *
+   * @param queue
+   *          name of the queue
+   * @return setting specified per queue else falls back to the cluster setting
+   */
+  public Resource getMaximumAllocationPerQueue(String queue) {
+    String queuePrefix = getQueuePrefix(queue);
+    int maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
+        (int)UNDEFINED);
+    int maxAllocationVcoresPerQueue = getInt(
+        queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("max alloc mb per queue for " + queue + " is "
+          + maxAllocationMbPerQueue);
+      LOG.debug("max alloc vcores per queue for " + queue + " is "
+          + maxAllocationVcoresPerQueue);
+    }
+    Resource clusterMax = getMaximumAllocation();
+    if (maxAllocationMbPerQueue == (int)UNDEFINED) {
+      LOG.info("max alloc mb per queue for " + queue + " is undefined");
+      maxAllocationMbPerQueue = clusterMax.getMemory();
+    }
+    if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
+       LOG.info("max alloc vcore per queue for " + queue + " is undefined");
+      maxAllocationVcoresPerQueue = clusterMax.getVirtualCores();
+    }
+    Resource result = Resources.createResource(maxAllocationMbPerQueue,
+        maxAllocationVcoresPerQueue);
+    if (maxAllocationMbPerQueue > clusterMax.getMemory()
+        || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
+      throw new IllegalArgumentException(
+          "Queue maximum allocation cannot be larger than the cluster setting"
+          + " for queue " + queue
+          + " max allocation per queue: " + result
+          + " cluster setting: " + clusterMax);
+    }
+    return result;
+  }
+
   public boolean getEnableUserMetrics() {
     return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 03a1cb6..28dc988 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -39,6 +39,8 @@ public interface CapacitySchedulerContext {
 
   Resource getMaximumResourceCapability();
 
+  Resource getMaximumResourceCapability(String queueName);
+
   RMContainerTokenSecretManager getContainerTokenSecretManager();
   
   int getNumClusterNodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index adb86a5..c143210 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -97,7 +97,7 @@ public class LeafQueue extends AbstractCSQueue {
   
   Set<FiCaSchedulerApp> pendingApplications;
   
-  private final float minimumAllocationFactor;
+  private float minimumAllocationFactor;
 
   private Map<String, User> users = new HashMap<String, User>();
 
@@ -162,7 +162,8 @@ public class LeafQueue extends AbstractCSQueue {
         state, acls, cs.getConfiguration().getNodeLocalityDelay(), 
accessibleLabels,
         defaultLabelExpression, this.capacitiyByNodeLabels,
         this.maxCapacityByNodeLabels,
-        cs.getConfiguration().getReservationContinueLook());
+        cs.getConfiguration().getReservationContinueLook(),
+        cs.getConfiguration().getMaximumAllocationPerQueue(getQueuePath()));
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("LeafQueue:" + " name=" + queueName
@@ -192,11 +193,12 @@ public class LeafQueue extends AbstractCSQueue {
       Set<String> labels, String defaultLabelExpression,
       Map<String, Float> capacitieByLabel,
       Map<String, Float> maximumCapacitiesByLabel, 
-      boolean revervationContinueLooking) throws IOException {
+      boolean revervationContinueLooking,
+      Resource maxAllocation) throws IOException {
     super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
         maximumCapacity, absoluteMaxCapacity, state, acls, labels,
         defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
-        revervationContinueLooking);
+        revervationContinueLooking, maxAllocation);
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
     float absCapacity = getParent().getAbsoluteCapacity() * capacity;
@@ -238,6 +240,12 @@ public class LeafQueue extends AbstractCSQueue {
     
     this.nodeLocalityDelay = nodeLocalityDelay;
 
+    // re-init this since max allocation could have changed
+    this.minimumAllocationFactor =
+        Resources.ratio(resourceCalculator,
+            Resources.subtract(maximumAllocation, minimumAllocation),
+            maximumAllocation);
+
     StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
       aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
@@ -283,6 +291,8 @@ public class LeafQueue extends AbstractCSQueue {
         "minimumAllocationFactor = " + minimumAllocationFactor +
         " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
         "maximumAllocationMemory ]" + "\n" +
+        "maximumAllocation = " + maximumAllocation +
+        " [= configuredMaxAllocation ]" + "\n" +
         "numContainers = " + numContainers +
         " [= currentNumContainers ]" + "\n" +
         "state = " + state +
@@ -479,6 +489,21 @@ public class LeafQueue extends AbstractCSQueue {
     }
 
     LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
+
+    // don't allow the maximum allocation to be decreased in size
+    // since we have already told running AM's the size
+    Resource oldMax = getMaximumAllocation();
+    Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
+    if (newMax.getMemory() < oldMax.getMemory()
+        || newMax.getVirtualCores() < oldMax.getVirtualCores()) {
+      throw new IOException(
+          "Trying to reinitialize "
+              + getQueuePath()
+              + " the maximum allocation size can not be decreased!"
+              + " Current setting: " + oldMax
+              + ", trying to set it to: " + newMax);
+    }
+
     setupQueueConfigs(
         clusterResource,
         newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, 
@@ -494,7 +519,8 @@ public class LeafQueue extends AbstractCSQueue {
         newlyParsedLeafQueue.defaultLabelExpression,
         newlyParsedLeafQueue.capacitiyByNodeLabels,
         newlyParsedLeafQueue.maxCapacityByNodeLabels,
-        newlyParsedLeafQueue.reservationsContinueLooking);
+        newlyParsedLeafQueue.reservationsContinueLooking,
+        newlyParsedLeafQueue.getMaximumAllocation());
 
     // queue metrics are updated, more resource may be available
     // activate the pending applications if possible

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index de92c9c..d66c06a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -132,7 +132,7 @@ public class ParentQueue extends AbstractCSQueue {
     super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
         maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
         defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
-        reservationContinueLooking);
+        reservationContinueLooking, maximumAllocation);
    StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
       aclsString.append(e.getKey() + ":" + e.getValue().getAclString());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index b6da94d..38d9d27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -500,9 +500,12 @@ public class TestCapacityScheduler {
   public void testParseQueue() throws IOException {
     CapacityScheduler cs = new CapacityScheduler();
     cs.setConf(new YarnConfiguration());
-
+    cs.setRMContext(resourceManager.getRMContext());
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(conf);
+    cs.init(conf);
+    cs.start();
+
     conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] 
{"b1"} );
     conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
     conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 
100.0f);
@@ -2124,4 +2127,301 @@ public class TestCapacityScheduler {
     assertFalse("queue " + B2 + " should have been preemptable",
         queueB2.getPreemptionDisabled());
   }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
+    // queue refresh should not allow changing the maximum allocation setting
+    // per queue to be smaller than previous setting
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, mockContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    assertEquals("max allocation in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemory());
+    assertEquals("max allocation for A1",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        conf.getMaximumAllocationPerQueue(A1).getMemory());
+    assertEquals("max allocation",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        conf.getMaximumAllocation().getMemory());
+
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueA = findQueue(rootQueue, A);
+    CSQueue queueA1 = findQueue(queueA, A1);
+    assertEquals("queue max allocation", ((LeafQueue) queueA1)
+        .getMaximumAllocation().getMemory(), 8192);
+
+    setMaxAllocMb(conf, A1, 4096);
+
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("max allocation exception",
+          e.getCause().toString().contains("not be decreased"));
+    }
+
+    setMaxAllocMb(conf, A1, 8192);
+    cs.reinitialize(conf, mockContext);
+
+    setMaxAllocVcores(conf, A1,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("max allocation exception",
+          e.getCause().toString().contains("not be decreased"));
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
+    // verify we can't set the allocation per queue larger than cluster setting
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.init(conf);
+    cs.start();
+    // change max allocation for B3 queue to be larger than cluster max
+    setMaxAllocMb(conf, B3,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("maximum allocation exception",
+          e.getCause().getMessage().contains("maximum allocation"));
+    }
+
+    setMaxAllocMb(conf, B3,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    cs.reinitialize(conf, mockContext);
+
+    setMaxAllocVcores(conf, B3,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("maximum allocation exception",
+          e.getCause().getMessage().contains("maximum allocation"));
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
+    // queue refresh should allow max allocation per queue to go larger
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    setMaxAllocVcores(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    setMaxAllocMb(conf, A1, 4096);
+    setMaxAllocVcores(conf, A1, 2);
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, mockContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    assertEquals("max capability MB in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemory());
+    assertEquals("max capability vcores in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        cs.getMaximumResourceCapability().getVirtualCores());
+    assertEquals("max allocation MB A1",
+        4096,
+        conf.getMaximumAllocationPerQueue(A1).getMemory());
+    assertEquals("max allocation vcores A1",
+        2,
+        conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
+    assertEquals("cluster max allocation MB",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        conf.getMaximumAllocation().getMemory());
+    assertEquals("cluster max allocation vcores",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        conf.getMaximumAllocation().getVirtualCores());
+
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueA = findQueue(rootQueue, A);
+    CSQueue queueA1 = findQueue(queueA, A1);
+    assertEquals("queue max allocation", ((LeafQueue) queueA1)
+        .getMaximumAllocation().getMemory(), 4096);
+
+    setMaxAllocMb(conf, A1, 6144);
+    setMaxAllocVcores(conf, A1, 3);
+    cs.reinitialize(conf, null);
+    // conf will have changed and the max allocation for the actual queue
+    // should have been increased to match; cluster-level values are unchanged
+    assertEquals("max allocation MB A1", 6144,
+        conf.getMaximumAllocationPerQueue(A1).getMemory());
+    assertEquals("max allocation vcores A1", 3,
+        conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
+    assertEquals("max allocation MB cluster",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        conf.getMaximumAllocation().getMemory());
+    assertEquals("max allocation vcores cluster",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        conf.getMaximumAllocation().getVirtualCores());
+    assertEquals("queue max allocation MB", 6144,
+        ((LeafQueue) queueA1).getMaximumAllocation().getMemory());
+    assertEquals("queue max allocation vcores", 3,
+        ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
+    assertEquals("max capability MB cluster",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemory());
+    assertEquals("cluster max capability vcores",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        cs.getMaximumResourceCapability().getVirtualCores());
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationCSError() throws Exception {
+    // Try to refresh the cluster level max allocation size to be smaller
+    // and it should error out
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf, 10240);
+    setMaxAllocVcores(conf, 10);
+    setMaxAllocMb(conf, A1, 4096);
+    setMaxAllocVcores(conf, A1, 4);
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, mockContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    assertEquals("max allocation MB in CS", 10240,
+        cs.getMaximumResourceCapability().getMemory());
+    assertEquals("max allocation vcores in CS", 10,
+        cs.getMaximumResourceCapability().getVirtualCores());
+
+    setMaxAllocMb(conf, 6144);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("max allocation exception",
+          e.getCause().toString().contains("not be decreased"));
+    }
+
+    setMaxAllocMb(conf, 10240);
+    cs.reinitialize(conf, mockContext);
+
+    setMaxAllocVcores(conf, 8);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("max allocation exception",
+          e.getCause().toString().contains("not be decreased"));
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
+    // Try to refresh the cluster level max allocation size to be larger
+    // and verify that if there is no setting per queue it uses the
+    // cluster level setting.
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf, 10240);
+    setMaxAllocVcores(conf, 10);
+    setMaxAllocMb(conf, A1, 4096);
+    setMaxAllocVcores(conf, A1, 4);
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, mockContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    assertEquals("max allocation MB in CS", 10240,
+        cs.getMaximumResourceCapability().getMemory());
+    assertEquals("max allocation vcores in CS", 10,
+        cs.getMaximumResourceCapability().getVirtualCores());
+
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueA = findQueue(rootQueue, A);
+    CSQueue queueB = findQueue(rootQueue, B);
+    CSQueue queueA1 = findQueue(queueA, A1);
+    CSQueue queueA2 = findQueue(queueA, A2);
+    CSQueue queueB2 = findQueue(queueB, B2);
+
+    assertEquals("queue A1 max allocation MB", 4096,
+        ((LeafQueue) queueA1).getMaximumAllocation().getMemory());
+    assertEquals("queue A1 max allocation vcores", 4,
+        ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
+    assertEquals("queue A2 max allocation MB", 10240,
+        ((LeafQueue) queueA2).getMaximumAllocation().getMemory());
+    assertEquals("queue A2 max allocation vcores", 10,
+        ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
+    assertEquals("queue B2 max allocation MB", 10240,
+        ((LeafQueue) queueB2).getMaximumAllocation().getMemory());
+    assertEquals("queue B2 max allocation vcores", 10,
+        ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
+
+    setMaxAllocMb(conf, 12288);
+    setMaxAllocVcores(conf, 12);
+    cs.reinitialize(conf, null);
+    // cluster level setting should change and any queues without
+    // per queue setting
+    assertEquals("max allocation MB in CS", 12288,
+        cs.getMaximumResourceCapability().getMemory());
+    assertEquals("max allocation vcores in CS", 12,
+        cs.getMaximumResourceCapability().getVirtualCores());
+    assertEquals("queue A1 max MB allocation", 4096,
+        ((LeafQueue) queueA1).getMaximumAllocation().getMemory());
+    assertEquals("queue A1 max vcores allocation", 4,
+        ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
+    assertEquals("queue A2 max MB allocation", 12288,
+        ((LeafQueue) queueA2).getMaximumAllocation().getMemory());
+    assertEquals("queue A2 max vcores allocation", 12,
+        ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
+    assertEquals("queue B2 max MB allocation", 12288,
+        ((LeafQueue) queueB2).getMaximumAllocation().getMemory());
+    assertEquals("queue B2 max vcores allocation", 12,
+        ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
+  }
+
+  private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        maxAllocMb);
+  }
+
+  private void setMaxAllocMb(CapacitySchedulerConfiguration conf,
+      String queueName, int maxAllocMb) {
+    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+        + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
+    conf.setInt(propName, maxAllocMb);
+  }
+
+  private void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        maxAllocVcores);
+  }
+
+  private void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
+      String queueName, int maxAllocVcores) {
+    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+        + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+    conf.setInt(propName, maxAllocVcores);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm
index bc3b9ea..8528c1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm
@@ -227,6 +227,18 @@ Hadoop MapReduce Next Generation - Capacity Scheduler
 | | capacity irrespective of how idle th cluster is. Value is specified as |
 | | a float.|
 *--------------------------------------+--------------------------------------+
+| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb>>> |   |
+| | The per queue maximum limit of memory to allocate to each container |
+| | request at the Resource Manager. This setting overrides the cluster |
+| | configuration <<<yarn.scheduler.maximum-allocation-mb>>>. This value |
+| | must be smaller than or equal to the cluster maximum. |
+*--------------------------------------+--------------------------------------+
+| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores>>> |   |
+| | The per queue maximum limit of virtual cores to allocate to each container |
+| | request at the Resource Manager. This setting overrides the cluster |
+| | configuration <<<yarn.scheduler.maximum-allocation-vcores>>>. This value |
+| | must be smaller than or equal to the cluster maximum. |
+*--------------------------------------+--------------------------------------+
 
     * Running and Pending Application Limits
     

Reply via email to