This is an automated email from the ASF dual-hosted git repository. snemeth pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new c7699d3dcd4 YARN-11079. Make an AbstractParentQueue to store common ParentQueue and ManagedParentQueue functionality. Contributed by Susheel Gupta c7699d3dcd4 is described below commit c7699d3dcd4f8feaf2c5ae5943b8a4cec738e95d Author: Szilard Nemeth <snem...@apache.org> AuthorDate: Thu May 4 22:16:18 2023 -0400 YARN-11079. Make an AbstractParentQueue to store common ParentQueue and ManagedParentQueue functionality. Contributed by Susheel Gupta --- .../ProportionalCapacityPreemptionPolicy.java | 6 +- .../monitor/capacity/TempQueuePerPartition.java | 3 +- .../csmappingrule/MappingRuleValidationHelper.java | 5 +- .../scheduler/capacity/AbstractCSQueue.java | 6 +- .../capacity/AbstractManagedParentQueue.java | 2 +- .../{ParentQueue.java => AbstractParentQueue.java} | 233 +-- .../capacity/AutoCreatedQueueManagementPolicy.java | 4 +- .../capacity/CSMaxRunningAppsEnforcer.java | 12 +- .../scheduler/capacity/CapacityScheduler.java | 2 +- .../capacity/CapacitySchedulerConfigValidator.java | 4 +- .../capacity/CapacitySchedulerQueueManager.java | 9 +- .../scheduler/capacity/ManagedParentQueue.java | 2 +- .../scheduler/capacity/ParentQueue.java | 1572 +------------------- .../scheduler/capacity/PlanQueue.java | 1 + .../GuaranteedOrZeroCapacityOverTimePolicy.java | 8 +- .../event/QueueManagementChangeEvent.java | 9 +- .../webapp/dao/CapacitySchedulerInfo.java | 6 +- .../webapp/dao/CapacitySchedulerQueueInfo.java | 6 +- .../dao/helper/CapacitySchedulerInfoHelper.java | 3 +- .../TestCapacitySchedulerNewQueueAutoCreation.java | 4 +- .../capacity/TestCapacitySchedulerQueues.java | 2 +- .../scheduler/capacity/TestChildQueueOrder.java | 4 +- .../scheduler/capacity/TestParentQueue.java | 4 +- 23 files changed, 133 insertions(+), 1774 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 1c4d60962ee..443241a664a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -21,6 +21,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .ManagedParentQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; @@ -630,9 +630,9 @@ public class ProportionalCapacityPreemptionPolicy partitionToLookAt, killable, absCap, absMaxCap, partitionResource, reserved, curQueue, effMinRes, effMaxRes); - if (curQueue instanceof ParentQueue) { + if (curQueue instanceof AbstractParentQueue) { String configuredOrderingPolicy = - ((ParentQueue) curQueue).getQueueOrderingPolicy().getConfigName(); + ((AbstractParentQueue) curQueue).getQueueOrderingPolicy().getConfigName(); // Recursively add children for (CSQueue c : curQueue.getChildQueues()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 78075bb5c17..13db9892d9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -25,6 +25,7 @@ import java.util.LinkedHashMap; import java.util.Map; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; @@ -57,7 +58,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { final ArrayList<TempQueuePerPartition> children; private Collection<TempAppPerPartition> apps; AbstractLeafQueue leafQueue; - ParentQueue parentQueue; + AbstractParentQueue parentQueue; boolean preemptionDisabled; protected Resource pendingDeductReserved; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationHelper.java index f4f19e0ae12..713d668998e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationHelper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; @@ -123,8 +124,8 @@ public final class MappingRuleValidationHelper { //if the grandparent allows new dynamic creation, the dynamic parent and //the dynamic leaf queue can be created as well CSQueue grandParentQueue = queueManager.getQueue(grandParentPath); - if (grandParentQueue != null && grandParentQueue instanceof ParentQueue && - ((ParentQueue)grandParentQueue).isEligibleForAutoQueueCreation()) { + if (grandParentQueue != null && grandParentQueue instanceof AbstractParentQueue && + ((AbstractParentQueue)grandParentQueue).isEligibleForAutoQueueCreation()) { //Grandparent is a new dynamic parent queue, which allows deep queue //creation return ValidationResult.CREATABLE; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index e8bed4604f6..c8b9c6e8829 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -410,8 +410,8 @@ public abstract class AbstractCSQueue implements CSQueue { */ protected void setDynamicQueueProperties() { // Set properties from parent template - if (parent instanceof ParentQueue) { - ((ParentQueue) parent).getAutoCreatedQueueTemplate() + if (parent instanceof AbstractParentQueue) { + ((AbstractParentQueue) parent).getAutoCreatedQueueTemplate() .setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePath()); String parentTemplate = String.format("%s.%s", parent.getQueuePath(), @@ -1262,7 +1262,7 @@ public abstract class AbstractCSQueue implements CSQueue { CapacityConfigType.ABSOLUTE_RESOURCE)) { newEffectiveMinResource = createNormalizedMinResource( usageTracker.getQueueResourceQuotas().getConfiguredMinResource(label), - ((ParentQueue) parent).getEffectiveMinRatio(label)); + ((AbstractParentQueue) parent).getEffectiveMinRatio(label)); // Max resource of a queue should be the minimum of {parent's maxResources, // this queue's maxResources}. Both parent's maxResources and this queue's diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index e6d61e33075..0a3d0f5c5f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -35,7 +35,7 @@ import java.util.Map; * From the user perspective this is equivalent to a LeafQueue, * but functionality wise is a sub-class of ParentQueue */ -public abstract class AbstractManagedParentQueue extends ParentQueue { +public abstract class AbstractManagedParentQueue extends AbstractParentQueue { private static final Logger LOG = LoggerFactory.getLogger( AbstractManagedParentQueue.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java similarity index 90% copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java index a816b91034c..8a849d28a2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; @@ -35,8 +36,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -80,19 +79,16 @@ import org.apache.hadoop.yarn.util.resource.Resources; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedParentQueue; -@Private -@Evolving -public class ParentQueue extends AbstractCSQueue { - +public abstract class AbstractParentQueue extends AbstractCSQueue { private static final Logger LOG = - LoggerFactory.getLogger(ParentQueue.class); + LoggerFactory.getLogger(AbstractParentQueue.class); protected final List<CSQueue> childQueues; private final boolean rootQueue; - private volatile int numApplications; + private AtomicInteger numApplications = new AtomicInteger(0); - private final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); private QueueOrderingPolicy queueOrderingPolicy; @@ -110,34 +106,35 @@ public class ParentQueue extends AbstractCSQueue { private final Map<String, Map<String, Float>> effectiveMinResourceRatio = new ConcurrentHashMap<>(); - public ParentQueue(CapacitySchedulerQueueContext queueContext, - String queueName, CSQueue parent, CSQueue old) throws IOException { + public AbstractParentQueue(CapacitySchedulerQueueContext queueContext, + String queueName, CSQueue parent, CSQueue old) + throws IOException { this(queueContext, queueName, parent, old, false); } - private ParentQueue(CapacitySchedulerQueueContext queueContext, - String queueName, CSQueue parent, CSQueue old, boolean isDynamic) - throws IOException { + public AbstractParentQueue(CapacitySchedulerQueueContext queueContext, + String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws + IOException { + super(queueContext, queueName, parent, old); setDynamicQueue(isDynamic); this.rootQueue = (parent == null); float rawCapacity = queueContext.getConfiguration() - .getNonLabeledQueueCapacity(this.queuePath); + .getNonLabeledQueueCapacity(this.queuePath); if (rootQueue && - (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) { + (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) { throw new IllegalArgumentException("Illegal " + - "capacity of " + rawCapacity + " for queue " + queueName + - ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); + "capacity of " + rawCapacity + " for queue " + queueName + + ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } this.childQueues = new ArrayList<>(); this.allowZeroCapacitySum = - queueContext.getConfiguration() - .getAllowZeroCapacitySum(getQueuePath()); + queueContext.getConfiguration() + .getAllowZeroCapacitySum(getQueuePath()); - setupQueueConfigs(queueContext.getClusterResource()); } // returns what is configured queue ordering policy @@ -172,7 +169,7 @@ public class ParentQueue extends AbstractCSQueue { queueOrderingPolicy = configuration.getQueueOrderingPolicy( getQueuePath(), parent == null ? null : - ((ParentQueue) parent).getQueueOrderingPolicyConfigName()); + ((AbstractParentQueue) parent).getQueueOrderingPolicyConfigName()); queueOrderingPolicy.setQueues(childQueues); LOG.info(getQueueName() + ", " + getCapacityOrWeightString() @@ -194,9 +191,9 @@ public class ParentQueue extends AbstractCSQueue { protected void setDynamicQueueACLProperties() { super.setDynamicQueueACLProperties(); - if (parent instanceof ParentQueue) { + if (parent instanceof AbstractParentQueue) { acls.putAll(getACLsForFlexibleAutoCreatedParentQueue( - ((ParentQueue) parent).getAutoCreatedQueueTemplate())); + ((AbstractParentQueue) parent).getAutoCreatedQueueTemplate())); } } @@ -204,7 +201,7 @@ public class ParentQueue extends AbstractCSQueue { // Check weight configuration, throw exception when configuration is invalid // return true when all children use weight mode. - private QueueCapacityType getCapacityConfigurationTypeForQueues( + public QueueCapacityType getCapacityConfigurationTypeForQueues( Collection<CSQueue> queues) throws IOException { // Do we have ANY queue set capacity in any labels? boolean percentageIsSet = false; @@ -249,7 +246,6 @@ public class ParentQueue extends AbstractCSQueue { } } } - // If we have mixed capacity, weight or absolute resource (any of the two) // We will throw exception // Root queue is an exception here, because by default root queue returns @@ -277,7 +273,7 @@ public class ParentQueue extends AbstractCSQueue { } } - private enum QueueCapacityType { + public enum QueueCapacityType { WEIGHT, ABSOLUTE_RESOURCE, PERCENT; } @@ -452,7 +448,7 @@ public class ParentQueue extends AbstractCSQueue { } } - + @Override public List<QueueUserACLInfo> getQueueUserAclInfo( UserGroupInformation user) { @@ -486,7 +482,7 @@ public class ParentQueue extends AbstractCSQueue { "numContainers=" + getNumContainers(); } - private CSQueue createNewQueue(String childQueuePath, boolean isLeaf) + public CSQueue createNewQueue(String childQueuePath, boolean isLeaf) throws SchedulerDynamicEditException { try { AbstractCSQueue childQueue; @@ -496,7 +492,7 @@ public class ParentQueue extends AbstractCSQueue { if (isLeaf) { childQueue = new LeafQueue(queueContext, queueShortName, this, null, true); - } else{ + } else { childQueue = new ParentQueue(queueContext, queueShortName, this, null, true); } childQueue.setDynamicQueue(true); @@ -509,76 +505,6 @@ public class ParentQueue extends AbstractCSQueue { } } - public ParentQueue addDynamicParentQueue(String queuePath) - throws SchedulerDynamicEditException { - return (ParentQueue) addDynamicChildQueue(queuePath, false); - } - - public LeafQueue addDynamicLeafQueue(String queuePath) - throws SchedulerDynamicEditException { - return (LeafQueue) addDynamicChildQueue(queuePath, true); - } - - // New method to add child queue - private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) - throws SchedulerDynamicEditException { - writeLock.lock(); - try { - // Check if queue exists, if queue exists, write a warning message (this - // should not happen, since it will be handled before calling this method) - // , but we will move on. - CSQueue queue = - queueContext.getQueueManager().getQueueByFullName( - childQueuePath); - if (queue != null) { - LOG.warn( - "This should not happen, trying to create queue=" + childQueuePath - + ", however the queue already exists"); - return queue; - } - - // Check if the max queue limit is exceeded. - int maxQueues = queueContext.getConfiguration(). - getAutoCreatedQueuesV2MaxChildQueuesLimit(getQueuePath()); - if (childQueues.size() >= maxQueues) { - throw new SchedulerDynamicEditException( - "Cannot auto create queue " + childQueuePath + ". Max Child " - + "Queue limit exceeded which is configured as: " + maxQueues - + " and number of child queues is: " + childQueues.size()); - } - - // First, check if we allow creation or not - boolean weightsAreUsed = false; - try { - weightsAreUsed = getCapacityConfigurationTypeForQueues(childQueues) - == QueueCapacityType.WEIGHT; - } catch (IOException e) { - LOG.warn("Caught Exception during auto queue creation", e); - } - if (!weightsAreUsed) { - throw new SchedulerDynamicEditException( - "Trying to create new queue=" + childQueuePath - + " but not all the queues under parent=" + this.getQueuePath() - + " are using weight-based capacity. Failed to created queue"); - } - - CSQueue newQueue = createNewQueue(childQueuePath, isLeaf); - this.childQueues.add(newQueue); - updateLastSubmittedTimeStamp(); - - // Call updateClusterResource. - // Which will deal with all effectiveMin/MaxResource - // Calculation - this.updateClusterResource(queueContext.getClusterResource(), - new ResourceLimits(queueContext.getClusterResource())); - - return newQueue; - } finally { - writeLock.unlock(); - } - } - - // New method to remove child queue public void removeChildQueue(CSQueue queue) throws SchedulerDynamicEditException { @@ -625,10 +551,10 @@ public class ParentQueue extends AbstractCSQueue { return isDynamicQueue() || queueContext.getConfiguration(). isAutoQueueCreationV2Enabled(getQueuePath()); } - + @Override public void reinitialize(CSQueue newlyParsedQueue, - Resource clusterResource) throws IOException { + Resource clusterResource) throws IOException { writeLock.lock(); try { // We skip reinitialize for dynamic queues, when this is called, and @@ -639,14 +565,14 @@ public class ParentQueue extends AbstractCSQueue { } // Sanity check - if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue + if (!(newlyParsedQueue instanceof AbstractParentQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { throw new IOException( "Trying to reinitialize " + getQueuePath() + " from " + newlyParsedQueue.getQueuePath()); } - ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue; + AbstractParentQueue newlyParsedParentQueue = (AbstractParentQueue) newlyParsedQueue; // Set new configs setupQueueConfigs(clusterResource); @@ -681,8 +607,8 @@ public class ParentQueue extends AbstractCSQueue { // checked to ensure that this child-queue is in STOPPED state if // Child queue has been converted to ParentQueue. if ((childQueue instanceof AbstractLeafQueue - && newChildQueue instanceof ParentQueue) - || (childQueue instanceof ParentQueue + && newChildQueue instanceof AbstractParentQueue) + || (childQueue instanceof AbstractParentQueue && newChildQueue instanceof AbstractLeafQueue)) { // We would convert this LeafQueue to ParentQueue, or vice versa. // consider this as the combination of DELETE then ADD. @@ -755,7 +681,7 @@ public class ParentQueue extends AbstractCSQueue { } finally { writeLock.unlock(); } - + // Inform the parent queue if (parent != null) { try { @@ -809,19 +735,14 @@ public class ParentQueue extends AbstractCSQueue { private void addApplication(ApplicationId applicationId, String user) { - writeLock.lock(); - try { - ++numApplications; + numApplications.incrementAndGet(); - LOG.info( - "Application added -" + " appId: " + applicationId + " user: " + user - + " leaf-queue of parent: " + getQueuePath() + " #applications: " - + getNumApplications()); - } finally { - writeLock.unlock(); - } + LOG.info( + "Application added -" + " appId: " + applicationId + " user: " + user + + " leaf-queue of parent: " + getQueuePath() + " #applications: " + + getNumApplications()); } - + @Override public void finishApplication(ApplicationId application, String user) { @@ -837,16 +758,11 @@ public class ParentQueue extends AbstractCSQueue { private void removeApplication(ApplicationId applicationId, String user) { - writeLock.lock(); - try { - --numApplications; + numApplications.decrementAndGet(); - LOG.info("Application removed -" + " appId: " + applicationId + " user: " - + user + " leaf-queue of parent: " + getQueuePath() - + " #applications: " + getNumApplications()); - } finally { - writeLock.unlock(); - } + LOG.info("Application removed -" + " appId: " + applicationId + " user: " + + user + " leaf-queue of parent: " + getQueuePath() + + " #applications: " + getNumApplications()); } private String getParentName() { @@ -855,8 +771,8 @@ public class ParentQueue extends AbstractCSQueue { @Override public CSAssignment assignContainers(Resource clusterResource, - CandidateNodeSet<FiCaSchedulerNode> candidates, - ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + ResourceLimits resourceLimits, SchedulingMode schedulingMode) { FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // if our queue cannot access this node, just return @@ -1128,7 +1044,7 @@ public class ParentQueue extends AbstractCSQueue { } if (Resources.greaterThan( - resourceCalculator, cluster, + resourceCalculator, cluster, childAssignment.getResource(), Resources.none())) { assignment = childAssignment; break; @@ -1162,10 +1078,10 @@ public class ParentQueue extends AbstractCSQueue { String getChildQueuesToPrint() { StringBuilder sb = new StringBuilder(); for (CSQueue q : childQueues) { - sb.append(q.getQueuePath() + + sb.append(q.getQueuePath() + " usedCapacity=(" + q.getUsedCapacity() + "), " + " label=(" - + StringUtils.join(q.getAccessibleNodeLabels().iterator(), ",") + + StringUtils.join(q.getAccessibleNodeLabels().iterator(), ",") + ")"); } return sb.toString(); @@ -1174,7 +1090,7 @@ public class ParentQueue extends AbstractCSQueue { private void printChildQueues() { if (LOG.isDebugEnabled()) { LOG.debug("printChildQueues - queue: " + getQueuePath() - + " child-queues: " + getChildQueuesToPrint()); + + " child-queues: " + getChildQueuesToPrint()); } } @@ -1194,8 +1110,8 @@ public class ParentQueue extends AbstractCSQueue { @Override public void completedContainer(Resource clusterResource, - FiCaSchedulerApp application, FiCaSchedulerNode node, - RMContainer rmContainer, ContainerStatus containerStatus, + FiCaSchedulerApp application, FiCaSchedulerNode node, + RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, CSQueue completedChildQueue, boolean sortQueues) { if (application != null) { @@ -1205,9 +1121,9 @@ public class ParentQueue extends AbstractCSQueue { // Inform the parent if (parent != null) { // complete my parent - parent.completedContainer(clusterResource, application, + parent.completedContainer(clusterResource, application, node, rmContainer, null, event, this, sortQueues); - } + } } } @@ -1398,7 +1314,7 @@ public class ParentQueue extends AbstractCSQueue { } } - + @Override public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt attempt, RMContainer rmContainer) { @@ -1424,7 +1340,7 @@ public class ParentQueue extends AbstractCSQueue { parent.recoverContainer(clusterResource, attempt, rmContainer); } } - + @Override public ActiveUsersManager getAbstractUsersManager() { // Should never be called since all applications are submitted to LeafQueues @@ -1483,9 +1399,9 @@ public class ParentQueue extends AbstractCSQueue { } } } - + public int getNumApplications() { - return numApplications; + return numApplications.get(); } void allocateResource(Resource clusterResource, @@ -1675,4 +1591,5 @@ public class ParentQueue extends AbstractCSQueue { public AutoCreatedQueueTemplate getAutoCreatedQueueTemplate() { return autoCreatedQueueTemplate; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java index cea3df4e03b..b981723e02c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java @@ -30,14 +30,14 @@ public interface AutoCreatedQueueManagementPolicy { * @param parentQueue parent queue * @throws IOException an I/O exception has occurred. */ - void init(ParentQueue parentQueue) throws IOException; + void init(AbstractParentQueue parentQueue) throws IOException; /** * Reinitialize policy state ( if required ). * @param parentQueue parent queue * @throws IOException an I/O exception has occurred. */ - void reinitialize(ParentQueue parentQueue) throws IOException; + void reinitialize(AbstractParentQueue parentQueue) throws IOException; /** * Get initial template for the specified leaf queue. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java index 83068e20786..c664674381c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java @@ -125,10 +125,10 @@ public class CSMaxRunningAppsEnforcer { String user = app.getUser(); AbstractCSQueue queue = (AbstractCSQueue) app.getQueue(); // Increment running counts for all parent queues - ParentQueue parent = (ParentQueue) queue.getParent(); + AbstractParentQueue parent = (AbstractParentQueue) queue.getParent(); while (parent != null) { parent.incrementRunnableApps(); - parent = (ParentQueue) parent.getParent(); + parent = (AbstractParentQueue) parent.getParent(); } Integer userNumRunnable = usersNumRunnableApps.get(user); @@ -189,12 +189,12 @@ public class CSMaxRunningAppsEnforcer { (queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1) ? queue : null; - ParentQueue parent = (ParentQueue) queue.getParent(); + AbstractParentQueue parent = (AbstractParentQueue) queue.getParent(); while (parent != null) { if (parent.getNumRunnableApps() == parent.getMaxParallelApps() - 1) { highestQueueWithAppsNowRunnable = parent; } - parent = (ParentQueue) parent.getParent(); + parent = (AbstractParentQueue) parent.getParent(); } List<List<FiCaSchedulerApp>> appsNowMaybeRunnable = @@ -303,10 +303,10 @@ public class CSMaxRunningAppsEnforcer { // Update runnable app bookkeeping for queues AbstractCSQueue queue = (AbstractCSQueue) app.getQueue(); - ParentQueue parent = (ParentQueue) queue.getParent(); + AbstractParentQueue parent = (AbstractParentQueue) queue.getParent(); while (parent != null) { parent.decrementRunnableApps(); - parent = (ParentQueue) parent.getParent(); + parent = (AbstractParentQueue) parent.getParent(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e513359af0d..2c61fe61adb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2097,7 +2097,7 @@ public class CapacityScheduler extends { QueueManagementChangeEvent queueManagementChangeEvent = (QueueManagementChangeEvent) event; - ParentQueue parentQueue = queueManagementChangeEvent.getParentQueue(); + AbstractParentQueue parentQueue = queueManagementChangeEvent.getParentQueue(); try { final List<QueueManagementChange> queueManagementChanges = queueManagementChangeEvent.getQueueManagementChanges(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java index a0467e21c81..396527aabb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java @@ -172,7 +172,7 @@ public final class CapacitySchedulerConfigValidator { private static void validateParentQueueConversion(CSQueue oldQueue, CSQueue newQueue) throws IOException { - if (oldQueue instanceof ParentQueue) { + if (oldQueue instanceof AbstractParentQueue) { if (!(oldQueue instanceof ManagedParentQueue) && newQueue instanceof ManagedParentQueue) { throw new IOException( "Can not convert parent queue: " + oldQueue.getQueuePath() @@ -199,7 +199,7 @@ public final class CapacitySchedulerConfigValidator { private static void validateLeafQueueConversion(CSQueue oldQueue, CSQueue newQueue) throws IOException { - if (oldQueue instanceof AbstractLeafQueue && newQueue instanceof ParentQueue) { + if (oldQueue instanceof AbstractLeafQueue && newQueue instanceof AbstractParentQueue) { if (isEitherQueueStopped(oldQueue.getState(), newQueue.getState())) { LOG.info("Converting the leaf queue: {} to parent queue.", oldQueue.getQueuePath()); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index d29d80e07e9..c7d625f6ffd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -249,7 +249,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(fullQueueName); // if a queue is eligible for auto queue creation v2 it must be a ParentQueue // (even if it is empty) - final boolean isDynamicParent = oldQueue instanceof ParentQueue && oldQueue.isDynamicQueue(); + final boolean isDynamicParent = oldQueue instanceof AbstractParentQueue && + oldQueue.isDynamicQueue(); boolean isAutoQueueCreationEnabledParent = isDynamicParent || conf.isAutoQueueCreationV2Enabled( fullQueueName) || isAutoCreateEnabled; @@ -270,7 +271,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< throw new IllegalStateException("Only Leaf Queues can be reservable for " + fullQueueName); } - ParentQueue parentQueue; + AbstractParentQueue parentQueue; if (isAutoCreateEnabled) { parentQueue = new ManagedParentQueue(queueContext, queueName, parent, oldQueues.get( fullQueueName)); @@ -591,7 +592,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< "above the limit."); } - if (!(firstExistingParent instanceof ParentQueue)) { + if (!(firstExistingParent instanceof AbstractParentQueue)) { throw new SchedulerDynamicEditException( "Could not auto create hierarchy of " + queue.getFullPath() + ". Queue " + queue.getParent() + @@ -599,7 +600,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< ); } - ParentQueue existingParentQueue = (ParentQueue) firstExistingParent; + AbstractParentQueue existingParentQueue = (AbstractParentQueue) firstExistingParent; if (!existingParentQueue.isEligibleForAutoQueueCreation()) { throw new SchedulerDynamicEditException("Auto creation of queue " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index 4f7ed405184..ca40f21a14a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -56,7 +56,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue { final String queueName, final CSQueue parent, final CSQueue old) throws IOException { super(queueContext, queueName, parent, old); - + super.setupQueueConfigs(queueContext.getClusterResource()); shouldFailAutoCreationWhenGuaranteedCapacityExceeded = queueContext.getConfiguration() .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index a816b91034c..b27431f6554 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -19,494 +19,30 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; -import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.util.Sets; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.QueueState; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.security.AccessType; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; -import org.apache.hadoop.yarn.util.UnitsConversionUtil; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; -import org.apache.hadoop.yarn.util.resource.Resources; - -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedParentQueue; - @Private @Evolving -public class ParentQueue extends AbstractCSQueue { +public class ParentQueue extends AbstractParentQueue { private static final Logger LOG = LoggerFactory.getLogger(ParentQueue.class); - protected final List<CSQueue> childQueues; - private final boolean rootQueue; - private volatile int numApplications; - - private final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); - - private QueueOrderingPolicy queueOrderingPolicy; - - private long lastSkipQueueDebugLoggingTimestamp = -1; - - private int runnableApps; - - private final boolean allowZeroCapacitySum; - - private AutoCreatedQueueTemplate autoCreatedQueueTemplate; - - // A ratio of the queue's effective minimum resource and the summary of the configured - // minimum resource of its children grouped by labels and calculated for each resource names - // distinctively. - private final Map<String, Map<String, Float>> effectiveMinResourceRatio = - new ConcurrentHashMap<>(); - public ParentQueue(CapacitySchedulerQueueContext queueContext, String queueName, CSQueue parent, CSQueue old) throws IOException { this(queueContext, queueName, parent, old, false); } - private ParentQueue(CapacitySchedulerQueueContext queueContext, + public ParentQueue(CapacitySchedulerQueueContext queueContext, String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws IOException { - super(queueContext, queueName, parent, old); - setDynamicQueue(isDynamic); - this.rootQueue = (parent == null); - - float rawCapacity = queueContext.getConfiguration() - .getNonLabeledQueueCapacity(this.queuePath); - - if (rootQueue && - (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) { - throw new IllegalArgumentException("Illegal " + - "capacity of " + rawCapacity + " for queue " + queueName + - ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); - } - - this.childQueues = new ArrayList<>(); - this.allowZeroCapacitySum = - queueContext.getConfiguration() - .getAllowZeroCapacitySum(getQueuePath()); - - setupQueueConfigs(queueContext.getClusterResource()); - } - - // returns what is configured queue ordering policy - private String getQueueOrderingPolicyConfigName() { - return queueOrderingPolicy == null ? - null : - queueOrderingPolicy.getConfigName(); - } - - protected void setupQueueConfigs(Resource clusterResource) - throws IOException { - writeLock.lock(); - try { - CapacitySchedulerConfiguration configuration = queueContext.getConfiguration(); - autoCreatedQueueTemplate = new AutoCreatedQueueTemplate( - configuration, this.queuePath); - super.setupQueueConfigs(clusterResource); - StringBuilder aclsString = new StringBuilder(); - for (Map.Entry<AccessType, AccessControlList> e : getACLs().entrySet()) { - aclsString.append(e.getKey()).append(":") - .append(e.getValue().getAclString()); - } - - StringBuilder labelStrBuilder = new StringBuilder(); - if (getAccessibleNodeLabels() != null) { - for (String nodeLabel : getAccessibleNodeLabels()) { - labelStrBuilder.append(nodeLabel).append(","); - } - } - - // Initialize queue ordering policy - queueOrderingPolicy = configuration.getQueueOrderingPolicy( - getQueuePath(), parent == null ? - null : - ((ParentQueue) parent).getQueueOrderingPolicyConfigName()); - queueOrderingPolicy.setQueues(childQueues); - - LOG.info(getQueueName() + ", " + getCapacityOrWeightString() - + ", absoluteCapacity=" + getAbsoluteCapacity() - + ", maxCapacity=" + getMaximumCapacity() - + ", absoluteMaxCapacity=" + getAbsoluteMaximumCapacity() - + ", state=" + getState() + ", acls=" - + aclsString + ", labels=" + labelStrBuilder + "\n" - + ", reservationsContinueLooking=" + isReservationsContinueLooking() - + ", orderingPolicy=" + getQueueOrderingPolicyConfigName() - + ", priority=" + getPriority() - + ", allowZeroCapacitySum=" + allowZeroCapacitySum); - } finally { - writeLock.unlock(); - } - } - - @Override - protected void setDynamicQueueACLProperties() { - super.setDynamicQueueACLProperties(); - - if (parent instanceof ParentQueue) { - acls.putAll(getACLsForFlexibleAutoCreatedParentQueue( - ((ParentQueue) parent).getAutoCreatedQueueTemplate())); - } - } - - private static float PRECISION = 0.0005f; // 0.05% precision - - // Check weight configuration, throw exception when configuration is invalid - // return true when all children use weight mode. - private QueueCapacityType getCapacityConfigurationTypeForQueues( - Collection<CSQueue> queues) throws IOException { - // Do we have ANY queue set capacity in any labels? - boolean percentageIsSet = false; - - // Do we have ANY queue set weight in any labels? - boolean weightIsSet = false; - - // Do we have ANY queue set absolute in any labels? - boolean absoluteMinResSet = false; - - StringBuilder diagMsg = new StringBuilder(); - - for (CSQueue queue : queues) { - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float capacityByLabel = queue.getQueueCapacities().getCapacity(nodeLabel); - if (capacityByLabel > 0) { - percentageIsSet = true; - } - float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel); - // By default weight is set to -1, so >= 0 is enough. - if (weightByLabel >= 0) { - weightIsSet = true; - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses weight mode}. "); - } - if (!queue.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel) - .equals(Resources.none())) { - absoluteMinResSet = true; - // There's a special handling: when absolute resource is configured, - // capacity will be calculated (and set) for UI/metrics purposes, so - // when asboluteMinResource is set, unset percentage - percentageIsSet = false; - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses absolute mode}. "); - } - if (percentageIsSet) { - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses percentage mode}. "); - } - } - } - - // If we have mixed capacity, weight or absolute resource (any of the two) - // We will throw exception - // Root queue is an exception here, because by default root queue returns - // 100 as capacity no matter what. We should look into this case in the - // future. To avoid impact too many code paths, we don;t check root queue's - // config. - if (queues.iterator().hasNext() && - !queues.iterator().next().getQueuePath().equals( - CapacitySchedulerConfiguration.ROOT) && - (percentageIsSet ? 1 : 0) + (weightIsSet ? 1 : 0) + (absoluteMinResSet ? - 1 : - 0) > 1) { - throw new IOException("Parent queue '" + getQueuePath() - + "' have children queue used mixed of " - + " weight mode, percentage and absolute mode, it is not allowed, please " - + "double check, details:" + diagMsg.toString()); - } - - if (weightIsSet || queues.isEmpty()) { - return QueueCapacityType.WEIGHT; - } else if (absoluteMinResSet) { - return QueueCapacityType.ABSOLUTE_RESOURCE; - } else { - return QueueCapacityType.PERCENT; - } - } - - private enum QueueCapacityType { - WEIGHT, ABSOLUTE_RESOURCE, PERCENT; - } - - /** - * Set child queue and verify capacities - * +--------------+---------------------------+-------------------------------------+------------------------+ - * | | parent-weight | parent-pct | parent-abs | - * +--------------+---------------------------+-------------------------------------+------------------------+ - * | child-weight | No specific check | No specific check | X | - * +--------------+---------------------------+-------------------------------------+------------------------+ - * | child-pct | Sum(children.capacity) = | When: | X | - * | | 0 OR 100 | parent.capacity>0 | | - * | | | sum(children.capacity)=100 OR 0 | | - * | | | parent.capacity=0 | | - * | | | sum(children.capacity)=0 | | - * +--------------+---------------------------+-------------------------------------+------------------------+ - * | child-abs | X | X | Sum(children.minRes)<= | - * | | | | parent.minRes | - * +--------------+---------------------------+-------------------------------------+------------------------+ - * @param childQueues - */ - void setChildQueues(Collection<CSQueue> childQueues) throws IOException { - writeLock.lock(); - try { - boolean isLegacyQueueMode = queueContext.getConfiguration().isLegacyQueueMode(); - if (isLegacyQueueMode) { - QueueCapacityType childrenCapacityType = - getCapacityConfigurationTypeForQueues(childQueues); - QueueCapacityType parentCapacityType = - getCapacityConfigurationTypeForQueues(ImmutableList.of(this)); - - if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE - || parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) { - // We don't allow any mixed absolute + {weight, percentage} between - // children and parent - if (childrenCapacityType != parentCapacityType && !this.getQueuePath() - .equals(CapacitySchedulerConfiguration.ROOT)) { - throw new IOException("Parent=" + this.getQueuePath() - + ": When absolute minResource is used, we must make sure both " - + "parent and child all use absolute minResource"); - } - - // Ensure that for each parent queue: parent.min-resource >= - // Σ(child.min-resource). - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - Resource minRes = Resources.createResource(0, 0); - for (CSQueue queue : childQueues) { - // Accumulate all min/max resource configured for all child queues. - Resources.addTo(minRes, queue.getQueueResourceQuotas() - .getConfiguredMinResource(nodeLabel)); - } - Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel, - queueContext.getClusterResource()); - Resource parentMinResource = - usageTracker.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel); - if (!parentMinResource.equals(Resources.none()) && Resources.lessThan( - resourceCalculator, resourceByLabel, parentMinResource, minRes)) { - throw new IOException( - "Parent Queues" + " capacity: " + parentMinResource - + " is less than" + " to its children:" + minRes - + " for queue:" + getQueueName()); - } - } - } - - // When child uses percent - if (childrenCapacityType == QueueCapacityType.PERCENT) { - float childrenPctSum = 0; - // check label capacities - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - // check children's labels - childrenPctSum = 0; - for (CSQueue queue : childQueues) { - childrenPctSum += queue.getQueueCapacities().getCapacity(nodeLabel); - } - - if (Math.abs(1 - childrenPctSum) > PRECISION) { - // When children's percent sum != 100% - if (Math.abs(childrenPctSum) > PRECISION) { - // It is wrong when percent sum != {0, 1} - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + getQueueName() + " for label=" - + nodeLabel + ". It should be either 0 or 1.0"); - } else { - // We also allow children's percent sum = 0 under the following - // conditions - // - Parent uses weight mode - // - Parent uses percent mode, and parent has - // (capacity=0 OR allowZero) - if (parentCapacityType == QueueCapacityType.PERCENT) { - if ((Math.abs(queueCapacities.getCapacity(nodeLabel)) - > PRECISION) && (!allowZeroCapacitySum)) { - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + getQueueName() - + " for label=" + nodeLabel - + ". It is set to 0, but parent percent != 0, and " - + "doesn't allow children capacity to set to 0"); - } - } - } - } else { - // Even if child pct sum == 1.0, we will make sure parent has - // positive percent. - if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs( - queueCapacities.getCapacity(nodeLabel)) <= 0f - && !allowZeroCapacitySum) { - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + getQueueName() + " for label=" - + nodeLabel + ". queue=" + getQueueName() - + " has zero capacity, but child" - + "queues have positive capacities"); - } - } - } - } - } - - this.childQueues.clear(); - this.childQueues.addAll(childQueues); - if (LOG.isDebugEnabled()) { - LOG.debug("setChildQueues: " + getChildQueuesToPrint()); - } - } finally { - writeLock.unlock(); - } - } - - @Override - public QueueInfo getQueueInfo( - boolean includeChildQueues, boolean recursive) { - readLock.lock(); - try { - QueueInfo queueInfo = getQueueInfo(); - - List<QueueInfo> childQueuesInfo = new ArrayList<>(); - if (includeChildQueues) { - for (CSQueue child : childQueues) { - // Get queue information recursively? - childQueuesInfo.add(child.getQueueInfo(recursive, recursive)); - } - } - queueInfo.setChildQueues(childQueuesInfo); - - return queueInfo; - } finally { - readLock.unlock(); - } - - } - - private QueueUserACLInfo getUserAclInfo( - UserGroupInformation user) { - readLock.lock(); - try { - QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( - QueueUserACLInfo.class); - List<QueueACL> operations = new ArrayList<QueueACL>(); - for (QueueACL operation : QueueACL.values()) { - if (hasAccess(operation, user)) { - operations.add(operation); - } - } - - userAclInfo.setQueueName(getQueuePath()); - userAclInfo.setUserAcls(operations); - return userAclInfo; - } finally { - readLock.unlock(); - } - - } - - @Override - public List<QueueUserACLInfo> getQueueUserAclInfo( - UserGroupInformation user) { - readLock.lock(); - try { - List<QueueUserACLInfo> userAcls = new ArrayList<>(); - - // Add parent queue acls - userAcls.add(getUserAclInfo(user)); - - // Add children queue acls - for (CSQueue child : childQueues) { - userAcls.addAll(child.getQueueUserAclInfo(user)); - } - - return userAcls; - } finally { - readLock.unlock(); - } - - } - - public String toString() { - return getQueueName() + ": " + - "numChildQueue= " + childQueues.size() + ", " + - getCapacityOrWeightString() + ", " + - "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + - "usedResources=" + usageTracker.getQueueUsage().getUsed() + ", " + - "usedCapacity=" + getUsedCapacity() + ", " + - "numApps=" + getNumApplications() + ", " + - "numContainers=" + getNumContainers(); - } - - private CSQueue createNewQueue(String childQueuePath, boolean isLeaf) - throws SchedulerDynamicEditException { - try { - AbstractCSQueue childQueue; - String queueShortName = childQueuePath.substring( - childQueuePath.lastIndexOf(".") + 1); - - if (isLeaf) { - childQueue = new LeafQueue(queueContext, - queueShortName, this, null, true); - } else{ - childQueue = new ParentQueue(queueContext, queueShortName, this, null, true); - } - childQueue.setDynamicQueue(true); - // It should be sufficient now, we don't need to set more, because weights - // related setup will be handled in updateClusterResources - - return childQueue; - } catch (IOException e) { - throw new SchedulerDynamicEditException(e.toString()); - } + super(queueContext, queueName, parent, old, isDynamic); + super.setupQueueConfigs(queueContext.getClusterResource()); } public ParentQueue addDynamicParentQueue(String queuePath) @@ -577,1102 +113,4 @@ public class ParentQueue extends AbstractCSQueue { writeLock.unlock(); } } - - - // New method to remove child queue - public void removeChildQueue(CSQueue queue) - throws SchedulerDynamicEditException { - writeLock.lock(); - try { - if (!(queue instanceof AbstractCSQueue) || - !((AbstractCSQueue) queue).isDynamicQueue()) { - throw new SchedulerDynamicEditException("Queue " + getQueuePath() - + " can not remove " + queue.getQueuePath() + - " because it is not a dynamic queue"); - } - - // We need to check if the parent of the child queue is exactly this - // ParentQueue object - if (queue.getParent() != this) { - throw new SchedulerDynamicEditException("Queue " + getQueuePath() - + " can not remove " + queue.getQueuePath() + - " because it has a different parent queue"); - } - - // Now we can do remove and update - this.childQueues.remove(queue); - queueContext.getQueueManager() - .removeQueue(queue.getQueuePath()); - - // Call updateClusterResource, - // which will deal with all effectiveMin/MaxResource - // Calculation - this.updateClusterResource(queueContext.getClusterResource(), - new ResourceLimits(queueContext.getClusterResource())); - - } finally { - writeLock.unlock(); - } - } - - /** - * Check whether this queue supports adding additional child queues - * dynamically. - * @return true, if queue is eligible to create additional queues dynamically, - * false otherwise - */ - public boolean isEligibleForAutoQueueCreation() { - return isDynamicQueue() || queueContext.getConfiguration(). - isAutoQueueCreationV2Enabled(getQueuePath()); - } - - @Override - public void reinitialize(CSQueue newlyParsedQueue, - Resource clusterResource) throws IOException { - writeLock.lock(); - try { - // We skip reinitialize for dynamic queues, when this is called, and - // new queue is different from this queue, we will make this queue to be - // static queue. - if (newlyParsedQueue != this) { - this.setDynamicQueue(false); - } - - // Sanity check - if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue - .getQueuePath().equals(getQueuePath())) { - throw new IOException( - "Trying to reinitialize " + getQueuePath() + " from " - + newlyParsedQueue.getQueuePath()); - } - - ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue; - - // Set new configs - setupQueueConfigs(clusterResource); - - // Re-configure existing child queues and add new ones - // The CS has already checked to ensure all existing child queues are present! - Map<String, CSQueue> currentChildQueues = getQueuesMap(childQueues); - Map<String, CSQueue> newChildQueues = getQueuesMap( - newlyParsedParentQueue.childQueues); - - // Reinitialize dynamic queues as well, because they are not parsed - for (String queue : Sets.difference(currentChildQueues.keySet(), - newChildQueues.keySet())) { - CSQueue candidate = currentChildQueues.get(queue); - if (candidate instanceof AbstractCSQueue) { - if (((AbstractCSQueue) candidate).isDynamicQueue()) { - candidate.reinitialize(candidate, clusterResource); - } - } - } - - for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) { - String newChildQueueName = e.getKey(); - CSQueue newChildQueue = e.getValue(); - - CSQueue childQueue = currentChildQueues.get(newChildQueueName); - - // Check if the child-queue already exists - if (childQueue != null) { - // Check if the child-queue has been converted into parent queue or - // parent Queue has been converted to child queue. The CS has already - // checked to ensure that this child-queue is in STOPPED state if - // Child queue has been converted to ParentQueue. - if ((childQueue instanceof AbstractLeafQueue - && newChildQueue instanceof ParentQueue) - || (childQueue instanceof ParentQueue - && newChildQueue instanceof AbstractLeafQueue)) { - // We would convert this LeafQueue to ParentQueue, or vice versa. - // consider this as the combination of DELETE then ADD. - newChildQueue.setParent(this); - currentChildQueues.put(newChildQueueName, newChildQueue); - // inform CapacitySchedulerQueueManager - CapacitySchedulerQueueManager queueManager = - queueContext.getQueueManager(); - queueManager.addQueue(newChildQueueName, newChildQueue); - continue; - } - // Re-init existing queues - childQueue.reinitialize(newChildQueue, clusterResource); - LOG.info(getQueuePath() + ": re-configured queue: " + childQueue); - } else{ - // New child queue, do not re-init - - // Set parent to 'this' - newChildQueue.setParent(this); - - // Save in list of current child queues - currentChildQueues.put(newChildQueueName, newChildQueue); - - LOG.info( - getQueuePath() + ": added new child queue: " + newChildQueue); - } - } - - // remove the deleted queue in the refreshed xml. - for (Iterator<Map.Entry<String, CSQueue>> itr = currentChildQueues - .entrySet().iterator(); itr.hasNext();) { - Map.Entry<String, CSQueue> e = itr.next(); - String queueName = e.getKey(); - if (!newChildQueues.containsKey(queueName)) { - if (((AbstractCSQueue)e.getValue()).isDynamicQueue()) { - // Don't remove dynamic queue if we cannot find it in the config. - continue; - } - itr.remove(); - } - } - - // Re-sort all queues - setChildQueues(currentChildQueues.values()); - - // Make sure we notifies QueueOrderingPolicy - queueOrderingPolicy.setQueues(childQueues); - } finally { - writeLock.unlock(); - } - } - - private Map<String, CSQueue> getQueuesMap(List<CSQueue> queues) { - Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>(); - for (CSQueue queue : queues) { - queuesMap.put(queue.getQueuePath(), queue); - } - return queuesMap; - } - - @Override - public void submitApplication(ApplicationId applicationId, String user, - String queue) throws AccessControlException { - writeLock.lock(); - try { - // Sanity check - validateSubmitApplication(applicationId, user, queue); - - addApplication(applicationId, user); - } finally { - writeLock.unlock(); - } - - // Inform the parent queue - if (parent != null) { - try { - parent.submitApplication(applicationId, user, queue); - } catch (AccessControlException ace) { - LOG.info("Failed to submit application to parent-queue: " + - parent.getQueuePath(), ace); - removeApplication(applicationId, user); - throw ace; - } - } - } - - public void validateSubmitApplication(ApplicationId applicationId, - String userName, String queue) throws AccessControlException { - writeLock.lock(); - try { - if (queue.equals(getQueueName())) { - throw new AccessControlException( - "Cannot submit application " + "to non-leaf queue: " + getQueueName()); - } - - if (getState() != QueueState.RUNNING) { - throw new AccessControlException("Queue " + getQueuePath() - + " is STOPPED. Cannot accept submission of application: " - + applicationId); - } - } finally { - writeLock.unlock(); - } - } - - @Override - public void submitApplicationAttempt(FiCaSchedulerApp application, - String userName) { - // submit attempt logic. - } - - @Override - public void submitApplicationAttempt(FiCaSchedulerApp application, - String userName, boolean isMoveApp) { - throw new UnsupportedOperationException("Submission of application attempt" - + " to parent queue is not supported"); - } - - @Override - public void finishApplicationAttempt(FiCaSchedulerApp application, - String queue) { - // finish attempt logic. - } - - private void addApplication(ApplicationId applicationId, - String user) { - writeLock.lock(); - try { - ++numApplications; - - LOG.info( - "Application added -" + " appId: " + applicationId + " user: " + user - + " leaf-queue of parent: " + getQueuePath() + " #applications: " - + getNumApplications()); - } finally { - writeLock.unlock(); - } - } - - @Override - public void finishApplication(ApplicationId application, String user) { - - removeApplication(application, user); - - appFinished(); - - // Inform the parent queue - if (parent != null) { - parent.finishApplication(application, user); - } - } - - private void removeApplication(ApplicationId applicationId, - String user) { - writeLock.lock(); - try { - --numApplications; - - LOG.info("Application removed -" + " appId: " + applicationId + " user: " - + user + " leaf-queue of parent: " + getQueuePath() - + " #applications: " + getNumApplications()); - } finally { - writeLock.unlock(); - } - } - - private String getParentName() { - return parent != null ? parent.getQueuePath() : ""; - } - - @Override - public CSAssignment assignContainers(Resource clusterResource, - CandidateNodeSet<FiCaSchedulerNode> candidates, - ResourceLimits resourceLimits, SchedulingMode schedulingMode) { - FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); - - // if our queue cannot access this node, just return - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) { - if (LOG.isDebugEnabled()) { - long now = System.currentTimeMillis(); - // Do logging every 1 sec to avoid excessive logging. - if (now - this.lastSkipQueueDebugLoggingTimestamp > 1000) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + candidates - .getPartition()); - this.lastSkipQueueDebugLoggingTimestamp = now; - } - } - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueuePath(), ActivityState.REJECTED, - ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - - return CSAssignment.NULL_ASSIGNMENT; - } - - // Check if this queue need more resource, simply skip allocation if this - // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(candidates.getPartition(), - clusterResource, schedulingMode)) { - if (LOG.isDebugEnabled()) { - long now = System.currentTimeMillis(); - // Do logging every 1 sec to avoid excessive logging. - if (now - this.lastSkipQueueDebugLoggingTimestamp > 1000) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + candidates - .getPartition()); - this.lastSkipQueueDebugLoggingTimestamp = now; - } - } - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueuePath(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - - return CSAssignment.NULL_ASSIGNMENT; - } - - CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), - NodeType.NODE_LOCAL); - - while (canAssign(clusterResource, node)) { - LOG.debug("Trying to assign containers to child-queue of {}", - getQueuePath()); - - // Are we over maximum-capacity for this queue? - // This will also consider parent's limits and also continuous reservation - // looking - if (!super.canAssignToThisQueue(clusterResource, - candidates.getPartition(), - resourceLimits, Resources - .createResource(getMetrics().getReservedMB(), - getMetrics().getReservedVirtualCores()), schedulingMode)) { - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueuePath(), ActivityState.REJECTED, - ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - - break; - } - - // Schedule - CSAssignment assignedToChild = assignContainersToChildQueues( - clusterResource, candidates, resourceLimits, schedulingMode); - assignment.setType(assignedToChild.getType()); - assignment.setRequestLocalityType( - assignedToChild.getRequestLocalityType()); - assignment.setExcessReservation(assignedToChild.getExcessReservation()); - assignment.setContainersToKill(assignedToChild.getContainersToKill()); - assignment.setFulfilledReservation( - assignedToChild.isFulfilledReservation()); - assignment.setFulfilledReservedContainer( - assignedToChild.getFulfilledReservedContainer()); - - // Done if no child-queue assigned anything - if (Resources.greaterThan(resourceCalculator, clusterResource, - assignedToChild.getResource(), Resources.none())) { - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueuePath(), ActivityState.ACCEPTED, - ActivityDiagnosticConstant.EMPTY); - - boolean isReserved = - assignedToChild.getAssignmentInformation().getReservationDetails() - != null && !assignedToChild.getAssignmentInformation() - .getReservationDetails().isEmpty(); - if (rootQueue) { - ActivitiesLogger.NODE.finishAllocatedNodeAllocation( - activitiesManager, node, - assignedToChild.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId(), - isReserved ? - AllocationState.RESERVED : AllocationState.ALLOCATED); - } - - // Track resource utilization in this pass of the scheduler - Resources.addTo(assignment.getResource(), - assignedToChild.getResource()); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - assignedToChild.getAssignmentInformation().getAllocated()); - Resources.addTo(assignment.getAssignmentInformation().getReserved(), - assignedToChild.getAssignmentInformation().getReserved()); - assignment.getAssignmentInformation().incrAllocations( - assignedToChild.getAssignmentInformation().getNumAllocations()); - assignment.getAssignmentInformation().incrReservations( - assignedToChild.getAssignmentInformation().getNumReservations()); - assignment.getAssignmentInformation().getAllocationDetails().addAll( - assignedToChild.getAssignmentInformation() - .getAllocationDetails()); - assignment.getAssignmentInformation().getReservationDetails().addAll( - assignedToChild.getAssignmentInformation() - .getReservationDetails()); - assignment.setIncreasedAllocation( - assignedToChild.isIncreasedAllocation()); - - if (LOG.isDebugEnabled()) { - LOG.debug("assignedContainer reserved=" + isReserved + " queue=" - + getQueuePath() + " usedCapacity=" + getUsedCapacity() - + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource); - - LOG.debug( - "ParentQ=" + getQueuePath() + " assignedSoFarInThisIteration=" - + assignment.getResource() + " usedCapacity=" - + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity()); - } - } else{ - assignment.setSkippedType(assignedToChild.getSkippedType()); - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueuePath(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - - break; - } - - /* - * Previously here, we can allocate more than one container for each - * allocation under rootQ. Now this logic is not proper any more - * in global scheduling world. - * - * So here do not try to allocate more than one container for each - * allocation, let top scheduler make the decision. - */ - break; - } - - return assignment; - } - - private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { - // When node == null means global scheduling is enabled, always return true - if (null == node) { - return true; - } - - // Two conditions need to meet when trying to allocate: - // 1) Node doesn't have reserved container - // 2) Node's available-resource + killable-resource should > 0 - boolean accept = node.getReservedContainer() == null && - Resources.fitsIn(resourceCalculator, queueAllocationSettings.getMinimumAllocation(), - Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources())); - if (!accept) { - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueuePath(), ActivityState.REJECTED, - () -> node.getReservedContainer() != null ? - ActivityDiagnosticConstant. - QUEUE_SKIPPED_BECAUSE_SINGLE_NODE_RESERVED : - ActivityDiagnosticConstant. - QUEUE_SKIPPED_BECAUSE_SINGLE_NODE_RESOURCE_INSUFFICIENT); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - } - return accept; - } - - public ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, ResourceLimits parentLimits, - String nodePartition, boolean netLimit) { - // Set resource-limit of a given child, child.limit = - // min(my.limit - my.used + child.used, child.max) - - // First, cap parent limit by parent's max - parentLimits.setLimit(Resources.min(resourceCalculator, clusterResource, - parentLimits.getLimit(), - usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition))); - - // Parent available resource = parent-limit - parent-used-resource - Resource limit = parentLimits.getLimit(); - if (netLimit) { - limit = parentLimits.getNetLimit(); - } - Resource parentMaxAvailableResource = Resources.subtract( - limit, usageTracker.getQueueUsage().getUsed(nodePartition)); - - // Deduct killable from used - Resources.addTo(parentMaxAvailableResource, - getTotalKillableResource(nodePartition)); - - // Child's limit = parent-available-resource + child-used - Resource childLimit = Resources.add(parentMaxAvailableResource, - child.getQueueResourceUsage().getUsed(nodePartition)); - - // Normalize before return - childLimit = - Resources.roundDown(resourceCalculator, childLimit, - queueAllocationSettings.getMinimumAllocation()); - - return new ResourceLimits(childLimit); - } - - private Iterator<CSQueue> sortAndGetChildrenAllocationIterator( - String partition) { - return queueOrderingPolicy.getAssignmentIterator(partition); - } - - private CSAssignment assignContainersToChildQueues(Resource cluster, - CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits, - SchedulingMode schedulingMode) { - CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; - - printChildQueues(); - - // Try to assign to most 'under-served' sub-queue - for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator( - candidates.getPartition()); iter.hasNext(); ) { - CSQueue childQueue = iter.next(); - LOG.debug("Trying to assign to queue: {} stats: {}", - childQueue.getQueuePath(), childQueue); - - // Get ResourceLimits of child queue before assign containers - ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits, - candidates.getPartition(), true); - - CSAssignment childAssignment = childQueue.assignContainers(cluster, - candidates, childLimits, schedulingMode); - if(LOG.isDebugEnabled()) { - LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + - " stats: " + childQueue + " --> " + - childAssignment.getResource() + ", " + childAssignment.getType()); - } - - if (Resources.greaterThan( - resourceCalculator, cluster, - childAssignment.getResource(), Resources.none())) { - assignment = childAssignment; - break; - } else if (childAssignment.getSkippedType() == - CSAssignment.SkippedType.QUEUE_LIMIT) { - if (assignment.getSkippedType() != - CSAssignment.SkippedType.QUEUE_LIMIT) { - assignment = childAssignment; - } - Resource blockedHeadroom = null; - if (childQueue instanceof AbstractLeafQueue) { - blockedHeadroom = childLimits.getHeadroom(); - } else { - blockedHeadroom = childLimits.getBlockedHeadroom(); - } - Resource resourceToSubtract = Resources.max(resourceCalculator, - cluster, blockedHeadroom, Resources.none()); - limits.addBlockedHeadroom(resourceToSubtract); - if(LOG.isDebugEnabled()) { - LOG.debug("Decrease parentLimits " + limits.getLimit() + - " for " + this.getQueuePath() + " by " + - resourceToSubtract + " as childQueue=" + - childQueue.getQueuePath() + " is blocked"); - } - } - } - - return assignment; - } - - String getChildQueuesToPrint() { - StringBuilder sb = new StringBuilder(); - for (CSQueue q : childQueues) { - sb.append(q.getQueuePath() + - " usedCapacity=(" + q.getUsedCapacity() + "), " + - " label=(" - + StringUtils.join(q.getAccessibleNodeLabels().iterator(), ",") - + ")"); - } - return sb.toString(); - } - - private void printChildQueues() { - if (LOG.isDebugEnabled()) { - LOG.debug("printChildQueues - queue: " + getQueuePath() - + " child-queues: " + getChildQueuesToPrint()); - } - } - - private void internalReleaseResource(Resource clusterResource, - FiCaSchedulerNode node, Resource releasedResource) { - writeLock.lock(); - try { - super.releaseResource(clusterResource, releasedResource, - node.getPartition()); - - LOG.debug("completedContainer {}, cluster={}", this, clusterResource); - - } finally { - writeLock.unlock(); - } - } - - @Override - public void completedContainer(Resource clusterResource, - FiCaSchedulerApp application, FiCaSchedulerNode node, - RMContainer rmContainer, ContainerStatus containerStatus, - RMContainerEventType event, CSQueue completedChildQueue, - boolean sortQueues) { - if (application != null) { - internalReleaseResource(clusterResource, node, - rmContainer.getContainer().getResource()); - - // Inform the parent - if (parent != null) { - // complete my parent - parent.completedContainer(clusterResource, application, - node, rmContainer, null, event, this, sortQueues); - } - } - } - - @Override - public void refreshAfterResourceCalculation(Resource clusterResource, - ResourceLimits resourceLimits) { - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - this, labelManager, null); - // Update configured capacity/max-capacity for default partition only - CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator, - labelManager.getResourceByLabel(null, clusterResource), - RMNodeLabelsManager.NO_LABEL, this); - } - - @Override - public void updateClusterResource(Resource clusterResource, - ResourceLimits resourceLimits) { - writeLock.lock(); - try { - // Special handle root queue - if (rootQueue) { - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - if (queueCapacities.getWeight(nodeLabel) > 0) { - queueCapacities.setNormalizedWeight(nodeLabel, 1f); - } - } - } - - // Update absolute capacities of this queue, this need to happen before - // below calculation for effective capacities - updateAbsoluteCapacities(); - - // Normalize all dynamic queue queue's weight to 1 for all accessible node - // labels, this is important because existing node labels could keep - // changing when new node added, or node label mapping changed. We need - // this to ensure auto created queue can access all labels. - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - for (CSQueue queue : childQueues) { - // For dynamic queue, we will set weight to 1 every time, because it - // is possible new labels added to the parent. - if (((AbstractCSQueue) queue).isDynamicQueue()) { - if (queue.getQueueCapacities().getWeight(nodeLabel) == -1f) { - queue.getQueueCapacities().setWeight(nodeLabel, 1f); - } - } - } - } - - // Normalize weight of children - if (getCapacityConfigurationTypeForQueues(childQueues) - == QueueCapacityType.WEIGHT) { - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float sumOfWeight = 0; - - for (CSQueue queue : childQueues) { - if (queue.getQueueCapacities().getExistingNodeLabels() - .contains(nodeLabel)) { - float weight = Math.max(0, - queue.getQueueCapacities().getWeight(nodeLabel)); - sumOfWeight += weight; - } - } - // When sum of weight == 0, skip setting normalized_weight (so - // normalized weight will be 0). - if (Math.abs(sumOfWeight) > 1e-6) { - for (CSQueue queue : childQueues) { - if (queue.getQueueCapacities().getExistingNodeLabels() - .contains(nodeLabel)) { - queue.getQueueCapacities().setNormalizedWeight(nodeLabel, - queue.getQueueCapacities().getWeight(nodeLabel) / - sumOfWeight); - } - } - } - } - } - - // Update effective capacity in all parent queue. - for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) { - calculateEffectiveResourcesAndCapacity(label, clusterResource); - } - - // Update all children - for (CSQueue childQueue : childQueues) { - // Get ResourceLimits of child queue before assign containers - ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, - clusterResource, resourceLimits, - RMNodeLabelsManager.NO_LABEL, false); - childQueue.updateClusterResource(clusterResource, childLimits); - } - - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - this, labelManager, null); - // Update configured capacity/max-capacity for default partition only - CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator, - labelManager.getResourceByLabel(null, clusterResource), - RMNodeLabelsManager.NO_LABEL, this); - } catch (IOException e) { - LOG.error("Error during updating cluster resource: ", e); - throw new YarnRuntimeException("Fatal issue during scheduling", e); - } finally { - writeLock.unlock(); - } - } - - @Override - public boolean hasChildQueues() { - return true; - } - - private void calculateEffectiveResourcesAndCapacity(String label, - Resource clusterResource) { - // Update effective resources for my self; - if (rootQueue) { - Resource resourceByLabel = labelManager.getResourceByLabel(label, clusterResource); - usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label, resourceByLabel); - usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label, resourceByLabel); - } else { - super.updateEffectiveResources(clusterResource); - } - - recalculateEffectiveMinRatio(label, clusterResource); - } - - private void recalculateEffectiveMinRatio(String label, Resource clusterResource) { - // For root queue, ensure that max/min resource is updated to latest - // cluster resource. - Resource resourceByLabel = labelManager.getResourceByLabel(label, clusterResource); - - // Total configured min resources of direct children of this given parent queue - Resource configuredMinResources = Resource.newInstance(0L, 0); - for (CSQueue childQueue : getChildQueues()) { - Resources.addTo(configuredMinResources, - childQueue.getQueueResourceQuotas().getConfiguredMinResource(label)); - } - - // Factor to scale down effective resource: When cluster has sufficient - // resources, effective_min_resources will be same as configured min_resources. - Resource numeratorForMinRatio = null; - if (getQueuePath().equals("root")) { - if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(resourceCalculator, - clusterResource, resourceByLabel, configuredMinResources)) { - numeratorForMinRatio = resourceByLabel; - } - } else { - if (Resources.lessThan(resourceCalculator, clusterResource, - usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label), - configuredMinResources)) { - numeratorForMinRatio = usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label); - } - } - - effectiveMinResourceRatio.put(label, getEffectiveMinRatio( - configuredMinResources, numeratorForMinRatio)); - } - - private Map<String, Float> getEffectiveMinRatio( - Resource configuredMinResources, Resource numeratorForMinRatio) { - Map<String, Float> effectiveMinRatioPerResource = new HashMap<>(); - if (numeratorForMinRatio != null) { - int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); - for (int i = 0; i < maxLength; i++) { - ResourceInformation nResourceInformation = numeratorForMinRatio - .getResourceInformation(i); - ResourceInformation dResourceInformation = configuredMinResources - .getResourceInformation(i); - - long nValue = nResourceInformation.getValue(); - long dValue = UnitsConversionUtil.convert( - dResourceInformation.getUnits(), nResourceInformation.getUnits(), - dResourceInformation.getValue()); - if (dValue != 0) { - effectiveMinRatioPerResource.put(nResourceInformation.getName(), - (float) nValue / dValue); - } - } - } - return ImmutableMap.copyOf(effectiveMinRatioPerResource); - } - - @Override - public List<CSQueue> getChildQueues() { - readLock.lock(); - try { - return new ArrayList<CSQueue>(childQueues); - } finally { - readLock.unlock(); - } - - } - - @Override - public void recoverContainer(Resource clusterResource, - SchedulerApplicationAttempt attempt, RMContainer rmContainer) { - if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { - return; - } - if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { - return; - } - - // Careful! Locking order is important! - writeLock.lock(); - try { - FiCaSchedulerNode node = queueContext.getNode( - rmContainer.getContainer().getNodeId()); - allocateResource(clusterResource, - rmContainer.getContainer().getResource(), node.getPartition()); - } finally { - writeLock.unlock(); - } - - if (parent != null) { - parent.recoverContainer(clusterResource, attempt, rmContainer); - } - } - - @Override - public ActiveUsersManager getAbstractUsersManager() { - // Should never be called since all applications are submitted to LeafQueues - return null; - } - - @Override - public void collectSchedulerApplications( - Collection<ApplicationAttemptId> apps) { - readLock.lock(); - try { - for (CSQueue queue : childQueues) { - queue.collectSchedulerApplications(apps); - } - } finally { - readLock.unlock(); - } - - } - - @Override - public void attachContainer(Resource clusterResource, - FiCaSchedulerApp application, RMContainer rmContainer) { - if (application != null) { - FiCaSchedulerNode node = - queueContext.getNode(rmContainer.getContainer().getNodeId()); - allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getPartition()); - LOG.info("movedContainer" + " queueMoveIn=" + getQueuePath() - + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() + - " cluster=" + clusterResource); - // Inform the parent - if (parent != null) { - parent.attachContainer(clusterResource, application, rmContainer); - } - } - } - - @Override - public void detachContainer(Resource clusterResource, - FiCaSchedulerApp application, RMContainer rmContainer) { - if (application != null) { - FiCaSchedulerNode node = - queueContext.getNode(rmContainer.getContainer().getNodeId()); - super.releaseResource(clusterResource, - rmContainer.getContainer().getResource(), - node.getPartition()); - LOG.info("movedContainer" + " queueMoveOut=" + getQueuePath() - + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() + - " cluster=" + clusterResource); - // Inform the parent - if (parent != null) { - parent.detachContainer(clusterResource, application, rmContainer); - } - } - } - - public int getNumApplications() { - return numApplications; - } - - void allocateResource(Resource clusterResource, - Resource resource, String nodePartition) { - writeLock.lock(); - try { - super.allocateResource(clusterResource, resource, nodePartition); - - /** - * check if we need to kill (killable) containers if maximum resource violated. - * Doing this because we will deduct killable resource when going from root. - * For example: - * <pre> - * Root - * / \ - * a b - * / \ - * a1 a2 - * </pre> - * - * a: max=10G, used=10G, killable=2G - * a1: used=8G, killable=2G - * a2: used=2G, pending=2G, killable=0G - * - * When we get queue-a to allocate resource, even if queue-a - * reaches its max resource, we deduct its used by killable, so we can allocate - * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G. - * - * If scheduler finds a 2G available resource in existing cluster, and assigns it - * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G - * - * When this happens, we have to preempt killable container (on same or different - * nodes) of parent queue to avoid violating parent's max resource. - */ - if (!usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition) - .equals(Resources.none())) { - if (Resources.lessThan(resourceCalculator, clusterResource, - usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition), - usageTracker.getQueueUsage().getUsed(nodePartition))) { - killContainersToEnforceMaxQueueCapacity(nodePartition, - clusterResource); - } - } else { - if (getQueueCapacities() - .getAbsoluteMaximumCapacity(nodePartition) < getQueueCapacities() - .getAbsoluteUsedCapacity(nodePartition)) { - killContainersToEnforceMaxQueueCapacity(nodePartition, - clusterResource); - } - } - } finally { - writeLock.unlock(); - } - } - - private void killContainersToEnforceMaxQueueCapacity(String partition, - Resource clusterResource) { - Iterator<RMContainer> killableContainerIter = getKillableContainers( - partition); - if (!killableContainerIter.hasNext()) { - return; - } - - Resource partitionResource = labelManager.getResourceByLabel(partition, - null); - Resource maxResource = getEffectiveMaxCapacity(partition); - - while (Resources.greaterThan(resourceCalculator, partitionResource, - usageTracker.getQueueUsage().getUsed(partition), maxResource)) { - RMContainer toKillContainer = killableContainerIter.next(); - FiCaSchedulerApp attempt = queueContext.getApplicationAttempt( - toKillContainer.getContainerId().getApplicationAttemptId()); - FiCaSchedulerNode node = queueContext.getNode( - toKillContainer.getAllocatedNode()); - if (null != attempt && null != node) { - AbstractLeafQueue lq = attempt.getCSLeafQueue(); - lq.completedContainer(clusterResource, attempt, node, toKillContainer, - SchedulerUtils.createPreemptedContainerStatus( - toKillContainer.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, - null, false); - LOG.info("Killed container=" + toKillContainer.getContainerId() - + " from queue=" + lq.getQueuePath() + " to make queue=" + this - .getQueuePath() + "'s max-capacity enforced"); - } - - if (!killableContainerIter.hasNext()) { - break; - } - } - } - - public void apply(Resource cluster, - ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) { - if (request.anythingAllocatedOrReserved()) { - ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> - allocation = request.getFirstAllocatedOrReservedContainer(); - SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> - schedulerContainer = allocation.getAllocatedOrReservedContainer(); - - // Do not modify queue when allocation from reserved container - if (allocation.getAllocateFromReservedContainer() == null) { - writeLock.lock(); - try { - // Book-keeping - // Note: Update headroom to account for current allocation too... - allocateResource(cluster, allocation.getAllocatedOrReservedResource(), - schedulerContainer.getNodePartition()); - - LOG.info("assignedContainer" + " queue=" + getQueuePath() - + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() - + " cluster=" + cluster); - } finally { - writeLock.unlock(); - } - } - } - - if (parent != null) { - parent.apply(cluster, request); - } - } - - @Override - public void stopQueue() { - this.writeLock.lock(); - try { - if (getNumApplications() > 0) { - updateQueueState(QueueState.DRAINING); - } else { - updateQueueState(QueueState.STOPPED); - } - if (getChildQueues() != null) { - for(CSQueue child : getChildQueues()) { - child.stopQueue(); - } - } - } finally { - this.writeLock.unlock(); - } - } - - public QueueOrderingPolicy getQueueOrderingPolicy() { - return queueOrderingPolicy; - } - - @Override - int getNumRunnableApps() { - readLock.lock(); - try { - return runnableApps; - } finally { - readLock.unlock(); - } - } - - void incrementRunnableApps() { - writeLock.lock(); - try { - runnableApps++; - } finally { - writeLock.unlock(); - } - } - - void decrementRunnableApps() { - writeLock.lock(); - try { - runnableApps--; - } finally { - writeLock.unlock(); - } - } - - Map<String, Float> getEffectiveMinRatio(String label) { - return effectiveMinResourceRatio.get(label); - } - - @Override - public boolean isEligibleForAutoDeletion() { - return isDynamicQueue() && getChildQueues().size() == 0 && - queueContext.getConfiguration(). - isAutoExpiredDeletionEnabled(this.getQueuePath()); - } - - public AutoCreatedQueueTemplate getAutoCreatedQueueTemplate() { - return autoCreatedQueueTemplate; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 5051c739d5c..903539fcf18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -47,6 +47,7 @@ public class PlanQueue extends AbstractManagedParentQueue { public PlanQueue(CapacitySchedulerQueueContext queueContext, String queueName, CSQueue parent, CSQueue old) throws IOException { super(queueContext, queueName, parent, old); + super.setupQueueConfigs(queueContext.getClusterResource()); updateAbsoluteCapacities(); // Set the reservation queue attributes for the Plan diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index 7461ef76aa2..165a2d4d04a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; @@ -31,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCrea import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueManagementPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueManagementChange; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -251,7 +251,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } @Override - public void init(final ParentQueue parentQueue) throws IOException { + public void init(final AbstractParentQueue parentQueue) throws IOException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -533,7 +533,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy * queues */ private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive( - ParentQueue parentQueue, String nodeLabel, + AbstractParentQueue parentQueue, String nodeLabel, LeafQueueEntitlements leafQueueEntitlements) throws SchedulerDynamicEditException { Map<String, QueueCapacities> deactivatedQueues = new HashMap<>(); @@ -658,7 +658,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } @Override - public void reinitialize(final ParentQueue parentQueue) throws IOException { + public void reinitialize(final AbstractParentQueue parentQueue) throws IOException { if (!(parentQueue instanceof ManagedParentQueue)) { throw new IllegalStateException( "Expected instance of type " + ManagedParentQueue.class + " found " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java index 926e1be6668..d56f2eb9592 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java @@ -17,8 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .QueueManagementChange; @@ -29,17 +28,17 @@ import java.util.List; */ public class QueueManagementChangeEvent extends SchedulerEvent { - private ParentQueue parentQueue; + private AbstractParentQueue parentQueue; private List<QueueManagementChange> queueManagementChanges; - public QueueManagementChangeEvent(ParentQueue parentQueue, + public QueueManagementChangeEvent(AbstractParentQueue parentQueue, List<QueueManagementChange> queueManagementChanges) { super(SchedulerEventType.MANAGE_QUEUE); this.parentQueue = parentQueue; this.queueManagementChanges = queueManagementChanges; } - public ParentQueue getParentQueue() { + public AbstractParentQueue getParentQueue() { return parentQueue; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java index b8d688e7cb7..9e3c15b0273 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java @@ -26,10 +26,10 @@ import javax.xml.bind.annotation.XmlType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper; import java.util.ArrayList; @@ -100,8 +100,8 @@ public class CapacitySchedulerInfo extends SchedulerInfo { queueAcls.addAll(getSortedQueueAclInfoList(parent, queueName, conf)); queuePriority = parent.getPriority().getPriority(); - if (parent instanceof ParentQueue) { - ParentQueue queue = (ParentQueue) parent; + if (parent instanceof AbstractParentQueue) { + AbstractParentQueue queue = (AbstractParentQueue) parent; orderingPolicyInfo = queue.getQueueOrderingPolicy() .getConfigName(); autoQueueTemplateProperties = CapacitySchedulerInfoHelper diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java index dc830f56ede..b79a71712e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java @@ -36,10 +36,10 @@ import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper; @@ -160,8 +160,8 @@ public class CapacitySchedulerQueueInfo { queueAcls.addAll(getSortedQueueAclInfoList(q, queuePath, conf)); queuePriority = q.getPriority().getPriority(); - if (q instanceof ParentQueue) { - ParentQueue queue = (ParentQueue) q; + if (q instanceof AbstractParentQueue) { + AbstractParentQueue queue = (AbstractParentQueue) q; orderingPolicyInfo = queue.getQueueOrderingPolicy() .getConfigName(); autoQueueTemplateProperties = CapacitySchedulerInfoHelper diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java index 0ba9bbb8418..a4f2cff257c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; @@ -84,7 +85,7 @@ public class CapacitySchedulerInfoHelper { public static String getQueueType(CSQueue queue) { if (queue instanceof AbstractLeafQueue) { return LEAF_QUEUE; - } else if (queue instanceof ParentQueue) { + } else if (queue instanceof AbstractParentQueue) { return PARENT_QUEUE; } return UNKNOWN_QUEUE; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index 47db9565af1..c32dc8124bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -517,12 +517,12 @@ public class TestCapacitySchedulerNewQueueAutoCreation empty = cs.getQueue("root.empty-auto-parent"); Assert.assertTrue("empty-auto-parent is not a ParentQueue", - empty instanceof ParentQueue); + empty instanceof AbstractParentQueue); Assert.assertEquals("empty-auto-parent has children", 0, empty.getChildQueues().size()); Assert.assertTrue("empty-auto-parent is not eligible " + "for auto queue creation", - ((ParentQueue)empty).isEligibleForAutoQueueCreation()); + ((AbstractParentQueue)empty).isEligibleForAutoQueueCreation()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java index fc1870097e9..302832f711d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java @@ -712,7 +712,7 @@ public class TestCapacitySchedulerQueues { + " to convert a leaf queue WITHOUT running apps"); } b1 = cs.getQueue(targetQueue); - Assert.assertTrue(b1 instanceof ParentQueue); + Assert.assertTrue(b1 instanceof AbstractParentQueue); Assert.assertEquals(QueueState.RUNNING, b1.getState()); Assert.assertTrue(!b1.getChildQueues().isEmpty()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 1af3563c527..52bafa792ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -137,8 +137,8 @@ public class TestChildQueueOrder { " alloc=" + allocation + " node=" + node.getNodeName()); } final Resource allocatedResource = Resources.createResource(allocation); - if (queue instanceof ParentQueue) { - ((ParentQueue)queue).allocateResource(clusterResource, + if (queue instanceof AbstractParentQueue) { + ((AbstractParentQueue)queue).allocateResource(clusterResource, allocatedResource, RMNodeLabelsManager.NO_LABEL); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 476abc638fb..6051523074a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -199,8 +199,8 @@ public class TestParentQueue { " alloc=" + allocation + " node=" + node.getNodeName()); } final Resource allocatedResource = Resources.createResource(allocation); - if (queue instanceof ParentQueue) { - ((ParentQueue)queue).allocateResource(clusterResource, + if (queue instanceof AbstractParentQueue) { + ((AbstractParentQueue)queue).allocateResource(clusterResource, allocatedResource, RMNodeLabelsManager.NO_LABEL); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org