Repository: hadoop Updated Branches: refs/heads/trunk c4980a2f3 -> 69c8a7f45
YARN-1582. Capacity Scheduler: add a maximum-allocation-mb setting per queue. Contributed by Thomas Graves Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69c8a7f4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69c8a7f4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69c8a7f4 Branch: refs/heads/trunk Commit: 69c8a7f45be5c0aa6787b07f328d74f1e2ba5628 Parents: c4980a2 Author: Jason Lowe <jl...@apache.org> Authored: Thu Feb 5 19:28:49 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Thu Feb 5 19:28:49 2015 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../ApplicationMasterService.java | 2 +- .../scheduler/AbstractYarnScheduler.java | 23 ++ .../scheduler/YarnScheduler.java | 11 +- .../scheduler/capacity/AbstractCSQueue.java | 8 +- .../scheduler/capacity/CapacityScheduler.java | 17 ++ .../CapacitySchedulerConfiguration.java | 49 +++ .../capacity/CapacitySchedulerContext.java | 2 + .../scheduler/capacity/LeafQueue.java | 36 ++- .../scheduler/capacity/ParentQueue.java | 2 +- .../capacity/TestCapacityScheduler.java | 302 ++++++++++++++++++- .../src/site/apt/CapacityScheduler.apt.vm | 12 + 12 files changed, 455 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 936cdf4..f41e5d6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -245,6 +245,9 @@ Release 2.7.0 - UNRELEASED YARN-3123. Made YARN CLI show a single completed container even if the app is running. (Naganarasimha G R via zjshen) + YARN-1582. 
Capacity Scheduler: add a maximum-allocation-mb setting per + queue (Thomas Graves via jlowe) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index d0b199f..0cdc1e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -285,7 +285,7 @@ public class ApplicationMasterService extends AbstractService implements RegisterApplicationMasterResponse response = recordFactory .newRecordInstance(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(rScheduler - .getMaximumResourceCapability()); + .getMaximumResourceCapability(app.getQueue())); response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) .getSubmissionContext().getAMContainerSpec().getApplicationACLs()); response.setQueue(app.getQueue()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 753259c..04b3452 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -180,6 +180,11 @@ public abstract class AbstractYarnScheduler return maxResource; } + @Override + public Resource getMaximumResourceCapability(String queueName) { + return getMaximumResourceCapability(); + } + protected void initMaximumResourceCapability(Resource maximumAllocation) { maxAllocWriteLock.lock(); try { @@ -635,4 +640,22 @@ public abstract class AbstractYarnScheduler maxAllocWriteLock.unlock(); } } + + protected void refreshMaximumAllocation(Resource newMaxAlloc) { + maxAllocWriteLock.lock(); + try { + configuredMaximumAllocation = Resources.clone(newMaxAlloc); + int maxMemory = newMaxAlloc.getMemory(); + if (maxNodeMemory != -1) { + maxMemory = Math.min(maxMemory, maxNodeMemory); + } + int maxVcores = newMaxAlloc.getVirtualCores(); + if (maxNodeVCores != -1) { + maxVcores = Math.min(maxVcores, maxNodeVCores); + } + maximumAllocation = Resources.createResource(maxMemory, maxVcores); + } finally { + maxAllocWriteLock.unlock(); + } + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 4a3a35c..b99b217 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -92,13 +92,22 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { public Resource getMinimumResourceCapability(); /** - * Get maximum allocatable {@link Resource}. + * Get maximum allocatable {@link Resource} at the cluster level. * @return maximum allocatable resource */ @Public @Stable public Resource getMaximumResourceCapability(); + /** + * Get maximum allocatable {@link Resource} for the queue specified. 
+ * @param queueName queue name + * @return maximum allocatable resource + */ + @Public + @Stable + public Resource getMaximumResourceCapability(String queueName); + @LimitedPrivate("yarn") @Evolving ResourceCalculator getResourceCalculator(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index c4651da..e4c2665 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -57,7 +57,7 @@ public abstract class AbstractCSQueue implements CSQueue { volatile int numContainers; final Resource minimumAllocation; - final Resource maximumAllocation; + Resource maximumAllocation; QueueState state; final QueueMetrics metrics; @@ -255,7 +255,7 @@ public abstract class AbstractCSQueue implements CSQueue { Set<String> labels, String defaultLabelExpression, Map<String, Float> nodeLabelCapacities, Map<String, Float> maximumNodeLabelCapacities, - boolean reservationContinueLooking) + boolean reservationContinueLooking, Resource maxAllocation) throws IOException { // Sanity check 
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); @@ -326,6 +326,8 @@ public abstract class AbstractCSQueue implements CSQueue { this.reservationsContinueLooking = reservationContinueLooking; this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + + this.maximumAllocation = maxAllocation; } protected QueueInfo getQueueInfo() { @@ -341,7 +343,7 @@ public abstract class AbstractCSQueue implements CSQueue { } @Private - public Resource getMaximumAllocation() { + public synchronized Resource getMaximumAllocation() { return maximumAllocation; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e622d3a..916a4db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; 
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -356,9 +357,11 @@ public class CapacityScheduler extends validateConf(this.conf); try { LOG.info("Re-initializing queues..."); + refreshMaximumAllocation(this.conf.getMaximumAllocation()); reinitializeQueues(this.conf); } catch (Throwable t) { this.conf = oldConf; + refreshMaximumAllocation(this.conf.getMaximumAllocation()); throw new IOException("Failed to re-init queues", t); } } @@ -1580,6 +1583,20 @@ public class CapacityScheduler extends .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); } + @Override + public Resource getMaximumResourceCapability(String queueName) { + CSQueue queue = getQueue(queueName); + if (queue == null) { + LOG.error("Unknown queue: " + queueName); + return getMaximumResourceCapability(); + } + if (!(queue instanceof LeafQueue)) { + LOG.error("queue " + queueName + " is not an leaf queue"); + return getMaximumResourceCapability(); + } + return ((LeafQueue)queue).getMaximumAllocation(); + } + private String handleMoveToPlanQueue(String targetQueueName) { CSQueue dest = getQueue(targetQueueName); if (dest != null && dest instanceof PlanQueue) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 55c6c0c..268cc6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -109,6 +109,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true; @Private + public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb"; + + @Private + public static final String MAXIMUM_ALLOCATION_VCORES = + "maximum-allocation-vcores"; + + @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @Private @@ -580,6 +587,48 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return Resources.createResource(maximumMemory, maximumCores); } + /** + * Get the per queue setting for the maximum limit to allocate to + * each container request. 
+ * + * @param queue + * name of the queue + * @return setting specified per queue else falls back to the cluster setting + */ + public Resource getMaximumAllocationPerQueue(String queue) { + String queuePrefix = getQueuePrefix(queue); + int maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, + (int)UNDEFINED); + int maxAllocationVcoresPerQueue = getInt( + queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED); + if (LOG.isDebugEnabled()) { + LOG.debug("max alloc mb per queue for " + queue + " is " + + maxAllocationMbPerQueue); + LOG.debug("max alloc vcores per queue for " + queue + " is " + + maxAllocationVcoresPerQueue); + } + Resource clusterMax = getMaximumAllocation(); + if (maxAllocationMbPerQueue == (int)UNDEFINED) { + LOG.info("max alloc mb per queue for " + queue + " is undefined"); + maxAllocationMbPerQueue = clusterMax.getMemory(); + } + if (maxAllocationVcoresPerQueue == (int)UNDEFINED) { + LOG.info("max alloc vcore per queue for " + queue + " is undefined"); + maxAllocationVcoresPerQueue = clusterMax.getVirtualCores(); + } + Resource result = Resources.createResource(maxAllocationMbPerQueue, + maxAllocationVcoresPerQueue); + if (maxAllocationMbPerQueue > clusterMax.getMemory() + || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) { + throw new IllegalArgumentException( + "Queue maximum allocation cannot be larger than the cluster setting" + + " for queue " + queue + + " max allocation per queue: " + result + + " cluster setting: " + clusterMax); + } + return result; + } + public boolean getEnableUserMetrics() { return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 03a1cb6..28dc988 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -39,6 +39,8 @@ public interface CapacitySchedulerContext { Resource getMaximumResourceCapability(); + Resource getMaximumResourceCapability(String queueName); + RMContainerTokenSecretManager getContainerTokenSecretManager(); int getNumClusterNodes(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index adb86a5..c143210 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -97,7 +97,7 @@ public class LeafQueue extends AbstractCSQueue { Set<FiCaSchedulerApp> pendingApplications; - private final float minimumAllocationFactor; + private float minimumAllocationFactor; private Map<String, User> users = new HashMap<String, User>(); @@ -162,7 +162,8 @@ public class LeafQueue extends AbstractCSQueue { state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels, defaultLabelExpression, this.capacitiyByNodeLabels, this.maxCapacityByNodeLabels, - cs.getConfiguration().getReservationContinueLook()); + cs.getConfiguration().getReservationContinueLook(), + cs.getConfiguration().getMaximumAllocationPerQueue(getQueuePath())); if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName @@ -192,11 +193,12 @@ public class LeafQueue extends AbstractCSQueue { Set<String> labels, String defaultLabelExpression, Map<String, Float> capacitieByLabel, Map<String, Float> maximumCapacitiesByLabel, - boolean revervationContinueLooking) throws IOException { + boolean revervationContinueLooking, + Resource maxAllocation) throws IOException { super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, state, acls, labels, defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel, - revervationContinueLooking); + revervationContinueLooking, maxAllocation); // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); float absCapacity = getParent().getAbsoluteCapacity() * capacity; @@ -238,6 +240,12 @@ public class LeafQueue extends AbstractCSQueue { this.nodeLocalityDelay = nodeLocalityDelay; + // re-init this since max allocation could have changed + this.minimumAllocationFactor = + Resources.ratio(resourceCalculator, + Resources.subtract(maximumAllocation, 
minimumAllocation), + maximumAllocation); + + StringBuilder aclsString = new StringBuilder(); for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); @@ -283,6 +291,8 @@ public class LeafQueue extends AbstractCSQueue { "minimumAllocationFactor = " + minimumAllocationFactor + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " + "maximumAllocationMemory ]" + "\n" + + "maximumAllocation = " + maximumAllocation + + " [= configuredMaxAllocation ]" + "\n" + "numContainers = " + numContainers + " [= currentNumContainers ]" + "\n" + "state = " + state + @@ -479,6 +489,21 @@ public class LeafQueue extends AbstractCSQueue { } LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue; + + // don't allow the maximum allocation to be decreased in size + // since we have already told running AMs the size + Resource oldMax = getMaximumAllocation(); + Resource newMax = newlyParsedLeafQueue.getMaximumAllocation(); + if (newMax.getMemory() < oldMax.getMemory() + || newMax.getVirtualCores() < oldMax.getVirtualCores()) { + throw new IOException( + "Trying to reinitialize " + + getQueuePath() + + " the maximum allocation size can not be decreased!"
+ + " Current setting: " + oldMax + + ", trying to set it to: " + newMax); + } + setupQueueConfigs( clusterResource, newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, @@ -494,7 +519,8 @@ public class LeafQueue extends AbstractCSQueue { newlyParsedLeafQueue.defaultLabelExpression, newlyParsedLeafQueue.capacitiyByNodeLabels, newlyParsedLeafQueue.maxCapacityByNodeLabels, - newlyParsedLeafQueue.reservationsContinueLooking); + newlyParsedLeafQueue.reservationsContinueLooking, + newlyParsedLeafQueue.getMaximumAllocation()); // queue metrics are updated, more resource may be available // activate the pending applications if possible http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index de92c9c..d66c06a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -132,7 +132,7 @@ public class ParentQueue extends AbstractCSQueue { super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels, defaultLabelExpression, 
nodeLabelCapacities, maximumCapacitiesByLabel, - reservationContinueLooking); + reservationContinueLooking, maximumAllocation); StringBuilder aclsString = new StringBuilder(); for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index b6da94d..38d9d27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -500,9 +500,12 @@ public class TestCapacityScheduler { public void testParseQueue() throws IOException { CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); - + cs.setRMContext(resourceManager.getRMContext()); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); + cs.init(conf); + cs.start(); + conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} ); conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 
100.0f); conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f); @@ -2124,4 +2127,301 @@ public class TestCapacityScheduler { assertFalse("queue " + B2 + " should have been preemptable", queueB2.getPreemptionDisabled()); } + + @Test + public void testRefreshQueuesMaxAllocationRefresh() throws Exception { + // queue refresh should not allow changing the maximum allocation setting + // per queue to be smaller than previous setting + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, mockContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + assertEquals("max allocation in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + cs.getMaximumResourceCapability().getMemory()); + assertEquals("max allocation for A1", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + conf.getMaximumAllocationPerQueue(A1).getMemory()); + assertEquals("max allocation", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + conf.getMaximumAllocation().getMemory()); + + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueA = findQueue(rootQueue, A); + CSQueue queueA1 = findQueue(queueA, A1); + assertEquals("queue max allocation", ((LeafQueue) queueA1) + .getMaximumAllocation().getMemory(), 8192); + + setMaxAllocMb(conf, A1, 4096); + + try { + cs.reinitialize(conf, mockContext); + fail("should have thrown exception"); + } catch (IOException e) { + assertTrue("max allocation exception", + e.getCause().toString().contains("not be decreased")); + } + + setMaxAllocMb(conf, A1, 8192); + cs.reinitialize(conf, mockContext); + + setMaxAllocVcores(conf, A1, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1); + try { + cs.reinitialize(conf, mockContext); 
+ fail("should have thrown exception"); + } catch (IOException e) { + assertTrue("max allocation exception", + e.getCause().toString().contains("not be decreased")); + } + } + + @Test + public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception { + // verify we can't set the allocation per queue larger than cluster setting + CapacityScheduler cs = new CapacityScheduler(); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.init(conf); + cs.start(); + // change max allocation for B3 queue to be larger than cluster max + setMaxAllocMb(conf, B3, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048); + try { + cs.reinitialize(conf, mockContext); + fail("should have thrown exception"); + } catch (IOException e) { + assertTrue("maximum allocation exception", + e.getCause().getMessage().contains("maximum allocation")); + } + + setMaxAllocMb(conf, B3, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + cs.reinitialize(conf, mockContext); + + setMaxAllocVcores(conf, B3, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1); + try { + cs.reinitialize(conf, mockContext); + fail("should have thrown exception"); + } catch (IOException e) { + assertTrue("maximum allocation exception", + e.getCause().getMessage().contains("maximum allocation")); + } + } + + @Test + public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception { + // queue refresh should allow max allocation per queue to go larger + CapacityScheduler cs = new CapacityScheduler(); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + setMaxAllocMb(conf, + 
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); +
setMaxAllocVcores(conf, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + setMaxAllocMb(conf, A1, 4096); + setMaxAllocVcores(conf, A1, 2); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, mockContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + assertEquals("max capability MB in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + cs.getMaximumResourceCapability().getMemory()); + assertEquals("max capability vcores in CS", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + cs.getMaximumResourceCapability().getVirtualCores()); + assertEquals("max allocation MB A1", + 4096, + conf.getMaximumAllocationPerQueue(A1).getMemory()); + assertEquals("max allocation vcores A1", + 2, + conf.getMaximumAllocationPerQueue(A1).getVirtualCores()); + assertEquals("cluster max allocation MB", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + conf.getMaximumAllocation().getMemory()); + assertEquals("cluster max allocation vcores", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + conf.getMaximumAllocation().getVirtualCores()); + + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueA = findQueue(rootQueue, A); + CSQueue queueA1 = findQueue(queueA, A1); + assertEquals("queue max allocation", ((LeafQueue) queueA1) + .getMaximumAllocation().getMemory(), 4096); + + setMaxAllocMb(conf, A1, 6144); + setMaxAllocVcores(conf, A1, 3); + cs.reinitialize(conf, null); + // both the conf and the live queue should now report the larger + // per-queue max allocation, while the cluster-wide max stays unchanged + assertEquals("max allocation MB A1", 6144, + conf.getMaximumAllocationPerQueue(A1).getMemory()); + assertEquals("max allocation vcores A1", 3, + conf.getMaximumAllocationPerQueue(A1).getVirtualCores()); + assertEquals("max allocation MB cluster", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + 
conf.getMaximumAllocation().getMemory()); + assertEquals("max allocation vcores cluster", +
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + conf.getMaximumAllocation().getVirtualCores()); + assertEquals("queue max allocation MB", 6144, + ((LeafQueue) queueA1).getMaximumAllocation().getMemory()); + assertEquals("queue max allocation vcores", 3, + ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores()); + assertEquals("max capability MB cluster", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + cs.getMaximumResourceCapability().getMemory()); + assertEquals("cluster max capability vcores", + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + cs.getMaximumResourceCapability().getVirtualCores()); + } + + @Test + public void testRefreshQueuesMaxAllocationCSError() throws Exception { + // Try to refresh the cluster level max allocation size to be smaller + // and it should error out + CapacityScheduler cs = new CapacityScheduler(); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + setMaxAllocMb(conf, 10240); + setMaxAllocVcores(conf, 10); + setMaxAllocMb(conf, A1, 4096); + setMaxAllocVcores(conf, A1, 4); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, mockContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + assertEquals("max allocation MB in CS", 10240, + cs.getMaximumResourceCapability().getMemory()); + assertEquals("max allocation vcores in CS", 10, + cs.getMaximumResourceCapability().getVirtualCores()); + + setMaxAllocMb(conf, 6144); + try { + cs.reinitialize(conf, mockContext); + fail("should have thrown exception"); + } catch (IOException e) { + assertTrue("max allocation exception", + e.getCause().toString().contains("not be decreased")); + } + + setMaxAllocMb(conf, 10240); + cs.reinitialize(conf, mockContext); + + setMaxAllocVcores(conf, 8); + try { + cs.reinitialize(conf, mockContext); + fail("should have thrown 
exception"); + } catch (IOException e) { + assertTrue("max allocation exception", + e.getCause().toString().contains("not be decreased")); + } + } + + @Test + public void testRefreshQueuesMaxAllocationCSLarger() throws Exception { + // Try to refresh the cluster level max allocation size to be larger + // and verify that if there is no setting per queue it uses the + // cluster level setting. + CapacityScheduler cs = new CapacityScheduler(); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + setMaxAllocMb(conf, 10240); + setMaxAllocVcores(conf, 10); + setMaxAllocMb(conf, A1, 4096); + setMaxAllocVcores(conf, A1, 4); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, mockContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + assertEquals("max allocation MB in CS", 10240, + cs.getMaximumResourceCapability().getMemory()); + assertEquals("max allocation vcores in CS", 10, + cs.getMaximumResourceCapability().getVirtualCores()); + + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueA = findQueue(rootQueue, A); + CSQueue queueB = findQueue(rootQueue, B); + CSQueue queueA1 = findQueue(queueA, A1); + CSQueue queueA2 = findQueue(queueA, A2); + CSQueue queueB2 = findQueue(queueB, B2); + + assertEquals("queue A1 max allocation MB", 4096, + ((LeafQueue) queueA1).getMaximumAllocation().getMemory()); + assertEquals("queue A1 max allocation vcores", 4, + ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores()); + assertEquals("queue A2 max allocation MB", 10240, + ((LeafQueue) queueA2).getMaximumAllocation().getMemory()); + assertEquals("queue A2 max allocation vcores", 10, + ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores()); + assertEquals("queue B2 max allocation MB", 10240, + ((LeafQueue) queueB2).getMaximumAllocation().getMemory()); + assertEquals("queue B2 max allocation vcores", 10, + 
((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); + + setMaxAllocMb(conf, 12288); + setMaxAllocVcores(conf, 12); + cs.reinitialize(conf, null); + // cluster level setting should change and any queues without + // per queue setting + assertEquals("max allocation MB in CS", 12288, + cs.getMaximumResourceCapability().getMemory()); + assertEquals("max allocation vcores in CS", 12, + cs.getMaximumResourceCapability().getVirtualCores()); + assertEquals("queue A1 max MB allocation", 4096, + ((LeafQueue) queueA1).getMaximumAllocation().getMemory()); + assertEquals("queue A1 max vcores allocation", 4, + ((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores()); + assertEquals("queue A2 max MB allocation", 12288, + ((LeafQueue) queueA2).getMaximumAllocation().getMemory()); + assertEquals("queue A2 max vcores allocation", 12, + ((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores()); + assertEquals("queue B2 max MB allocation", 12288, + ((LeafQueue) queueB2).getMaximumAllocation().getMemory()); + assertEquals("queue B2 max vcores allocation", 12, + ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); + } + + private void setMaxAllocMb(Configuration conf, int maxAllocMb) { + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + maxAllocMb); + } + + private void setMaxAllocMb(CapacitySchedulerConfiguration conf, + String queueName, int maxAllocMb) { + String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName) + + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; + conf.setInt(propName, maxAllocMb); + } + + private void setMaxAllocVcores(Configuration conf, int maxAllocVcores) { + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + maxAllocVcores); + } + + private void setMaxAllocVcores(CapacitySchedulerConfiguration conf, + String queueName, int maxAllocVcores) { + String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName) + + 
CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES; + conf.setInt(propName, maxAllocVcores); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69c8a7f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm index bc3b9ea..8528c1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm @@ -227,6 +227,18 @@ Hadoop MapReduce Next Generation - Capacity Scheduler | | capacity irrespective of how idle th cluster is. Value is specified as | | | a float.| *--------------------------------------+--------------------------------------+ +| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb>>> | | +| | The per queue maximum limit of memory to allocate to each container | +| | request at the Resource Manager. This setting overrides the cluster | +| | configuration <<<yarn.scheduler.maximum-allocation-mb>>>. This value | +| | must be smaller than or equal to the cluster maximum. | +*--------------------------------------+--------------------------------------+ +| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores>>> | | +| | The per queue maximum limit of virtual cores to allocate to each container | +| | request at the Resource Manager. This setting overrides the cluster | +| | configuration <<<yarn.scheduler.maximum-allocation-vcores>>>. This value | +| | must be smaller than or equal to the cluster maximum. | +*--------------------------------------+--------------------------------------+ * Running and Pending Application Limits