YARN-7394. Merge code paths for Reservation/Plan queues and Auto Created queues. (Suma Shivaprasad via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13fa2d4e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13fa2d4e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13fa2d4e Branch: refs/heads/YARN-6592 Commit: 13fa2d4e3e55a849dcd7e472750f3e0422cc2ac9 Parents: 8f214dc Author: Wangda Tan <wan...@apache.org> Authored: Mon Nov 6 21:38:24 2017 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Mon Nov 6 21:38:24 2017 -0800 ---------------------------------------------------------------------- .../CapacitySchedulerPlanFollower.java | 10 +- .../capacity/AbstractManagedParentQueue.java | 232 +++++++++++++++++++ .../capacity/AutoCreatedLeafQueue.java | 129 +++++++++++ .../scheduler/capacity/CapacityScheduler.java | 55 +++-- .../capacity/CapacitySchedulerQueueManager.java | 4 +- .../scheduler/capacity/LeafQueue.java | 4 + .../scheduler/capacity/ParentQueue.java | 13 ++ .../scheduler/capacity/PlanQueue.java | 191 ++------------- .../scheduler/capacity/ReservationQueue.java | 122 ---------- .../capacity/TestAutoCreatedLeafQueue.java | 113 +++++++++ .../TestCapacitySchedulerDynamicBehavior.java | 32 +-- .../capacity/TestReservationQueue.java | 110 --------- 12 files changed, 569 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 551f075..2e16689 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -28,10 +28,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; @@ -92,8 +92,8 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower String planQueueName, Queue queue, String currResId) { PlanQueue planQueue = (PlanQueue)queue; try { - ReservationQueue resQueue = - new ReservationQueue(cs, currResId, planQueue); + AutoCreatedLeafQueue resQueue = + new AutoCreatedLeafQueue(cs, currResId, planQueue); cs.addQueue(resQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( @@ -112,8 +112,8 @@ public class 
CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower PlanQueue planQueue = (PlanQueue)queue; if (cs.getQueue(defReservationId) == null) { try { - ReservationQueue defQueue = - new ReservationQueue(cs, defReservationId, planQueue); + AutoCreatedLeafQueue defQueue = + new AutoCreatedLeafQueue(cs, defReservationId, planQueue); cs.addQueue(defQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java new file mode 100644 index 0000000..b3d1b47 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; + +/** + * A container class for automatically created child leaf queues. + * From the user perspective this is equivalent to a LeafQueue, + * but functionality wise is a sub-class of ParentQueue + */ +public abstract class AbstractManagedParentQueue extends ParentQueue { + + private static final Logger LOG = LoggerFactory.getLogger( + AbstractManagedParentQueue.class); + + private int maxAppsForAutoCreatedQueues; + private int maxAppsPerUserForAutoCreatedQueues; + private int userLimit; + private float userLimitFactor; + + public AbstractManagedParentQueue(CapacitySchedulerContext cs, + String queueName, CSQueue parent, CSQueue old) throws IOException { + super(cs, queueName, parent, old); + + super.setupQueueConfigs(csContext.getClusterResource()); + initializeLeafQueueConfigs(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Managed Parent Queue: ").append(queueName) + .append("\nof type : [" + getClass()) + .append("]\nwith capacity: [") + .append(super.getCapacity()).append("]\nwith max capacity: [") + .append(super.getMaximumCapacity()).append("\nwith max apps: [") + .append(getMaxApplicationsForAutoCreatedQueues()) + .append("]\nwith max apps per user: [") + 
.append(getMaxApplicationsPerUserForAutoCreatedQueues()) + .append("]\nwith user limit: [").append(getUserLimit()) + .append("]\nwith user limit factor: [") + .append(getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + try { + writeLock.lock(); + + // Set new configs + setupQueueConfigs(clusterResource); + + initializeLeafQueueConfigs(); + + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue res : this.getChildQueues()) { + res.reinitialize(res, clusterResource); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Initialize leaf queue configs from template configurations specified on + * parent queue. + */ + protected void initializeLeafQueueConfigs() { + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + + final String queuePath = super.getQueuePath(); + int maxApps = conf.getMaximumApplicationsPerQueue(queuePath); + if (maxApps < 0) { + maxApps = (int) ( + CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS + * getAbsoluteCapacity()); + } + userLimit = conf.getUserLimit(queuePath); + userLimitFactor = conf.getUserLimitFactor(queuePath); + maxAppsForAutoCreatedQueues = maxApps; + maxAppsPerUserForAutoCreatedQueues = + (int) (maxApps * (userLimit / 100.0f) * userLimitFactor); + + } + + /** + * Number of maximum applications for each of the auto created leaf queues. + * + * @return maxAppsForAutoCreatedQueues + */ + public int getMaxApplicationsForAutoCreatedQueues() { + return maxAppsForAutoCreatedQueues; + } + + /** + * Number of maximum applications per user for each of the auto created + * leaf queues. 
+ * + * @return maxAppsPerUserForAutoCreatedQueues + */ + public int getMaxApplicationsPerUserForAutoCreatedQueues() { + return maxAppsPerUserForAutoCreatedQueues; + } + + /** + * User limit value for each of the auto created leaf queues. + * + * @return userLimit + */ + public int getUserLimitForAutoCreatedQueues() { + return userLimit; + } + + /** + * User limit factor value for each of the auto created leaf queues. + * + * @return userLimitFactor + */ + public float getUserLimitFactor() { + return userLimitFactor; + } + + public int getMaxAppsForAutoCreatedQueues() { + return maxAppsForAutoCreatedQueues; + } + + public int getMaxAppsPerUserForAutoCreatedQueues() { + return maxAppsPerUserForAutoCreatedQueues; + } + + public int getUserLimit() { + return userLimit; + } + + /** + * Add the specified child queue. + * @param childQueue reference to the child queue to be added + * @throws SchedulerDynamicEditException + */ + public void addChildQueue(CSQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (childQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + childQueue + " being added has non zero capacity."); + } + boolean added = this.childQueues.add(childQueue); + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: add queue): " + added + " " + + getChildQueuesToPrint()); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Remove the specified child queue. 
+ * @param childQueue reference to the child queue to be removed + * @throws SchedulerDynamicEditException + */ + public void removeChildQueue(CSQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (childQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + childQueue + " being removed has non zero capacity."); + } + Iterator<CSQueue> qiter = childQueues.iterator(); + while (qiter.hasNext()) { + CSQueue cs = qiter.next(); + if (cs.equals(childQueue)) { + qiter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed child queue: {}" + cs.getQueueName()); + } + } + } + } finally { + writeLock.unlock(); + } + } + + /** + * Remove the specified child queue. + * @param childQueueName name of the child queue to be removed + * @throws SchedulerDynamicEditException + */ + public CSQueue removeChildQueue(String childQueueName) + throws SchedulerDynamicEditException { + CSQueue childQueue; + try { + writeLock.lock(); + childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue( + childQueueName); + if (childQueue != null) { + removeChildQueue(childQueue); + } else { + throw new SchedulerDynamicEditException("Cannot find queue to delete " + + ": " + childQueueName); + } + } finally { + writeLock.unlock(); + } + return childQueue; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java new file mode 100644 index 0000000..4eb7cdd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Leaf queues which are auto created by an underlying implementation of + * AbstractManagedParentQueue. 
Eg: PlanQueue for reservations or + * ManagedParentQueue for auto created dynamic queues + */ +public class AutoCreatedLeafQueue extends LeafQueue { + + private static final Logger LOG = LoggerFactory + .getLogger(AutoCreatedLeafQueue.class); + + private AbstractManagedParentQueue parent; + + public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, + AbstractManagedParentQueue parent) throws IOException { + super(cs, queueName, parent, null); + + updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForAutoCreatedQueues(), + parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + + this.parent = parent; + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + try { + writeLock.lock(); + + validate(newlyParsedQueue); + + super.reinitialize(newlyParsedQueue, clusterResource); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + this, labelManager, null); + + updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForAutoCreatedQueues(), + parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + + } finally { + writeLock.unlock(); + } + } + + /** + * This method changes the capacity of a queue and adjusts its + * absoluteCapacity. 
+ * + * @param entitlement the new entitlement for the queue (capacity, + * maxCapacity) + * @throws SchedulerDynamicEditException + */ + public void setEntitlement(QueueEntitlement entitlement) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + float capacity = entitlement.getCapacity(); + if (capacity < 0 || capacity > 1.0f) { + throw new SchedulerDynamicEditException( + "Capacity demand is not in the [0,1] range: " + capacity); + } + setCapacity(capacity); + setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); + setMaxCapacity(entitlement.getMaxCapacity()); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully changed to " + capacity + " for queue " + this + .getQueueName()); + } + } finally { + writeLock.unlock(); + } + } + + private void validate(final CSQueue newlyParsedQueue) throws IOException { + if (!(newlyParsedQueue instanceof AutoCreatedLeafQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Error trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + + } + + @Override + protected void setupConfigurableCapacities() { + CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), + queueCapacities, parent == null ? 
null : parent.getQueueCapacities()); + } + + private void updateApplicationAndUserLimits(int userLimit, + float userLimitFactor, + int maxAppsForAutoCreatedQueues, + int maxAppsPerUserForAutoCreatedQueues) { + setUserLimit(userLimit); + setUserLimitFactor(userLimitFactor); + setMaxApplications(maxAppsForAutoCreatedQueues); + setMaxApplicationsPerUser(maxAppsPerUserForAutoCreatedQueues); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ca289b1..db69042 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1921,12 +1921,12 @@ public class CapacityScheduler extends writeLock.lock(); LOG.info("Removing queue: " + queueName); CSQueue q = this.getQueue(queueName); - if (!(q instanceof ReservationQueue)) { + if (!(q instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( "The queue that we are asked " + "to remove (" + queueName - + ") is not a ReservationQueue"); + + ") is not a AutoCreatedLeafQueue"); } - ReservationQueue disposableLeafQueue = 
(ReservationQueue) q; + AutoCreatedLeafQueue disposableLeafQueue = (AutoCreatedLeafQueue) q; // at this point we should have no more apps if (disposableLeafQueue.getNumApplications() > 0) { throw new SchedulerDynamicEditException( @@ -1936,9 +1936,11 @@ public class CapacityScheduler extends + " pending apps"); } - ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); + ((AbstractManagedParentQueue) disposableLeafQueue.getParent()) + .removeChildQueue(q); this.queueManager.removeQueue(queueName); - LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); + LOG.info("Removal of AutoCreatedLeafQueue " + + queueName + " has succeeded"); } finally { writeLock.unlock(); } @@ -1949,25 +1951,28 @@ public class CapacityScheduler extends throws SchedulerDynamicEditException { try { writeLock.lock(); - if (!(queue instanceof ReservationQueue)) { + if (!(queue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( - "Queue " + queue.getQueueName() + " is not a ReservationQueue"); + "Queue " + queue.getQueueName() + " is not a AutoCreatedLeafQueue"); } - ReservationQueue newQueue = (ReservationQueue) queue; + AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; - if (newQueue.getParent() == null || !(newQueue - .getParent() instanceof PlanQueue)) { + if (newQueue.getParent() == null + || !(AbstractManagedParentQueue.class. 
+ isAssignableFrom(newQueue.getParent().getClass()))) { throw new SchedulerDynamicEditException( "ParentQueue for " + newQueue.getQueueName() - + " is not properly set (should be set and be a PlanQueue)"); + + " is not properly set" + + " (should be set and be a PlanQueue or ManagedParentQueue)"); } - PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); + AbstractManagedParentQueue parentPlan = + (AbstractManagedParentQueue) newQueue.getParent(); String queuename = newQueue.getQueueName(); parentPlan.addChildQueue(newQueue); this.queueManager.addQueue(queuename, newQueue); - LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); + LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded"); } finally { writeLock.unlock(); } @@ -1981,21 +1986,22 @@ public class CapacityScheduler extends LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); ParentQueue parent = (ParentQueue) queue.getParent(); - if (!(queue instanceof ReservationQueue)) { + if (!(queue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( "Entitlement can not be" + " modified dynamically since queue " - + inQueue + " is not a ReservationQueue"); + + inQueue + " is not a AutoCreatedLeafQueue"); } - if (!(parent instanceof PlanQueue)) { + if (parent == null + || !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) { throw new SchedulerDynamicEditException( - "The parent of ReservationQueue " + inQueue - + " must be an PlanQueue"); + "The parent of AutoCreatedLeafQueue " + inQueue + + " must be a PlanQueue/ManagedParentQueue"); } - ReservationQueue newQueue = (ReservationQueue) queue; + AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; - float sumChilds = ((PlanQueue) parent).sumOfChildCapacities(); + float sumChilds = parent.sumOfChildCapacities(); float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity(); @@ -2010,12 +2016,13 @@ public class CapacityScheduler extends 
newQueue.setEntitlement(entitlement); } else{ throw new SchedulerDynamicEditException( - "Sum of child queues would exceed 100% for PlanQueue: " + parent - .getQueueName()); + "Sum of child queues should exceed 100% for auto creating parent " + + "queue : " + parent.getQueueName()); } LOG.info( - "Set entitlement for ReservationQueue " + inQueue + " to " + queue - .getCapacity() + " request was (" + entitlement.getCapacity() + "Set entitlement for AutoCreatedLeafQueue " + inQueue + + " to " + queue.getCapacity() + + " request was (" + entitlement.getCapacity() + ")"); } finally { writeLock.unlock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 48c289f..7be2529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -238,7 +238,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; List<CSQueue> childQueues = new ArrayList<>(); - 
ReservationQueue resQueue = new ReservationQueue(csContext, + AutoCreatedLeafQueue resQueue = new AutoCreatedLeafQueue(csContext, defReservationId, (PlanQueue) queue); try { resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f)); @@ -303,7 +303,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< Map<String, CSQueue> newQueues) throws IOException { // check that all static queues are included in the newQueues list for (Map.Entry<String, CSQueue> e : queues.entrySet()) { - if (!(e.getValue() instanceof ReservationQueue)) { + if (!(e.getValue() instanceof AutoCreatedLeafQueue)) { String queueName = e.getKey(); CSQueue oldQueue = e.getValue(); CSQueue newQueue = newQueues.get(queueName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f24e30a..f2f1baf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1997,6 +1997,10 @@ public class LeafQueue extends AbstractCSQueue { queueCapacities.setAbsoluteCapacity(absoluteCapacity); } + public void setMaxApplicationsPerUser(int 
maxApplicationsPerUser) { + this.maxApplicationsPerUser = maxApplicationsPerUser; + } + public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6800b74..2c288f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -1080,4 +1080,17 @@ public class ParentQueue extends AbstractCSQueue { public QueueOrderingPolicy getQueueOrderingPolicy() { return queueOrderingPolicy; } + + protected float sumOfChildCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } finally { + writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 882262f..4ab2e9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -19,11 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.Iterator; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,189 +31,48 @@ import org.slf4j.LoggerFactory; * reservations, but functionality wise is a sub-class of ParentQueue * */ -public class PlanQueue extends ParentQueue { +public class PlanQueue extends AbstractManagedParentQueue { private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class); - private int maxAppsForReservation; - private int maxAppsPerUserForReservation; - private int userLimit; - private float userLimitFactor; - protected CapacitySchedulerContext schedulerContext; private boolean showReservationsAsQueues; public PlanQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); - - this.schedulerContext = cs; - // Set the reservation queue attributes for the Plan - 
CapacitySchedulerConfiguration conf = cs.getConfiguration(); - String queuePath = super.getQueuePath(); - int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath); - showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath); - if (maxAppsForReservation < 0) { - maxAppsForReservation = - (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super - .getAbsoluteCapacity()); - } - int userLimit = conf.getUserLimit(queuePath); - float userLimitFactor = conf.getUserLimitFactor(queuePath); - int maxAppsPerUserForReservation = - (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); - updateQuotas(userLimit, userLimitFactor, maxAppsForReservation, - maxAppsPerUserForReservation); - - StringBuffer queueInfo = new StringBuffer(); - queueInfo.append("Created Plan Queue: ").append(queueName) - .append("\nwith capacity: [").append(super.getCapacity()) - .append("]\nwith max capacity: [").append(super.getMaximumCapacity()) - .append("\nwith max reservation apps: [").append(maxAppsForReservation) - .append("]\nwith max reservation apps per user: [") - .append(maxAppsPerUserForReservation).append("]\nwith user limit: [") - .append(userLimit).append("]\nwith user limit factor: [") - .append(userLimitFactor).append("]."); - LOG.info(queueInfo.toString()); } @Override - public void reinitialize(CSQueue newlyParsedQueue, - Resource clusterResource) throws IOException { - try { - writeLock.lock(); - // Sanity check - if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue - .getQueuePath().equals(getQueuePath())) { - throw new IOException( - "Trying to reinitialize " + getQueuePath() + " from " - + newlyParsedQueue.getQueuePath()); - } - - PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; - - if (newlyParsedParentQueue.getChildQueues().size() != 1) { - throw new IOException( - "Reservable Queue should not have sub-queues in the" - + "configuration expect the default reservation queue"); - } 
- - // Set new configs - setupQueueConfigs(clusterResource); - - updateQuotas(newlyParsedParentQueue.userLimit, - newlyParsedParentQueue.userLimitFactor, - newlyParsedParentQueue.maxAppsForReservation, - newlyParsedParentQueue.maxAppsPerUserForReservation); - - // run reinitialize on each existing queue, to trigger absolute cap - // recomputations - for (CSQueue res : this.getChildQueues()) { - res.reinitialize(res, clusterResource); - } - showReservationsAsQueues = - newlyParsedParentQueue.showReservationsAsQueues; - } finally { - writeLock.unlock(); - } + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + validate(newlyParsedQueue); + super.reinitialize(newlyParsedQueue, clusterResource); } - void addChildQueue(CSQueue newQueue) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - if (newQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException( - "Queue " + newQueue + " being added has non zero capacity."); - } - boolean added = this.childQueues.add(newQueue); - if (LOG.isDebugEnabled()) { - LOG.debug("updateChildQueues (action: add queue): " + added + " " - + getChildQueuesToPrint()); - } - } finally { - writeLock.unlock(); - } + @Override + protected void initializeLeafQueueConfigs() { + String queuePath = super.getQueuePath(); + showReservationsAsQueues = csContext.getConfiguration() + .getShowReservationAsQueues(queuePath); + super.initializeLeafQueueConfigs(); } - void removeChildQueue(CSQueue remQueue) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - if (remQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException( - "Queue " + remQueue + " being removed has non zero capacity."); - } - Iterator<CSQueue> qiter = childQueues.iterator(); - while (qiter.hasNext()) { - CSQueue cs = qiter.next(); - if (cs.equals(remQueue)) { - qiter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed child queue: {}", cs.getQueueName()); - } - } - } - } 
finally { - writeLock.unlock(); + private void validate(final CSQueue newlyParsedQueue) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); } - } - protected float sumOfChildCapacities() { - try { - writeLock.lock(); - float ret = 0; - for (CSQueue l : childQueues) { - ret += l.getCapacity(); - } - return ret; - } finally { - writeLock.unlock(); - } - } + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; - private void updateQuotas(int userLimit, float userLimitFactor, - int maxAppsForReservation, int maxAppsPerUserForReservation) { - this.userLimit = userLimit; - this.userLimitFactor = userLimitFactor; - this.maxAppsForReservation = maxAppsForReservation; - this.maxAppsPerUserForReservation = maxAppsPerUserForReservation; - } - - /** - * Number of maximum applications for each of the reservations in this Plan. - * - * @return maxAppsForreservation - */ - public int getMaxApplicationsForReservations() { - return maxAppsForReservation; - } - - /** - * Number of maximum applications per user for each of the reservations in - * this Plan. - * - * @return maxAppsPerUserForreservation - */ - public int getMaxApplicationsPerUserForReservation() { - return maxAppsPerUserForReservation; - } - - /** - * User limit value for each of the reservations in this Plan. - * - * @return userLimit - */ - public int getUserLimitForReservation() { - return userLimit; - } - - /** - * User limit factor value for each of the reservations in this Plan. 
- * - * @return userLimitFactor - */ - public float getUserLimitFactor() { - return userLimitFactor; + if (newlyParsedParentQueue.getChildQueues().size() != 1) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration expect the default reservation queue"); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java deleted file mode 100644 index 3d1b731..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import java.io.IOException; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This represents a dynamic {@link LeafQueue} managed by the - * {@link ReservationSystem} - * - */ -public class ReservationQueue extends LeafQueue { - - private static final Logger LOG = LoggerFactory - .getLogger(ReservationQueue.class); - - private PlanQueue parent; - - public ReservationQueue(CapacitySchedulerContext cs, String queueName, - PlanQueue parent) throws IOException { - super(cs, queueName, parent, null); - // the following parameters are common to all reservation in the plan - updateQuotas(parent.getUserLimitForReservation(), - parent.getUserLimitFactor(), - parent.getMaxApplicationsForReservations(), - parent.getMaxApplicationsPerUserForReservation()); - this.parent = parent; - } - - @Override - public void reinitialize(CSQueue newlyParsedQueue, - Resource clusterResource) throws IOException { - try { - writeLock.lock(); - // Sanity check - if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue - .getQueuePath().equals(getQueuePath())) { - throw new IOException( - "Trying to reinitialize " + getQueuePath() + " from " - + newlyParsedQueue.getQueuePath()); - } - super.reinitialize(newlyParsedQueue, clusterResource); - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - this, labelManager, null); - - updateQuotas(parent.getUserLimitForReservation(), - parent.getUserLimitFactor(), - 
parent.getMaxApplicationsForReservations(), - parent.getMaxApplicationsPerUserForReservation()); - } finally { - writeLock.unlock(); - } - } - - /** - * This methods to change capacity for a queue and adjusts its - * absoluteCapacity - * - * @param entitlement the new entitlement for the queue (capacity, - * maxCapacity, etc..) - * @throws SchedulerDynamicEditException - */ - public void setEntitlement(QueueEntitlement entitlement) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - float capacity = entitlement.getCapacity(); - if (capacity < 0 || capacity > 1.0f) { - throw new SchedulerDynamicEditException( - "Capacity demand is not in the [0,1] range: " + capacity); - } - setCapacity(capacity); - setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); - // note: we currently set maxCapacity to capacity - // this might be revised later - setMaxCapacity(entitlement.getMaxCapacity()); - if (LOG.isDebugEnabled()) { - LOG.debug("successfully changed to " + capacity + " for queue " + this - .getQueueName()); - } - } finally { - writeLock.unlock(); - } - } - - private void updateQuotas(int userLimit, float userLimitFactor, - int maxAppsForReservation, int maxAppsPerUserForReservation) { - setUserLimit(userLimit); - setUserLimitFactor(userLimitFactor); - setMaxApplications(maxAppsForReservation); - maxApplicationsPerUser = maxAppsPerUserForReservation; - } - - @Override - protected void setupConfigurableCapacities() { - CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), - queueCapacities, parent == null ? 
null : parent.getQueueCapacities()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java new file mode 100644 index 0000000..b403e72 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for dynamic auto created leaf queues. + * @see AutoCreatedLeafQueue + */ +public class TestAutoCreatedLeafQueue { + + private CapacitySchedulerConfiguration csConf; + private CapacitySchedulerContext csContext; + final static int DEF_MAX_APPS = 10000; + final static int GB = 1024; + private final ResourceCalculator resourceCalculator = + new DefaultResourceCalculator(); + private AutoCreatedLeafQueue autoCreatedLeafQueue; + + @Before + public void setup() throws IOException { + // setup a context / conf + csConf = new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(); + csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()).thenReturn( + Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()).thenReturn( + Resources.createResource(16 * GB, 32)); + when(csContext.getClusterResource()).thenReturn( + Resources.createResource(100 * 16 * GB, 100 * 32)); + 
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + RMContext mockRMContext = TestUtils.getMockRMContext(); + when(csContext.getRMContext()).thenReturn(mockRMContext); + + // create a queue + PlanQueue pq = new PlanQueue(csContext, "root", null, null); + autoCreatedLeafQueue = new AutoCreatedLeafQueue(csContext, "a", pq); + } + + private void validateAutoCreatedLeafQueue(double capacity) { + assertTrue(" actual capacity: " + autoCreatedLeafQueue.getCapacity(), + autoCreatedLeafQueue.getCapacity() - capacity < CSQueueUtils.EPSILON); + assertEquals(autoCreatedLeafQueue.maxApplications, DEF_MAX_APPS); + assertEquals(autoCreatedLeafQueue.maxApplicationsPerUser, DEF_MAX_APPS); + } + + @Test + public void testAddSubtractCapacity() throws Exception { + + // verify that setting, adding, subtracting capacity works + autoCreatedLeafQueue.setCapacity(1.0F); + validateAutoCreatedLeafQueue(1); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); + validateAutoCreatedLeafQueue(0.9); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1f, 1f)); + validateAutoCreatedLeafQueue(1); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0f, 1f)); + validateAutoCreatedLeafQueue(0); + + try { + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1.1f, 1f)); + fail(); + } catch (SchedulerDynamicEditException iae) { + // expected + validateAutoCreatedLeafQueue(1); + } + + try { + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f)); + fail(); + } catch (SchedulerDynamicEditException iae) { + // expected + validateAutoCreatedLeafQueue(1); + } + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java index 9aba30c..9425d5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java @@ -77,21 +77,21 @@ public class TestCapacitySchedulerDynamicBehavior { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); //set default queue capacity to zero - ((ReservationQueue) cs + ((AutoCreatedLeafQueue) cs .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .setEntitlement( new QueueEntitlement(0f, 1f)); // Test add one reservation dynamically and manually modify capacity - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 = + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); // Test add another reservation queue and use setEntitlement to modify // capacity - ReservationQueue a2 = - new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a2 = + new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); cs.addQueue(a2); cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); @@ -113,8 +113,8 @@ public class TestCapacitySchedulerDynamicBehavior { try { // Test invalid 
addition (adding non-zero size queue) - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 = + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); cs.addQueue(a1); fail(); @@ -123,11 +123,11 @@ public class TestCapacitySchedulerDynamicBehavior { } // Test add one reservation dynamically and manually modify capacity - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 = + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); //set default queue capacity to zero - ((ReservationQueue) cs + ((AutoCreatedLeafQueue) cs .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .setEntitlement( new QueueEntitlement(0f, 1f)); @@ -135,8 +135,8 @@ public class TestCapacitySchedulerDynamicBehavior { // Test add another reservation queue and use setEntitlement to modify // capacity - ReservationQueue a2 = - new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a2 = + new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); cs.addQueue(a2); @@ -162,8 +162,8 @@ public class TestCapacitySchedulerDynamicBehavior { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); // Test add one reservation dynamically and manually modify capacity - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 = + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); @@ -230,8 +230,8 @@ public class TestCapacitySchedulerDynamicBehavior { // create the default reservation queue String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX; - ReservationQueue defQ = - new ReservationQueue(scheduler, defQName, + AutoCreatedLeafQueue defQ = + new 
AutoCreatedLeafQueue(scheduler, defQName, (PlanQueue) scheduler.getQueue("a")); scheduler.addQueue(defQ); defQ.setEntitlement(new QueueEntitlement(1f, 1f)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13fa2d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java deleted file mode 100644 index e23e93c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; - -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Before; -import org.junit.Test; - -public class TestReservationQueue { - - CapacitySchedulerConfiguration csConf; - CapacitySchedulerContext csContext; - final static int DEF_MAX_APPS = 10000; - final static int GB = 1024; - private final ResourceCalculator resourceCalculator = - new DefaultResourceCalculator(); - ReservationQueue reservationQueue; - - @Before - public void setup() throws IOException { - // setup a context / conf - csConf = new CapacitySchedulerConfiguration(); - YarnConfiguration conf = new YarnConfiguration(); - csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getMinimumResourceCapability()).thenReturn( - Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()).thenReturn( - Resources.createResource(16 * GB, 32)); - when(csContext.getClusterResource()).thenReturn( - Resources.createResource(100 * 16 * GB, 100 * 32)); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - - RMContext mockRMContext = TestUtils.getMockRMContext(); - 
when(csContext.getRMContext()).thenReturn(mockRMContext); - - // create a queue - PlanQueue pq = new PlanQueue(csContext, "root", null, null); - reservationQueue = new ReservationQueue(csContext, "a", pq); - } - - private void validateReservationQueue(double capacity) { - assertTrue(" actual capacity: " + reservationQueue.getCapacity(), - reservationQueue.getCapacity() - capacity < CSQueueUtils.EPSILON); - assertEquals(reservationQueue.maxApplications, DEF_MAX_APPS); - assertEquals(reservationQueue.maxApplicationsPerUser, DEF_MAX_APPS); - } - - @Test - public void testAddSubtractCapacity() throws Exception { - - // verify that setting, adding, subtracting capacity works - reservationQueue.setCapacity(1.0F); - validateReservationQueue(1); - reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); - validateReservationQueue(0.9); - reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f)); - validateReservationQueue(1); - reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f)); - validateReservationQueue(0); - - try { - reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f)); - fail(); - } catch (SchedulerDynamicEditException iae) { - // expected - validateReservationQueue(1); - } - - try { - reservationQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f)); - fail(); - } catch (SchedulerDynamicEditException iae) { - // expected - validateReservationQueue(1); - } - - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org