Author: tucu Date: Thu Apr 18 18:13:36 2013 New Revision: 1469511 URL: http://svn.apache.org/r1469511 Log: YARN-482. FS: Extend SchedulingMode to intermediate queues. (kkambatl via tucu)
Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java - copied unchanged from r1469506, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ - copied from r1469506, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java - copied unchanged from r1469506, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java - copied unchanged from r1469506, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java - copied unchanged from r1469506, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java Removed: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Apr 18 18:13:36 2013 @@ -23,6 +23,9 @@ Release 2.0.5-beta - UNRELEASED NEW FEATURES + YARN-482. FS: Extend SchedulingMode to intermediate queues. + (kkambatl via tucu) + IMPROVEMENTS YARN-365. Change NM heartbeat handling to not generate a scheduler event Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Thu Apr 18 18:13:36 2013 @@ -278,9 +278,7 @@ public class AppSchedulable extends Sche } } - - @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { + private Resource assignContainer(FSSchedulerNode node, boolean reserved) { LOG.info("Node offered to app: " + getName() + " reserved: " + reserved); if (reserved) { @@ -345,4 +343,13 @@ public class AppSchedulable extends Sche } return Resources.none(); } + + public Resource assignReservedContainer(FSSchedulerNode node) { + return assignContainer(node, true); + } + + @Override + public Resource assignContainer(FSSchedulerNode node) { + return assignContainer(node, false); + } } \ No newline at end of file Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Thu Apr 18 18:13:36 2013 @@ -40,9 +40,6 @@ public class FSLeafQueue extends FSQueue private final List<AppSchedulable> appScheds = new ArrayList<AppSchedulable>(); - - /** Scheduling mode for jobs inside the queue (fair or FIFO) */ - private SchedulingMode schedulingMode; private final FairScheduler scheduler; private final QueueManager queueMgr; @@ -86,13 +83,18 @@ public class FSLeafQueue extends FSQueue return appScheds; } - public void setSchedulingMode(SchedulingMode mode) { - this.schedulingMode = mode; + @Override + public void setPolicy(SchedulingPolicy policy) + throws AllocationConfigurationException { + if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) { + throwPolicyDoesnotApplyException(policy); + } + super.policy = policy; } @Override - public void recomputeFairShares() { - schedulingMode.computeShares(getAppSchedulables(), getFairShare()); + public void recomputeShares() { + policy.computeShares(getAppSchedulables(), getFairShare()); } @Override @@ -136,42 +138,27 @@ public class FSLeafQueue extends FSQueue } @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { - LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved); - // If this queue is over its limit, reject - if (Resources.greaterThan(getResourceUsage(), - queueMgr.getMaxResources(getName()))) { - return Resources.none(); + public Resource assignContainer(FSSchedulerNode node) { + Resource assigned = Resources.none(); + if (LOG.isDebugEnabled()) { + LOG.debug("Node offered to queue: " + getName()); } - // If this node already has reserved resources for an app, first try to - // finish allocating resources for that app. - if (reserved) { - for (AppSchedulable sched : appScheds) { - if (sched.getApp().getApplicationAttemptId() == - node.getReservedContainer().getApplicationAttemptId()) { - return sched.assignContainer(node, reserved); - } - } - return Resources.none(); // We should never get here + if (!assignContainerPreCheck(node)) { + return assigned; } - // Otherwise, chose app to schedule based on given policy. - else { - Comparator<Schedulable> comparator = schedulingMode.getComparator(); - - Collections.sort(appScheds, comparator); - for (AppSchedulable sched: appScheds) { - if (sched.getRunnable()) { - Resource assignedResource = sched.assignContainer(node, reserved); - if (!assignedResource.equals(Resources.none())) { - return assignedResource; - } + Comparator<Schedulable> comparator = policy.getComparator(); + Collections.sort(appScheds, comparator); + for (AppSchedulable sched : appScheds) { + if (sched.getRunnable()) { + assigned = sched.assignContainer(node); + if (Resources.greaterThan(assigned, Resources.none())) { + break; } } - - return Resources.none(); } + return assigned; } @Override Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Thu Apr 18 18:13:36 2013 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -33,7 +34,6 @@ public class FSParentQueue extends FSQue private static final Log LOG = LogFactory.getLog( FSParentQueue.class.getName()); - private final List<FSQueue> childQueues = new ArrayList<FSQueue>(); private final QueueManager queueMgr; @@ -50,11 +50,11 @@ public class FSParentQueue extends FSQue } @Override - public void recomputeFairShares() { - SchedulingMode.getDefault().computeShares(childQueues, getFairShare()); + public void recomputeShares() { + policy.computeShares(childQueues, getFairShare()); for (FSQueue childQueue : childQueues) { childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare()); - childQueue.recomputeFairShares(); + childQueue.recomputeShares(); } } @@ -131,13 +131,41 @@ public class FSParentQueue extends FSQue } @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { - throw new IllegalStateException( - "Parent queue should not be assigned container"); + public Resource assignContainer(FSSchedulerNode node) { + Resource assigned = Resources.none(); + + // If this queue is over its limit, reject + if (Resources.greaterThan(getResourceUsage(), + queueMgr.getMaxResources(getName()))) { + return assigned; + } + + Collections.sort(childQueues, policy.getComparator()); + for (FSQueue child : childQueues) { + assigned = child.assignContainer(node); + if (node.getReservedContainer() != null + || Resources.greaterThan(assigned, Resources.none())) { + break; + } + } + return assigned; } @Override public Collection<FSQueue> getChildQueues() { return childQueues; } + + @Override + public void setPolicy(SchedulingPolicy policy) + throws AllocationConfigurationException { + boolean allowed = + SchedulingPolicy.isApplicableTo(policy, (this == queueMgr + .getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT + : SchedulingPolicy.DEPTH_INTERMEDIATE); + if (!allowed) { + throwPolicyDoesnotApplyException(policy); + } + super.policy = policy; + } } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Thu Apr 18 18:13:36 2013 @@ -45,6 +45,8 @@ public abstract class FSQueue extends Sc protected final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + protected SchedulingPolicy policy = SchedulingPolicy.getDefault(); + public FSQueue(String name, QueueManager queueMgr, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -63,6 +65,19 @@ public abstract class FSQueue extends Sc return name; } + public SchedulingPolicy getPolicy() { + return policy; + } + + protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy) + throws AllocationConfigurationException { + throw new AllocationConfigurationException("SchedulingPolicy " + policy + + " does not apply to queue " + getName()); + } + + public abstract void setPolicy(SchedulingPolicy policy) + throws AllocationConfigurationException; + @Override public double getWeight() { return queueMgr.getQueueWeight(getName()); @@ -130,13 +145,27 @@ public abstract class FSQueue extends Sc } /** - * Recomputes the fair shares for all queues and applications - * under this queue. + * Recomputes the shares for all child queues and applications based on this + * queue's current share */ - public abstract void recomputeFairShares(); + public abstract void recomputeShares(); /** * Gets the children of this queue, if any. */ public abstract Collection<FSQueue> getChildQueues(); + + /** + * Helper method to check if the queue should attempt assigning resources + * + * @return true if check passes (can assign) or false otherwise + */ + protected boolean assignContainerPreCheck(FSSchedulerNode node) { + if (Resources.greaterThan(getResourceUsage(), + queueMgr.getMaxResources(getName())) + || node.getReservedContainer() != null) { + return false; + } + return true; + } } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Thu Apr 18 18:13:36 2013 @@ -52,6 +52,7 @@ public class FSSchedulerNode extends Sch private volatile int numContainers; private RMContainer reservedContainer; + private AppSchedulable reservedAppSchedulable; /* set of containers that are allocated containers */ private final Map<ContainerId, RMContainer> launchedContainers = @@ -221,6 +222,7 @@ public class FSSchedulerNode extends Sch " on node " + this + " for application " + application); } this.reservedContainer = reservedContainer; + this.reservedAppSchedulable = application.getAppSchedulable(); } public synchronized void unreserveResource( @@ -237,11 +239,15 @@ public class FSSchedulerNode extends Sch " on node " + this); } - reservedContainer = null; + this.reservedContainer = null; + this.reservedAppSchedulable = null; } public synchronized RMContainer getReservedContainer() { return reservedContainer; } + public synchronized AppSchedulable getReservedAppSchedulable() { + return reservedAppSchedulable; + } } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Apr 18 18:13:36 2013 @@ -161,7 +161,7 @@ public class FairScheduler implements Re protected boolean assignMultiple; // Allocate multiple containers per // heartbeat protected int maxAssign; // Max containers to assign per heartbeat - + public FairScheduler() { clock = new SystemClock(); queueMgr = new QueueManager(this); @@ -217,7 +217,7 @@ public class FairScheduler implements Re rootQueue.setFairShare(clusterCapacity); // Recursively compute fair shares for all queues // and update metrics - rootQueue.recomputeFairShares(); + rootQueue.recomputeShares(); // Update recorded capacity of root queue (child queues are updated // when fair share is calculated). @@ -786,39 +786,24 @@ public class FairScheduler implements Re // 1. Check for reserved applications // 2. Schedule if there are no reservations - // If we have have an application that has reserved a resource on this node - // already, we try to complete the reservation. - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FSSchedulerApp reservedApplication = - applications.get(reservedContainer.getApplicationAttemptId()); + AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); + if (reservedAppSchedulable != null) { + // Reservation exists; try to fulfill the reservation + LOG.info("Trying to fulfill reservation for application " + + reservedAppSchedulable.getApp().getApplicationAttemptId() + + " on node: " + nm); - // Try to fulfill the reservation - LOG.info("Trying to fulfill reservation for application " + - reservedApplication.getApplicationId() + " on node: " + nm); - - FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName()); - queue.assignContainer(node, true); + node.getReservedAppSchedulable().assignReservedContainer(node); } - - // Otherwise, schedule at queue which is furthest below fair share else { + // No reservation, schedule at queue which is farthest below fair share int assignedContainers = 0; while (node.getReservedContainer() == null) { - // At most one task is scheduled each iteration of this loop - List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>( - queueMgr.getLeafQueues()); - Collections.sort(scheds, SchedulingMode.getDefault().getComparator()); boolean assignedContainer = false; - for (FSLeafQueue sched : scheds) { - Resource assigned = sched.assignContainer(node, false); - if (Resources.greaterThan(assigned, Resources.none()) || - node.getReservedContainer() != null) { - eventLog.log("ASSIGN", nm.getHostName(), assigned); - assignedContainers++; - assignedContainer = true; - break; - } + if (Resources.greaterThan( + queueMgr.getRootQueue().assignContainer(node), + Resources.none())) { + assignedContainer = true; } if (!assignedContainer) { break; } if (!assignMultiple) { break; } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu Apr 18 18:13:36 2013 @@ -143,7 +143,6 @@ public class QueueManager { if (leafQueue == null) { return null; } - leafQueue.setSchedulingMode(info.defaultSchedulingMode); queue = leafQueue; } else if (queue instanceof FSParentQueue) { return null; @@ -302,7 +301,7 @@ public class QueueManager { Map<String, Integer> queueMaxApps = new HashMap<String, Integer>(); Map<String, Integer> userMaxApps = new HashMap<String, Integer>(); Map<String, Double> queueWeights = new HashMap<String, Double>(); - Map<String, SchedulingMode> queueModes = new HashMap<String, SchedulingMode>(); + Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>(); Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>(); Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>(); @@ -310,7 +309,7 @@ public class QueueManager { int queueMaxAppsDefault = Integer.MAX_VALUE; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault(); + SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault(); // Remember all queue names so we can display them on web UI, etc. List<String> queueNamesInAllocFile = new ArrayList<String>(); @@ -339,7 +338,7 @@ public class QueueManager { if ("queue".equals(element.getTagName()) || "pool".equals(element.getTagName())) { loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts, + userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, queueAcls, queueNamesInAllocFile); } else if ("user".equals(element.getTagName())) { String userName = element.getAttribute("name"); @@ -370,11 +369,12 @@ public class QueueManager { } else if ("queueMaxAppsDefault".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); - queueMaxAppsDefault = val;} - else if ("defaultQueueSchedulingMode".equals(element.getTagName())) { + queueMaxAppsDefault = val; + } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) + || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); - SchedulingMode.setDefault(text); - defaultSchedulingMode = SchedulingMode.getDefault(); + SchedulingPolicy.setDefault(text); + defaultSchedPolicy = SchedulingPolicy.getDefault(); } else { LOG.warn("Bad element in allocations file: " + element.getTagName()); } @@ -385,7 +385,7 @@ public class QueueManager { synchronized (this) { info = new QueueManagerInfo(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts, + queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout); // Root queue should have empty ACLs. As a queue's ACL is the union of @@ -396,14 +396,15 @@ public class QueueManager { rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" ")); rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" ")); queueAcls.put(ROOT_QUEUE, rootAcls); - + + // Create all queus for (String name: queueNamesInAllocFile) { - FSLeafQueue queue = getLeafQueue(name); - if (queueModes.containsKey(name)) { - queue.setSchedulingMode(queueModes.get(name)); - } else { - queue.setSchedulingMode(defaultSchedulingMode); - } + getLeafQueue(name); + } + + // Set custom policies as specified + for (Map.Entry<String, SchedulingPolicy> entry : queuePolicies.entrySet()) { + queues.get(entry.getKey()).setPolicy(entry.getValue()); } } } @@ -414,7 +415,8 @@ public class QueueManager { private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources, Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, Map<String, Double> queueWeights, - Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts, + Map<String, SchedulingPolicy> queuePolicies, + Map<String, Long> minSharePreemptionTimeouts, Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) throws AllocationConfigurationException { String queueName = parentName + "." + element.getAttribute("name"); @@ -448,9 +450,10 @@ public class QueueManager { String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; minSharePreemptionTimeouts.put(queueName, val); - } else if ("schedulingMode".equals(field.getTagName())) { + } else if ("schedulingPolicy".equals(field.getTagName()) + || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - queueModes.put(queueName, SchedulingMode.parse(text)); + queuePolicies.put(queueName, SchedulingPolicy.parse(text)); } else if ("aclSubmitApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); @@ -459,8 +462,9 @@ public class QueueManager { acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { - loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts, + loadQueue(queueName, field, minQueueResources, maxQueueResources, + queueMaxApps, userMaxApps, queueWeights, queuePolicies, + minSharePreemptionTimeouts, queueAcls, queueNamesInAllocFile); isLeaf = false; } @@ -615,13 +619,13 @@ public class QueueManager { // below half its fair share for this long, it is allowed to preempt tasks. public final long fairSharePreemptionTimeout; - public final SchedulingMode defaultSchedulingMode; + public final SchedulingPolicy defaultSchedulingPolicy; public QueueManagerInfo(Map<String, Resource> minQueueResources, Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, Map<String, Double> queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode, + int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, Map<String, Long> minSharePreemptionTimeouts, Map<String, Map<QueueACL, AccessControlList>> queueAcls, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) { @@ -632,7 +636,7 @@ public class QueueManager { this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; - this.defaultSchedulingMode = defaultSchedulingMode; + this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; this.queueAcls = queueAcls; this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; @@ -651,7 +655,7 @@ public class QueueManager { minSharePreemptionTimeouts = new HashMap<String, Long>(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeout = Long.MAX_VALUE; - defaultSchedulingMode = SchedulingMode.getDefault(); + defaultSchedulingPolicy = SchedulingPolicy.getDefault(); } } } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Thu Apr 18 18:13:36 2013 @@ -93,11 +93,9 @@ public abstract class Schedulable { /** * Assign a container on this node if possible, and return the amount of - * resources assigned. If {@code reserved} is true, it means a reservation - * already exists on this node, and the schedulable should fulfill that - * reservation if possible. + * resources assigned. */ - public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved); + public abstract Resource assignContainer(FSSchedulerNode node); /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare) { Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Thu Apr 18 18:13:36 2013 @@ -68,7 +68,7 @@ public class FakeSchedulable extends Sch } @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { + public Resource assignContainer(FSSchedulerNode node) { return null; } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java Thu Apr 18 18:13:36 2013 @@ -24,7 +24,7 @@ import java.util.List; import junit.framework.Assert; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.junit.Before; import org.junit.Test; @@ -33,12 +33,12 @@ import org.junit.Test; */ public class TestComputeFairShares { private List<Schedulable> scheds; - private SchedulingMode schedulingMode; + private SchedulingPolicy schedulingMode; @Before public void setUp() throws Exception { scheds = new ArrayList<Schedulable>(); - schedulingMode = new FairSchedulingMode(); + schedulingMode = new FairSharePolicy(); } /** Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1469511&r1=1469510&r2=1469511&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Apr 18 18:13:36 2013 @@ -73,7 +73,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Before; @@ -284,7 +284,7 @@ public class TestFairScheduler { assertEquals(capacity / 4, queue2.getFairShare().getMemory()); assertEquals(capacity / 4, queue3.getFairShare().getMemory()); } - + @Test public void testHierarchicalQueuesSimilarParents() { QueueManager queueManager = scheduler.getQueueManager(); @@ -1359,7 +1359,7 @@ public class TestFairScheduler { FSSchedulerApp app2 = scheduler.applications.get(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1"); - queue1.setSchedulingMode(new FifoSchedulingMode()); + queue1.setPolicy(new FifoPolicy()); scheduler.update(); @@ -1381,7 +1381,80 @@ public class TestFairScheduler { assertEquals(2, app1.getLiveContainers().size()); assertEquals(1, app2.getLiveContainers().size()); } - + + /** + * Test to verify the behavior of + * {@link FSQueue#assignContainer(FSSchedulerNode)}) + * + * Create two queues under root (fifoQueue and fairParent), and two queues + * under fairParent (fairChild1 and fairChild2). Submit two apps to the + * fifoQueue and one each to the fairChild* queues, all apps requiring 4 + * containers each of the total 16 container capacity + * + * Assert the number of containers for each app after 4, 8, 12 and 16 updates. + * + * @throws Exception + */ + @Test(timeout = 5000) + public void testAssignContainer() throws Exception { + final String user = "user1"; + final String fifoQueue = "fifo"; + final String fairParent = "fairParent"; + final String fairChild1 = fairParent + ".fairChild1"; + final String fairChild2 = fairParent + ".fairChild2"; + + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8192)); + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(8192)); + + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + + scheduler.handle(nodeEvent1); + scheduler.handle(nodeEvent2); + + ApplicationAttemptId attId1 = + createSchedulingRequest(1024, fifoQueue, user, 4); + ApplicationAttemptId attId2 = + createSchedulingRequest(1024, fairChild1, user, 4); + ApplicationAttemptId attId3 = + createSchedulingRequest(1024, fairChild2, user, 4); + ApplicationAttemptId attId4 = + createSchedulingRequest(1024, fifoQueue, user, 4); + + FSSchedulerApp app1 = scheduler.applications.get(attId1); + FSSchedulerApp app2 = scheduler.applications.get(attId2); + FSSchedulerApp app3 = scheduler.applications.get(attId3); + FSSchedulerApp app4 = scheduler.applications.get(attId4); + + scheduler.getQueueManager().getLeafQueue(fifoQueue) + .setPolicy(SchedulingPolicy.parse("fifo")); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + + for (int i = 0; i < 8; i++) { + scheduler.handle(updateEvent1); + scheduler.handle(updateEvent2); + if ((i + 1) % 2 == 0) { + // 4 node updates: fifoQueue should have received 2, and fairChild* + // should have received one each + String ERR = + "Wrong number of assigned containers after " + (i + 1) + " updates"; + if (i < 4) { + // app1 req still not met + assertEquals(ERR, (i + 1), app1.getLiveContainers().size()); + assertEquals(ERR, 0, app4.getLiveContainers().size()); + } else { + // app1 req has been met, app4 should be served now + assertEquals(ERR, 4, app1.getLiveContainers().size()); + assertEquals(ERR, (i - 3), app4.getLiveContainers().size()); + } + assertEquals(ERR, (i + 1) / 2, app2.getLiveContainers().size()); + assertEquals(ERR, (i + 1) / 2, app3.getLiveContainers().size()); + } + } + } @SuppressWarnings("unchecked") @Test