YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue. (Wei Yan via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1dcaba9a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1dcaba9a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1dcaba9a Branch: refs/heads/MR-2841 Commit: 1dcaba9a7aa27f7ca4ba693e3abb56ab3c59c8a7 Parents: ce04621 Author: Karthik Kambatla <ka...@apache.org> Authored: Wed Sep 3 10:27:36 2014 -0700 Committer: Karthik Kambatla <ka...@apache.org> Committed: Wed Sep 3 10:27:36 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/AllocationConfiguration.java | 22 ++- .../fair/AllocationFileLoaderService.java | 48 ++++- .../scheduler/fair/FSLeafQueue.java | 54 ++++- .../scheduler/fair/FSParentQueue.java | 6 +- .../resourcemanager/scheduler/fair/FSQueue.java | 23 ++- .../scheduler/fair/FairScheduler.java | 60 ++---- .../scheduler/fair/QueueManager.java | 32 +-- .../fair/TestAllocationFileLoaderService.java | 45 ++++- .../scheduler/fair/TestFSLeafQueue.java | 198 ++++++++++++++++--- .../scheduler/fair/TestFairScheduler.java | 158 ++++----------- .../src/site/apt/FairScheduler.apt.vm | 10 + 12 files changed, 412 insertions(+), 247 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a6a1b9b3..64ccd28 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -61,6 +61,9 @@ Release 2.6.0 - UNRELEASED YARN-2395. FairScheduler: Preemption timeout should be configurable per queue. (Wei Yan via kasha) + YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue. + (Wei Yan via kasha) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 228a761..de5a999 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -70,6 +70,12 @@ public class AllocationConfiguration { // allowed to preempt other jobs' tasks. private final Map<String, Long> fairSharePreemptionTimeouts; + // The fair share preemption threshold for each queue. If a queue waits + // fairSharePreemptionTimeout without receiving + // fairshare * fairSharePreemptionThreshold resources, it is allowed to + // preempt other queues' tasks. + private final Map<String, Float> fairSharePreemptionThresholds; + private final Map<String, SchedulingPolicy> schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; @@ -92,6 +98,7 @@ public class AllocationConfiguration { SchedulingPolicy defaultSchedulingPolicy, Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> fairSharePreemptionTimeouts, + Map<String, Float> fairSharePreemptionThresholds, Map<String, Map<QueueACL, AccessControlList>> queueAcls, QueuePlacementPolicy placementPolicy, Map<FSQueueType, Set<String>> configuredQueues) { @@ -108,6 +115,7 @@ public class AllocationConfiguration { this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; + this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; this.queueAcls = queueAcls; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; @@ -126,6 +134,7 @@ public class AllocationConfiguration { queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>(); minSharePreemptionTimeouts = new HashMap<String, Long>(); fairSharePreemptionTimeouts = new HashMap<String, Long>(); + fairSharePreemptionThresholds = new HashMap<String, Float>(); schedulingPolicies = new HashMap<String, SchedulingPolicy>(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; configuredQueues = new HashMap<FSQueueType, Set<String>>(); @@ -171,7 +180,18 @@ public class AllocationConfiguration { return (fairSharePreemptionTimeout == null) ? -1 : fairSharePreemptionTimeout; } - + + /** + * Get a queue's fair share preemption threshold in the allocation file. + * Return -1f if not set. + */ + public float getFairSharePreemptionThreshold(String queueName) { + Float fairSharePreemptionThreshold = + fairSharePreemptionThresholds.get(queueName); + return (fairSharePreemptionThreshold == null) ? + -1f : fairSharePreemptionThreshold; + } + public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 970ee99..c2dfc84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -218,6 +218,8 @@ public class AllocationFileLoaderService extends AbstractService { Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>(); Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>(); Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>(); + Map<String, Float> fairSharePreemptionThresholds = + new HashMap<String, Float>(); Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>(); int userMaxAppsDefault = Integer.MAX_VALUE; @@ -225,6 +227,7 @@ public class AllocationFileLoaderService extends AbstractService { float queueMaxAMShareDefault = -1.0f; long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + float defaultFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; QueuePlacementPolicy newPlacementPolicy = null; @@ -277,7 +280,8 @@ public class AllocationFileLoaderService extends AbstractService { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); userMaxAppsDefault = val; - } else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) { + } else if ("defaultFairSharePreemptionTimeout" + .equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultFairSharePreemptionTimeout = val; @@ -287,10 +291,17 @@ public class AllocationFileLoaderService extends AbstractService { long val = Long.parseLong(text) * 1000L; defaultFairSharePreemptionTimeout = val; } - } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { + } else if ("defaultMinSharePreemptionTimeout" + .equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultMinSharePreemptionTimeout = val; + } else if ("defaultFairSharePreemptionThreshold" + .equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + defaultFairSharePreemptionThreshold = val; } else if ("queueMaxAppsDefault".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); @@ -326,7 +337,7 @@ public class AllocationFileLoaderService extends AbstractService { loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, - queueAcls, configuredQueues); + fairSharePreemptionThresholds, queueAcls, configuredQueues); } // Load placement policy and pass it configured queues @@ -349,11 +360,18 @@ public class AllocationFileLoaderService extends AbstractService { defaultFairSharePreemptionTimeout); } + // Set the fair share preemption threshold for the root queue + if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) { + fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE, + defaultFairSharePreemptionThreshold); + } + AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, - minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls, + minSharePreemptionTimeouts, fairSharePreemptionTimeouts, + fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); @@ -365,13 +383,15 @@ public class AllocationFileLoaderService extends AbstractService { /** * Loads a queue from a queue element in the configuration file */ - private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources, + private void loadQueue(String parentName, Element element, + Map<String, Resource> minQueueResources, Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares, Map<String, ResourceWeights> queueWeights, Map<String, SchedulingPolicy> queuePolicies, Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> fairSharePreemptionTimeouts, + Map<String, Float> fairSharePreemptionThresholds, Map<String, Map<QueueACL, AccessControlList>> queueAcls, Map<FSQueueType, Set<String>> configuredQueues) throws AllocationConfigurationException { @@ -418,6 +438,11 @@ public class AllocationFileLoaderService extends AbstractService { String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; fairSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + fairSharePreemptionThresholds.put(queueName, val); } else if ("schedulingPolicy".equals(field.getTagName()) || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); @@ -434,7 +459,8 @@ public class AllocationFileLoaderService extends AbstractService { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, - fairSharePreemptionTimeouts, queueAcls, configuredQueues); + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, + queueAcls, configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } @@ -449,11 +475,15 @@ public class AllocationFileLoaderService extends AbstractService { } } queueAcls.put(queueName, acls); - if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) + if (maxQueueResources.containsKey(queueName) && + minQueueResources.containsKey(queueName) && !Resources.fitsIn(minQueueResources.get(queueName), maxQueueResources.get(queueName))) { - LOG.warn(String.format("Queue %s has max resources %s less than min resources %s", - queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); + LOG.warn( + String.format( + "Queue %s has max resources %s less than min resources %s", + queueName, maxQueueResources.get(queueName), + minQueueResources.get(queueName))); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49e8ef0..345ea8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -54,7 +55,7 @@ public class FSLeafQueue extends FSQueue { // Variables used for preemption private long lastTimeAtMinShare; - private long lastTimeAtHalfFairShare; + private long lastTimeAtFairShareThreshold; // Track the AM resource usage for this queue private Resource amResourceUsage; @@ -65,7 +66,7 @@ public class FSLeafQueue extends FSQueue { FSParentQueue parent) { super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); - this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); + this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } @@ -275,16 +276,17 @@ public class FSLeafQueue extends FSQueue { return lastTimeAtMinShare; } - public void setLastTimeAtMinShare(long lastTimeAtMinShare) { + private void setLastTimeAtMinShare(long lastTimeAtMinShare) { this.lastTimeAtMinShare = lastTimeAtMinShare; } - public long getLastTimeAtHalfFairShare() { - return lastTimeAtHalfFairShare; + public long getLastTimeAtFairShareThreshold() { + return lastTimeAtFairShareThreshold; } - public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { - this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; + private void setLastTimeAtFairShareThreshold( + long lastTimeAtFairShareThreshold) { + this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold; } @Override @@ -329,6 +331,20 @@ public class FSLeafQueue extends FSQueue { } /** + * Update the preemption fields for the queue, i.e. the times since last was + * at its guaranteed share and over its fair share threshold. + */ + public void updateStarvationStats() { + long now = scheduler.getClock().getTime(); + if (!isStarvedForMinShare()) { + setLastTimeAtMinShare(now); + } + if (!isStarvedForFairShare()) { + setLastTimeAtFairShareThreshold(now); + } + } + + /** * Helper method to check if the queue should preempt containers * * @return true if check passes (can preempt) or false otherwise @@ -337,4 +353,28 @@ public class FSLeafQueue extends FSQueue { return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(), getFairShare()); } + + /** + * Is a queue being starved for its min share. + */ + @VisibleForTesting + boolean isStarvedForMinShare() { + return isStarved(getMinShare()); + } + + /** + * Is a queue being starved for its fair share threshold. + */ + @VisibleForTesting + boolean isStarvedForFairShare() { + return isStarved( + Resources.multiply(getFairShare(), getFairSharePreemptionThreshold())); + } + + private boolean isStarved(Resource share) { + Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(), + scheduler.getClusterResource(), share, getDemand()); + return Resources.lessThan(FairScheduler.getResourceCalculator(), + scheduler.getClusterResource(), getResourceUsage(), desiredShare); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 1209970..f74106a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -78,11 +78,11 @@ public class FSParentQueue extends FSQueue { } @Override - public void updatePreemptionTimeouts() { - super.updatePreemptionTimeouts(); + public void updatePreemptionVariables() { + super.updatePreemptionVariables(); // For child queues for (FSQueue childQueue : childQueues) { - childQueue.updatePreemptionTimeouts(); + childQueue.updatePreemptionVariables(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index b9fcc4b..d4e043d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -54,6 +54,7 @@ public abstract class FSQueue implements Queue, Schedulable { private long fairSharePreemptionTimeout = Long.MAX_VALUE; private long minSharePreemptionTimeout = Long.MAX_VALUE; + private float fairSharePreemptionThreshold = 0.5f; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -186,6 +187,14 @@ public abstract class FSQueue implements Queue, Schedulable { this.minSharePreemptionTimeout = minSharePreemptionTimeout; } + public float getFairSharePreemptionThreshold() { + return fairSharePreemptionThreshold; + } + + public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) { + this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share @@ -193,21 +202,27 @@ public abstract class FSQueue implements Queue, Schedulable { public abstract void recomputeShares(); /** - * Update the min/fair share preemption timeouts for this queue. + * Update the min/fair share preemption timeouts and threshold for this queue. */ - public void updatePreemptionTimeouts() { - // For min share + public void updatePreemptionVariables() { + // For min share timeout minSharePreemptionTimeout = scheduler.getAllocationConfiguration() .getMinSharePreemptionTimeout(getName()); if (minSharePreemptionTimeout == -1 && parent != null) { minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout(); } - // For fair share + // For fair share timeout fairSharePreemptionTimeout = scheduler.getAllocationConfiguration() .getFairSharePreemptionTimeout(getName()); if (fairSharePreemptionTimeout == -1 && parent != null) { fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout(); } + // For fair share preemption threshold + fairSharePreemptionThreshold = scheduler.getAllocationConfiguration() + .getFairSharePreemptionThreshold(getName()); + if (fairSharePreemptionThreshold < 0 && parent != null) { + fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2798b8d..a35e49f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -299,7 +299,7 @@ public class FairScheduler extends */ protected synchronized void update() { long start = getClock().getTime(); - updatePreemptionVariables(); // Determine if any queues merit preemption + updateStarvationStats(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -329,48 +329,20 @@ public class FairScheduler extends /** * Update the preemption fields for all QueueScheduables, i.e. the times since - * each queue last was at its guaranteed share and at > 1/2 of its fair share - * for each type of task. + * each queue last was at its guaranteed share and over its fair share + * threshold for each type of task. */ - private void updatePreemptionVariables() { - long now = getClock().getTime(); - lastPreemptionUpdateTime = now; + private void updateStarvationStats() { + lastPreemptionUpdateTime = clock.getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - if (!isStarvedForMinShare(sched)) { - sched.setLastTimeAtMinShare(now); - } - if (!isStarvedForFairShare(sched)) { - sched.setLastTimeAtHalfFairShare(now); - } + sched.updateStarvationStats(); } } /** - * Is a queue below its min share for the given task type? - */ - boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, - sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), desiredShare); - } - - /** - * Is a queue being starved for fair share for the given task type? This is - * defined as being below half its fair share. - */ - boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, - clusterResource, - Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), desiredFairShare); - } - - /** * Check for queues that need tasks preempted, either because they have been * below their guaranteed share for minSharePreemptionTimeout or they have - * been below half their fair share for the fairSharePreemptionTimeout. If + * been below their fair share threshold for the fairSharePreemptionTimeout. If * such queues exist, compute how many tasks of each type need to be preempted * and then select the right ones using preemptTasks. */ @@ -499,11 +471,11 @@ public class FairScheduler extends * Return the resource amount that this queue is allowed to preempt, if any. * If the queue has been below its min share for at least its preemption * timeout, it should preempt the difference between its current share and - * this min share. If it has been below half its fair share for at least the - * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its - * full fair share. If both conditions hold, we preempt the max of the two - * amounts (this shouldn't happen unless someone sets the timeouts to be - * identical for some reason). + * this min share. If it has been below its fair share preemption threshold + * for at least the fairSharePreemptionTimeout, it should preempt enough tasks + * to get up to its full fair share. If both conditions hold, we preempt the + * max of the two amounts (this shouldn't happen unless someone sets the + * timeouts to be identical for some reason). */ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { long minShareTimeout = sched.getMinSharePreemptionTimeout(); @@ -516,7 +488,7 @@ public class FairScheduler extends resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { + if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, @@ -1094,7 +1066,11 @@ public class FairScheduler extends public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { return super.getApplicationAttempt(appAttemptId); } - + + public static ResourceCalculator getResourceCalculator() { + return RESOURCE_CALCULATOR; + } + /** * Subqueue metrics might be a little out of date because fair shares are * recalculated at the update interval, but the root queue metrics needs to http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 2444ba4..61b3b6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -181,7 +181,7 @@ public class QueueManager { parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); - setPreemptionTimeout(leafQueue, parent, queueConf); + leafQueue.updatePreemptionVariables(); return leafQueue; } else { FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); @@ -193,7 +193,7 @@ public class QueueManager { } parent.addChildQueue(newParent); queues.put(newParent.getName(), newParent); - setPreemptionTimeout(newParent, parent, queueConf); + newParent.updatePreemptionVariables(); parent = newParent; } } @@ -202,29 +202,6 @@ public class QueueManager { } /** - * Set the min/fair share preemption timeouts for the given queue. - * If the timeout is configured in the allocation file, the queue will use - * that value; otherwise, the queue inherits the value from its parent queue. - */ - private void setPreemptionTimeout(FSQueue queue, - FSParentQueue parentQueue, AllocationConfiguration queueConf) { - // For min share - long minSharePreemptionTimeout = - queueConf.getMinSharePreemptionTimeout(queue.getQueueName()); - if (minSharePreemptionTimeout == -1) { - minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout(); - } - queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout); - // For fair share - long fairSharePreemptionTimeout = - queueConf.getFairSharePreemptionTimeout(queue.getQueueName()); - if (fairSharePreemptionTimeout == -1) { - fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout(); - } - queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout); - } - - /** * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. Incompatibility could be due to * (1) queueToCreate being currently a parent but needs to change to leaf @@ -409,7 +386,8 @@ public class QueueManager { // Update steady fair shares for all queues rootQueue.recomputeSteadyShares(); - // Update the fair share preemption timeouts for all queues recursively - rootQueue.updatePreemptionTimeouts(); + // Update the fair share preemption timeouts and preemption for all queues + // recursively + rootQueue.updatePreemptionVariables(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 14b3111..656e20d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -187,13 +187,15 @@ public class TestAllocationFileLoaderService { out.println("<queue name=\"queueF\" type=\"parent\" >"); out.println("</queue>"); // Create hierarchical queues G,H, with different min/fair share preemption - // timeouts + // timeouts and preemption thresholds out.println("<queue name=\"queueG\">"); out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>"); out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>"); + out.println("<fairSharePreemptionThreshold>0.6</fairSharePreemptionThreshold>"); out.println(" <queue name=\"queueH\">"); out.println(" <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>"); out.println(" <minSharePreemptionTimeout>40</minSharePreemptionTimeout>"); + out.println(" <fairSharePreemptionThreshold>0.7</fairSharePreemptionThreshold>"); out.println(" </queue>"); out.println("</queue>"); // Set default limit of apps per queue to 15 @@ -211,6 +213,8 @@ public class TestAllocationFileLoaderService { + "</defaultMinSharePreemptionTimeout>"); // Set default fair share preemption timeout to 5 minutes out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>"); + // Set default fair share preemption threshold to 0.4 + out.println("<defaultFairSharePreemptionThreshold>0.4</defaultFairSharePreemptionThreshold>"); // Set default scheduling policy to DRF out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>"); out.println("</allocations>"); @@ -299,6 +303,26 @@ public class TestAllocationFileLoaderService { assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG")); assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH")); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01); + assertEquals(.6f, + queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01); + assertEquals(.7f, + queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01); + assertTrue(queueConf.getConfiguredQueues() .get(FSQueueType.PARENT) .contains("root.queueF")); @@ -346,9 +370,10 @@ public class TestAllocationFileLoaderService { out.println("<pool name=\"queueD\">"); out.println("<maxRunningApps>3</maxRunningApps>"); out.println("</pool>"); - // Give queue E a preemption timeout of one minute + // Give queue E a preemption timeout of one minute and 0.3f threshold out.println("<pool name=\"queueE\">"); out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>"); + out.println("<fairSharePreemptionThreshold>0.3</fairSharePreemptionThreshold>"); out.println("</pool>"); // Set default limit of apps per queue to 15 out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>"); @@ -363,6 +388,8 @@ public class TestAllocationFileLoaderService { + "</defaultMinSharePreemptionTimeout>"); // Set fair share preemption timeout to 5 minutes out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); + // Set default fair share preemption threshold to 0.6f + out.println("<defaultFairSharePreemptionThreshold>0.6</defaultFairSharePreemptionThreshold>"); out.println("</allocations>"); out.close(); @@ -429,6 +456,20 @@ public class TestAllocationFileLoaderService { assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC")); assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD")); assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE")); + + assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(.3f, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 7323b6a..97736be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -18,50 +18,66 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -public class TestFSLeafQueue { - private FSLeafQueue schedulable = null; - private Resource maxResource = Resources.createResource(10); +public class TestFSLeafQueue extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath(); + private Resource maxResource = Resources.createResource(1024 * 8); @Before public void setup() throws IOException { - FairScheduler scheduler = new FairScheduler(); - Configuration conf = createConfiguration(); - // All tests assume only one assignment per node update - conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); - ResourceManager resourceManager = new ResourceManager(); - resourceManager.init(conf); - ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - String queueName = "root.queue1"; - scheduler.allocConf = mock(AllocationConfiguration.class); - when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource); - when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none()); + conf = createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + } - schedulable = new FSLeafQueue(queueName, scheduler, null); + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + conf = null; } @Test public void testUpdateDemand() { + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + scheduler.allocConf = mock(AllocationConfiguration.class); + + String queueName = "root.queue1"; + when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource); + when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none()); + FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); @@ -73,11 +89,137 @@ public class TestFSLeafQueue { assertTrue("Demand is greater than max allowed ", Resources.equals(schedulable.getDemand(), maxResource)); } - - private Configuration createConfiguration() { - Configuration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, - ResourceScheduler.class); - return conf; + + @Test (timeout = 5000) + public void test() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<minResources>2048mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<minResources>2048mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 3 * 1024. Node update gives this all to A + createSchedulingRequest(3 * 1024, "queueA", "user1"); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeEvent2); + + // Queue B arrives and wants 1 * 1024 + createSchedulingRequest(1 * 1024, "queueB", "user1"); + scheduler.update(); + Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues(); + assertEquals(3, queues.size()); + + // Queue A should be above min share, B below. + FSLeafQueue queueA = + scheduler.getQueueManager().getLeafQueue("queueA", false); + FSLeafQueue queueB = + scheduler.getQueueManager().getLeafQueue("queueB", false); + assertFalse(queueA.isStarvedForMinShare()); + assertTrue(queueB.isStarvedForMinShare()); + + // Node checks in again, should allocate for B + scheduler.handle(nodeEvent2); + // Now B should have min share ( = demand here) + assertFalse(queueB.isStarvedForMinShare()); + } + + @Test (timeout = 5000) + public void testIsStarvedForFairShare() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.2</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.8</weight>"); + out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>"); + out.println("<queue name=\"queueB1\">"); + out.println("</queue>"); + out.println("<queue name=\"queueB2\">"); + out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); + out.println("</allocations>"); + out.close(); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 4 * 1024. Node update gives this all to A + createSchedulingRequest(1 * 1024, "queueA", "user1", 4); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); + } + + QueueManager queueMgr = scheduler.getQueueManager(); + FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); + assertEquals(4 * 1024, queueA.getResourceUsage().getMemory()); + + // Both queue B1 and queue B2 want 3 * 1024 + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3); + scheduler.update(); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); + } + + FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false); + FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false); + assertEquals(2 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(2 * 1024, queueB2.getResourceUsage().getMemory()); + + // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share + // threshold is 1.6 * 1024 + assertFalse(queueB1.isStarvedForFairShare()); + + // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share + // threshold is 2.4 * 1024 + assertTrue(queueB2.isStarvedForFairShare()); + + // Node checks in again + scheduler.handle(nodeEvent2); + scheduler.handle(nodeEvent2); + assertEquals(3 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(3 * 1024, queueB2.getResourceUsage().getMemory()); + + // Both queue B1 and queue B2 usages go to 3 * 1024 + assertFalse(queueB1.isStarvedForFairShare()); + assertFalse(queueB2.isStarvedForFairShare()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 6e0127d..05b1925 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1061,9 +1061,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(" </queue>"); out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>"); out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>"); + out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>"); out.println("</queue>"); out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>"); out.println("<defaultMinSharePreemptionTimeout>200</defaultMinSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.6</defaultFairSharePreemptionThreshold>"); out.println("</allocations>"); out.close(); @@ -1080,125 +1082,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(100000, root.getFairSharePreemptionTimeout()); assertEquals(120000, root.getMinSharePreemptionTimeout()); - } - - @Test (timeout = 5000) - public void testIsStarvedForMinShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println("<?xml version=\"1.0\"?>"); - out.println("<allocations>"); - out.println("<queue name=\"queueA\">"); - out.println("<minResources>2048mb,0vcores</minResources>"); - out.println("</queue>"); - out.println("<queue name=\"queueB\">"); - out.println("<minResources>2048mb,0vcores</minResources>"); - out.println("</queue>"); - out.println("</allocations>"); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); - - // Queue A should be above min share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForMinShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForMinShare(p)); - } - } - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // Now B should have min share ( = demand here) - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForMinShare(p)); - } - } - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println("<?xml version=\"1.0\"?>"); - out.println("<allocations>"); - out.println("<queue name=\"queueA\">"); - out.println("<weight>.25</weight>"); - out.println("</queue>"); - out.println("<queue name=\"queueB\">"); - out.println("<weight>.75</weight>"); - out.println("</queue>"); - out.println("</allocations>"); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); - - // Queue A should be above fair share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForFairShare(p)); - } - } - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // B should not be starved for fair share, since entire demand is - // satisfied. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - } + assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01); } @Test (timeout = 5000) @@ -1385,7 +1269,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println("<queue name=\"queueB\">"); out.println("<weight>2</weight>"); out.println("</queue>"); - out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); out.println("</allocations>"); out.close(); @@ -1468,8 +1353,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println("<weight>.25</weight>"); out.println("<minResources>1024mb,0vcores</minResources>"); out.println("</queue>"); - out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); - out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); out.println("</allocations>"); out.close(); @@ -1753,8 +1639,6 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - MockClock clock = new MockClock(); - scheduler.setClock(clock); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); @@ -1842,6 +1726,32 @@ public class TestFairScheduler extends FairSchedulerTestBase { .getFairSharePreemptionTimeout()); } + @Test + public void testPreemptionVariablesForQueueCreatedRuntime() throws Exception { + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Set preemption variables for the root queue + FSParentQueue root = scheduler.getQueueManager().getRootQueue(); + root.setMinSharePreemptionTimeout(10000); + root.setFairSharePreemptionTimeout(15000); + root.setFairSharePreemptionThreshold(.6f); + + // User1 submits one application + ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); + createApplicationWithAMResource(appAttemptId, "default", "user1", null); + + // The user1 queue should inherit the configurations from the root queue + FSLeafQueue userQueue = + scheduler.getQueueManager().getLeafQueue("user1", true); + assertEquals(1, userQueue.getRunnableAppSchedulables().size()); + assertEquals(10000, userQueue.getMinSharePreemptionTimeout()); + assertEquals(15000, userQueue.getFairSharePreemptionTimeout()); + assertEquals(.6f, userQueue.getFairSharePreemptionThreshold(), 0.001); + } + @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { scheduler.init(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index bd28bff..df61422 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -277,6 +277,12 @@ Allocation file format threshold before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue. + * fairSharePreemptionThreshold: the fair share preemption threshold for the + queue. If the queue waits fairSharePreemptionTimeout without receiving + fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt + containers to take resources from other queues. If not set, the queue will + inherit the value from its parent queue. + * <<User elements>>, which represent settings governing the behavior of individual users. They can contain a single property: maxRunningApps, a limit on the number of running apps for a particular user. @@ -292,6 +298,10 @@ Allocation file format preemption timeout for the root queue; overridden by minSharePreemptionTimeout element in root queue. + * <<A defaultFairSharePreemptionThreshold element>>, which sets the fair share + preemption threshold for the root queue; overridden by fairSharePreemptionThreshold + element in root queue. + * <<A queueMaxAppsDefault element>>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue.