YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan
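In one sentence, the change: when an application has a container reserved on a node but the node is full, the preemption policy can now target that specific node and free just enough resource there for the reservation to be allocated, instead of preempting proportionally across the whole cluster and hoping the freed space lands where the reservation sits. Below is a minimal, self-contained sketch of that per-node selection step, using plain memory-in-MB longs and hypothetical names rather than the real YARN types; the actual logic lives in ReservedContainerCandidatesSelector in the diff below.

import java.util.ArrayList;
import java.util.List;

class SurgicalPreemptionSketch {
  // Pick containers on one node (newest first) until the reservation fits.
  static List<Long> pickOnNode(long reservedNeed, long nodeAvailable,
      List<Long> runningNewestFirst) {
    List<Long> picked = new ArrayList<>();
    long free = nodeAvailable;
    for (long c : runningNewestFirst) {
      if (free >= reservedNeed) {
        break;
      }
      picked.add(c);
      free += c;
    }
    // null signals "this node cannot satisfy the reservation, skip it"
    return free >= reservedNeed ? picked : null;
  }

  public static void main(String[] args) {
    // A reservation needs 8 GB; the node has 2 GB free and three running
    // containers of 4 GB, 3 GB and 1 GB (newest first).
    System.out.println(pickOnNode(8_192, 2_048,
        List.of(4_096L, 3_072L, 1_024L))); // [4096, 3072]
  }
}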
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb62e059
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb62e059
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb62e059

Branch: refs/heads/trunk
Commit: bb62e0592566b2fcae7136b30972aad2d3ac55b0
Parents: 35cf503
Author: Jian He <jia...@apache.org>
Authored: Thu May 5 12:56:21 2016 -0700
Committer: Jian He <jia...@apache.org>
Committed: Thu May 5 12:56:21 2016 -0700

----------------------------------------------------------------------
 .../monitor/SchedulingMonitor.java | 16 +-
 .../CapacitySchedulerPreemptionUtils.java | 35 +-
 .../capacity/FifoCandidatesSelector.java | 26 +-
 .../capacity/PreemptableResourceCalculator.java | 139 ++--
 .../ProportionalCapacityPreemptionPolicy.java | 96 ++-
 .../ReservedContainerCandidatesSelector.java | 316 +++++++++
 .../monitor/capacity/TempQueuePerPartition.java | 174 ++++-
 .../rmcontainer/RMContainer.java | 2 +
 .../rmcontainer/RMContainerImpl.java | 10 +
 .../scheduler/AbstractYarnScheduler.java | 2 +-
 .../scheduler/SchedulerApplicationAttempt.java | 4 +
 .../scheduler/SchedulerNode.java | 29 +-
 .../scheduler/capacity/CapacityScheduler.java | 10 +-
 .../CapacitySchedulerConfiguration.java | 24 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 1 +
 .../common/fica/FiCaSchedulerNode.java | 7 +-
 .../scheduler/fair/FSAppAttempt.java | 1 +
 .../scheduler/fair/FairScheduler.java | 3 +-
 .../scheduler/fifo/FifoScheduler.java | 2 +-
 ...alCapacityPreemptionPolicyMockFramework.java | 689 +++++++++++++++
 ...estProportionalCapacityPreemptionPolicy.java | 62 +-
 ...pacityPreemptionPolicyForNodePartitions.java | 687 +----------------
 ...tyPreemptionPolicyForReservedContainers.java | 430 ++++++++++++
 ...alCapacityPreemptionPolicyMockFramework.java | 247 +++++++
 .../TestSchedulerApplicationAttempt.java | 3 +-
 .../CapacitySchedulerPreemptionTestBase.java | 149 ++++
 .../capacity/TestCapacityScheduler.java | 2 +-
 .../TestCapacitySchedulerLazyPreemption.java | 638 +++++++++++++++++
 .../TestCapacitySchedulerPreemption.java | 677 ------------------
 ...TestCapacitySchedulerSurgicalPreemption.java | 246 +++++++
 .../scheduler/fair/TestFairScheduler.java | 4 +-
 .../scheduler/fifo/TestFifoScheduler.java | 4 +-
 32 files changed, 3197 insertions(+), 1538 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 55ec858..03e180d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; @@ -84,10 +85,17 @@ public class SchedulingMonitor extends AbstractService { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { - //invoke the preemption policy at a regular pace - //the policy will generate preemption or kill events - //managed by the dispatcher - invokePolicy(); + try { + //invoke the preemption policy at a regular pace + //the policy will generate preemption or kill events + //managed by the dispatcher + invokePolicy(); + } catch (YarnRuntimeException e) { + LOG.error("YarnRuntimeException raised while executing preemption" + + " checker, skip this run..., exception=", e); + } + + // Wait before next run try { Thread.sleep(monitorInterval); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index a71f108..42d8730 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -40,9 +42,9 @@ public class CapacitySchedulerPreemptionUtils { // Only add resToObtainByPartition when actuallyToBePreempted resource >= 0 if (Resources.greaterThan(context.getResourceCalculator(), - clusterResource, qT.actuallyToBePreempted, Resources.none())) { + clusterResource, qT.getActuallyToBePreempted(), Resources.none())) { resToObtainByPartition.put(qT.partition, - Resources.clone(qT.actuallyToBePreempted)); + Resources.clone(qT.getActuallyToBePreempted())); } } @@ -62,4 +64,33 @@ public class CapacitySchedulerPreemptionUtils { } return containers.contains(container); } + + public static void deductPreemptableResourcesBasedSelectedCandidates( + CapacitySchedulerPreemptionContext context, + Map<ApplicationAttemptId, 
Set<RMContainer>> selectedCandidates) { + for (Set<RMContainer> containers : selectedCandidates.values()) { + for (RMContainer c : containers) { + SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode( + c.getAllocatedNode()); + if (null == schedulerNode) { + continue; + } + + String partition = schedulerNode.getPartition(); + String queue = c.getQueueName(); + TempQueuePerPartition tq = context.getQueueByPartition(queue, + partition); + + Resource res = c.getReservedResource(); + if (null == res) { + res = c.getAllocatedResource(); + } + + if (null != res) { + tq.deductActuallyToBePreempted(context.getResourceCalculator(), + tq.totalPartitionResource, res); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 499d0ff..a8c62fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -55,7 +54,7 @@ public class FifoCandidatesSelector super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext); + preemptionContext, false); } @Override @@ -66,8 +65,13 @@ public class FifoCandidatesSelector preemptableAmountCalculator.computeIdealAllocation(clusterResource, totalPreemptionAllowed); - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap = - new HashMap<>(); + // Previous selectors (with higher priority) could have already + // selected containers. We need to deduct preemptable resources + // based on already selected candidates. 
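The comment above is the contract between chained selectors: the selectedCandidates map is shared, so a later selector must subtract what earlier (higher-priority) selectors already claimed before computing its own targets. A minimal sketch of that deduction with plain per-queue MB totals (hypothetical names and simplified types, not the real deductPreemptableResourcesBasedSelectedCandidates signature):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeductSelectedSketch {
  // Shrink each queue's remaining preemption target by what earlier
  // selectors already took, clamping at zero.
  static void deductAlreadySelected(Map<String, Long> toPreemptByQueue,
      Map<String, List<Long>> selectedByQueue) {
    for (Map.Entry<String, List<Long>> e : selectedByQueue.entrySet()) {
      long already = e.getValue().stream().mapToLong(Long::longValue).sum();
      Long target = toPreemptByQueue.get(e.getKey());
      if (target != null) {
        toPreemptByQueue.put(e.getKey(), Math.max(0, target - already));
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Long> target = new HashMap<>();
    target.put("root.a", 20_480L); // policy still wants 20 GB back from root.a
    Map<String, List<Long>> selected =
        Map.of("root.a", List.of(8_192L, 4_096L)); // taken by earlier selector
    deductAlreadySelected(target, selected);
    System.out.println(target); // {root.a=8192}: only 8 GB left to find
  }
}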
+ CapacitySchedulerPreemptionUtils + .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext, + selectedCandidates); + List<RMContainer> skippedAMContainerlist = new ArrayList<>(); // Loop all leaf queues @@ -109,7 +113,7 @@ public class FifoCandidatesSelector continue; } boolean preempted = tryPreemptContainerAndDeductResToObtain( - resToObtainByPartition, c, clusterResource, preemptMap, + resToObtainByPartition, c, clusterResource, selectedCandidates, totalPreemptionAllowed); if (!preempted) { continue; @@ -132,7 +136,7 @@ public class FifoCandidatesSelector } preemptFrom(fc, clusterResource, resToObtainByPartition, - skippedAMContainerlist, skippedAMSize, preemptMap, + skippedAMContainerlist, skippedAMSize, selectedCandidates, totalPreemptionAllowed); } @@ -144,13 +148,13 @@ public class FifoCandidatesSelector leafQueue.getAbsoluteCapacity()), leafQueue.getMaxAMResourcePerQueuePercent()); - preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist, + preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, totalPreemptionAllowed); } } - return preemptMap; + return selectedCandidates; } /** @@ -236,9 +240,9 @@ public class FifoCandidatesSelector resourceToObtainByPartitions.remove(nodePartition); } if (LOG.isDebugEnabled()) { - LOG.debug("Marked container=" + rmContainer.getContainerId() - + " in partition=" + nodePartition - + " to be preemption candidates"); + LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer + .getContainerId() + " from partition=" + nodePartition + " queue=" + + rmContainer.getQueueName() + " to be preemption candidates"); } // Add to preemptMap addToPreemptMap(preemptMap, attemptId, rmContainer); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 2217210..d1d2485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -37,7 +37,7 @@ import java.util.Set; /** * Calculate how much resources need to be preempted for each queue, - * will be used by {@link FifoCandidatesSelector} + * will be used by {@link PreemptionCandidatesSelector} */ public class PreemptableResourceCalculator { private static final Log LOG = @@ -45,6 +45,7 @@ public class PreemptableResourceCalculator { private final CapacitySchedulerPreemptionContext context; private final ResourceCalculator rc; + private boolean isReservedPreemptionCandidatesSelector; static class TQComparator implements Comparator<TempQueuePerPartition> { 
private ResourceCalculator rc; @@ -71,18 +72,31 @@ public class PreemptableResourceCalculator { // capacity and therefore considered last for resources. private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { double pctOver = Integer.MAX_VALUE; - if (q != null && Resources.greaterThan( - rc, clusterRes, q.guaranteed, Resources.none())) { - pctOver = - Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed); + if (q != null && Resources.greaterThan(rc, clusterRes, + q.getGuaranteed(), + Resources.none())) { + pctOver = Resources.divide(rc, clusterRes, q.idealAssigned, + q.getGuaranteed()); } return (pctOver); } } - public PreemptableResourceCalculator(CapacitySchedulerPreemptionContext preemptionContext) { + /** + * PreemptableResourceCalculator constructor + * + * @param preemptionContext + * @param isReservedPreemptionCandidatesSelector this will be set by + * different implementation of candidate selectors, please refer to + * TempQueuePerPartition#offer for details. + */ + public PreemptableResourceCalculator( + CapacitySchedulerPreemptionContext preemptionContext, + boolean isReservedPreemptionCandidatesSelector) { context = preemptionContext; rc = preemptionContext.getResourceCalculator(); + this.isReservedPreemptionCandidatesSelector = + isReservedPreemptionCandidatesSelector; } /** @@ -101,11 +115,11 @@ public class PreemptableResourceCalculator { } } else { for (TempQueuePerPartition q : queues) { - Resources.addTo(activeCap, q.guaranteed); + Resources.addTo(activeCap, q.getGuaranteed()); } for (TempQueuePerPartition q : queues) { q.normalizedGuarantee = Resources.divide(rc, clusterResource, - q.guaranteed, activeCap); + q.getGuaranteed(), activeCap); } } } @@ -114,7 +128,8 @@ public class PreemptableResourceCalculator { // return the list of all queues that have the same idealAssigned // percentage of guaranteed. protected Collection<TempQueuePerPartition> getMostUnderservedQueues( - PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) { + PriorityQueue<TempQueuePerPartition> orderedByNeed, + TQComparator tqComparator) { ArrayList<TempQueuePerPartition> underserved = new ArrayList<>(); while (!orderedByNeed.isEmpty()) { TempQueuePerPartition q1 = orderedByNeed.remove(); @@ -155,15 +170,19 @@ public class PreemptableResourceCalculator { tqComparator); for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) { TempQueuePerPartition q = i.next(); - if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) { - q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra); + Resource used = q.getUsed(); + + if (Resources.greaterThan(rc, tot_guarant, used, + q.getGuaranteed())) { + q.idealAssigned = Resources.add( + q.getGuaranteed(), q.untouchableExtra); } else { - q.idealAssigned = Resources.clone(q.current); + q.idealAssigned = Resources.clone(used); } Resources.subtractFrom(unassigned, q.idealAssigned); - // If idealAssigned < (current + pending), q needs more resources, so + // If idealAssigned < (allocated + used + pending), q needs more resources, so // add it to the list of underserved queues, ordered by need. 
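For readers following computeFixpointAllocation, the loop around this comment works roughly as sketched below: repeatedly take the most underserved queues (lowest idealAssigned/guaranteed ratio), offer each a share of the unassigned resource weighted by its guarantee, and re-queue any queue still short of used + pending. This is a simplified, double-based sketch with hypothetical names; the real code normalizes guarantees over the currently underserved set and routes the share through offer().

import java.util.Comparator;
import java.util.PriorityQueue;

class IdealAllocationSketch {
  static class Q {
    final String name;
    final double guaranteed;
    final double demand; // used + pending
    double idealAssigned;
    Q(String name, double guaranteed, double demand) {
      this.name = name; this.guaranteed = guaranteed; this.demand = demand;
    }
    double pctOfGuaranteed() {
      return guaranteed > 0 ? idealAssigned / guaranteed : Double.MAX_VALUE;
    }
  }

  public static void main(String[] args) {
    Q a = new Q("root.a", 60, 90);
    Q b = new Q("root.b", 40, 30);
    PriorityQueue<Q> orderedByNeed = new PriorityQueue<>(
        Comparator.comparingDouble(Q::pctOfGuaranteed));
    orderedByNeed.add(a);
    orderedByNeed.add(b);

    double unassigned = 100; // total partition resource to distribute
    double totalGuaranteed = 100;
    while (unassigned > 1e-9 && !orderedByNeed.isEmpty()) {
      Q q = orderedByNeed.poll(); // most underserved queue first
      double share = unassigned * (q.guaranteed / totalGuaranteed);
      double accepted = Math.min(share, q.demand - q.idealAssigned);
      q.idealAssigned += accepted;
      unassigned -= accepted;
      if (q.demand - q.idealAssigned > 1e-9) {
        orderedByNeed.add(q); // still underserved, gets another round
      }
    }
    System.out.printf("root.a=%.1f root.b=%.1f%n",
        a.idealAssigned, b.idealAssigned); // root.a=70.0 root.b=30.0
  }
}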
- Resource curPlusPend = Resources.add(q.current, q.pending); + Resource curPlusPend = Resources.add(q.getUsed(), q.pending); if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) { orderedByNeed.add(q); } @@ -190,7 +209,8 @@ public class PreemptableResourceCalculator { TempQueuePerPartition sub = i.next(); Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); - Resource wQidle = sub.offer(wQavail, rc, tot_guarant); + Resource wQidle = sub.offer(wQavail, rc, tot_guarant, + isReservedPreemptionCandidatesSelector); Resource wQdone = Resources.subtract(wQavail, wQidle); if (Resources.greaterThan(rc, tot_guarant, @@ -234,8 +254,8 @@ public class PreemptableResourceCalculator { Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>(); for (TempQueuePerPartition q : qAlloc) { - if (Resources - .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) { + if (Resources.greaterThan(rc, tot_guarant, + q.getGuaranteed(), Resources.none())) { nonZeroGuarQueues.add(q); } else { zeroGuarQueues.add(q); @@ -258,19 +278,22 @@ public class PreemptableResourceCalculator { // how much preemption is required overall Resource totPreemptionNeeded = Resource.newInstance(0, 0); for (TempQueuePerPartition t:queues) { - if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) { - Resources.addTo(totPreemptionNeeded, - Resources.subtract(t.current, t.idealAssigned)); + if (Resources.greaterThan(rc, tot_guarant, + t.getUsed(), t.idealAssigned)) { + Resources.addTo(totPreemptionNeeded, Resources + .subtract(t.getUsed(), t.idealAssigned)); } } - // if we need to preempt more than is allowed, compute a factor (0<f<1) - // that is used to scale down how much we ask back from each queue + /** + * if we need to preempt more than is allowed, compute a factor (0<f<1) + * that is used to scale down how much we ask back from each queue + */ float scalingFactor = 1.0F; - if (Resources.greaterThan(rc, tot_guarant, - totPreemptionNeeded, totalPreemptionAllowed)) { - scalingFactor = Resources.divide(rc, tot_guarant, - totalPreemptionAllowed, totPreemptionNeeded); + if (Resources.greaterThan(rc, + tot_guarant, totPreemptionNeeded, totalPreemptionAllowed)) { + scalingFactor = Resources.divide(rc, tot_guarant, totalPreemptionAllowed, + totPreemptionNeeded); } // assign to each queue the amount of actual preemption based on local @@ -278,12 +301,6 @@ public class PreemptableResourceCalculator { for (TempQueuePerPartition t : queues) { t.assignPreemption(scalingFactor, rc, tot_guarant); } - if (LOG.isDebugEnabled()) { - for (TempQueuePerPartition t : queues) { - LOG.debug(t); - } - } - } /** @@ -329,12 +346,31 @@ public class PreemptableResourceCalculator { for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) { // we act only if we are violating balance by more than // maxIgnoredOverCapacity - if (Resources.greaterThan(rc, clusterResource, qT.current, - Resources.multiply(qT.guaranteed, 1.0 + context.getMaxIgnoreOverCapacity()))) { - // we introduce a dampening factor naturalTerminationFactor that - // accounts for natural termination of containers - Resource resToObtain = Resources.multiply(qT.toBePreempted, - context.getNaturalTerminationFactor()); + if (Resources.greaterThan(rc, clusterResource, + qT.getUsed(), Resources + .multiply(qT.getGuaranteed(), + 1.0 + context.getMaxIgnoreOverCapacity()))) { + /* + * We introduce a dampening factor naturalTerminationFactor that + * accounts for natural termination of 
containers. + * + * This is added to control pace of preemption, let's say: + * If preemption policy calculated a queue *should be* preempted 20 GB + * And the nature_termination_factor set to 0.1. As a result, preemption + * policy will select 20 GB * 0.1 = 2GB containers to be preempted. + * + * However, it doesn't work for YARN-4390: + * For example, if a queue needs to be preempted 20GB for *one single* + * large container, preempt 10% of such resource isn't useful. + * So to make it simple, only apply nature_termination_factor when + * selector is not reservedPreemptionCandidatesSelector. + */ + Resource resToObtain = qT.toBePreempted; + if (!isReservedPreemptionCandidatesSelector) { + resToObtain = Resources.multiply(qT.toBePreempted, + context.getNaturalTerminationFactor()); + } + // Only add resToObtain when it >= 0 if (Resources.greaterThan(rc, clusterResource, resToObtain, Resources.none())) { @@ -343,22 +379,39 @@ public class PreemptableResourceCalculator { + " resource-to-obtain=" + resToObtain); } } - qT.actuallyToBePreempted = Resources.clone(resToObtain); + qT.setActuallyToBePreempted(Resources.clone(resToObtain)); } else { - qT.actuallyToBePreempted = Resources.none(); + qT.setActuallyToBePreempted(Resources.none()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug(qT); } } } } + private void updatePreemptableExtras(TempQueuePerPartition cur) { + if (cur.children == null || cur.children.isEmpty()) { + cur.updatePreemptableExtras(rc); + } else { + for (TempQueuePerPartition child : cur.children) { + updatePreemptableExtras(child); + } + cur.updatePreemptableExtras(rc); + } + } + public void computeIdealAllocation(Resource clusterResource, Resource totalPreemptionAllowed) { for (String partition : context.getAllPartitions()) { - TempQueuePerPartition tRoot = - context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition); + TempQueuePerPartition tRoot = context.getQueueByPartition( + CapacitySchedulerConfiguration.ROOT, partition); + updatePreemptableExtras(tRoot); + // compute the ideal distribution of resources among queues // updates cloned queues state accordingly - tRoot.idealAssigned = tRoot.guaranteed; + tRoot.idealAssigned = tRoot.getGuaranteed(); recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 9b70e53..36383502 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -34,7 +34,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -173,6 +174,15 @@ public class ProportionalCapacityPreemptionPolicy rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); + // Do we need to specially consider reserved containers? + boolean selectCandidatesForResevedContainers = csConfig.getBoolean( + CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS); + if (selectCandidatesForResevedContainers) { + candidatesSelectionPolicies.add( + new ReservedContainerCandidatesSelector(this)); + } + // initialize candidates preemption selection policies candidatesSelectionPolicies.add( new FifoCandidatesSelector(this)); @@ -185,9 +195,15 @@ public class ProportionalCapacityPreemptionPolicy @Override public synchronized void editSchedule() { + long startTs = clock.getTime(); + CSQueue root = scheduler.getRootQueue(); Resource clusterResources = Resources.clone(scheduler.getClusterResource()); containerBasedPreemptOrKill(root, clusterResources); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } } @SuppressWarnings("unchecked") @@ -298,8 +314,8 @@ public class ProportionalCapacityPreemptionPolicy queueToPartitions.clear(); for (String partitionToLookAt : allPartitions) { - cloneQueues(root, - nlm.getResourceByLabel(partitionToLookAt, clusterResources), + cloneQueues(root, Resources + .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt); } } @@ -314,9 +330,15 @@ public class ProportionalCapacityPreemptionPolicy // based on ideal allocation select containers to be preemptionCandidates from each // queue and each application - Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = null; + Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = + new HashMap<>(); for (PreemptionCandidatesSelector selector : candidatesSelectionPolicies) { + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat + .format("Trying to use {0} to select preemption candidates", + selector.getClass().getName())); + } toPreempt = selector.selectCandidates(toPreempt, clusterResources, totalPreemptionAllowed); } @@ -382,14 +404,15 @@ public class ProportionalCapacityPreemptionPolicy float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); boolean preemptionDisabled = curQueue.getPreemptionDisabled(); - Resource current = 
curQueue.getQueueResourceUsage().getUsed( - partitionToLookAt); - Resource guaranteed = Resources.multiply(partitionResource, absCap); - Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap); + Resource current = Resources.clone( + curQueue.getQueueResourceUsage().getUsed(partitionToLookAt)); Resource killable = Resources.none(); + + Resource reserved = Resources.clone( + curQueue.getQueueResourceUsage().getReserved(partitionToLookAt)); if (null != preemptableQueues.get(queueName)) { - killable = preemptableQueues.get(queueName) - .getKillableResource(partitionToLookAt); + killable = Resources.clone(preemptableQueues.get(queueName) + .getKillableResource(partitionToLookAt)); } // when partition is a non-exclusive partition, the actual maxCapacity @@ -397,53 +420,24 @@ public class ProportionalCapacityPreemptionPolicy try { if (!scheduler.getRMContext().getNodeLabelManager() .isExclusiveNodeLabel(partitionToLookAt)) { - maxCapacity = - Resources.max(rc, partitionResource, maxCapacity, current); + absMaxCap = 1.0f; } } catch (IOException e) { // This may cause by partition removed when running capacity monitor, // just ignore the error, this will be corrected when doing next check. } - Resource extra = Resource.newInstance(0, 0); - if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) { - extra = Resources.subtract(current, guaranteed); - } - if (curQueue instanceof LeafQueue) { - LeafQueue l = (LeafQueue) curQueue; - Resource pending = - l.getTotalPendingResourcesConsideringUserLimit( - partitionResource, partitionToLookAt); - ret = new TempQueuePerPartition(queueName, current, pending, guaranteed, - maxCapacity, preemptionDisabled, partitionToLookAt, killable); - if (preemptionDisabled) { - ret.untouchableExtra = extra; - } else { - ret.preemptableExtra = extra; - } - ret.setLeafQueue(l); - } else { - Resource pending = Resource.newInstance(0, 0); - ret = - new TempQueuePerPartition(curQueue.getQueueName(), current, pending, - guaranteed, maxCapacity, false, partitionToLookAt, killable); - Resource childrensPreemptable = Resource.newInstance(0, 0); + ret = new TempQueuePerPartition(queueName, current, preemptionDisabled, + partitionToLookAt, killable, absCap, absMaxCap, partitionResource, + reserved, curQueue); + + if (curQueue instanceof ParentQueue) { + // Recursively add children for (CSQueue c : curQueue.getChildQueues()) { - TempQueuePerPartition subq = - cloneQueues(c, partitionResource, partitionToLookAt); - Resources.addTo(childrensPreemptable, subq.preemptableExtra); + TempQueuePerPartition subq = cloneQueues(c, partitionResource, + partitionToLookAt); ret.addChild(subq); } - // untouchableExtra = max(extra - childrenPreemptable, 0) - if (Resources.greaterThanOrEqual( - rc, partitionResource, childrensPreemptable, extra)) { - ret.untouchableExtra = Resource.newInstance(0, 0); - } else { - ret.untouchableExtra = - Resources.subtract(extra, childrensPreemptable); - } - ret.preemptableExtra = Resources.min( - rc, partitionResource, childrensPreemptable, extra); } } addTempQueuePartition(ret); @@ -486,7 +480,8 @@ public class ProportionalCapacityPreemptionPolicy String partition) { Map<String, TempQueuePerPartition> partitionToQueues; if (null == (partitionToQueues = queueToPartitions.get(queueName))) { - return null; + throw new YarnRuntimeException("This shouldn't happen, cannot find " + + "TempQueuePerPartition for queueName=" + queueName); } return partitionToQueues.get(partition); } @@ -497,7 +492,8 @@ public class 
ProportionalCapacityPreemptionPolicy @Override public Collection<TempQueuePerPartition> getQueuePartitions(String queueName) { if (!queueToPartitions.containsKey(queueName)) { - return null; + throw new YarnRuntimeException("This shouldn't happen, cannot find " + + "TempQueuePerPartition collection for queueName=" + queueName); } return queueToPartitions.get(queueName).values(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java new file mode 100644 index 0000000..de23d0a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ReservedContainerCandidatesSelector + extends PreemptionCandidatesSelector { + private static final Log LOG = + LogFactory.getLog(ReservedContainerCandidatesSelector.class); + + private PreemptableResourceCalculator preemptableAmountCalculator; + + /** + * A temporary data structure to remember what to preempt on a node + */ + private static class NodeForPreemption { + private float preemptionCost; + private FiCaSchedulerNode schedulerNode; + private List<RMContainer> selectedContainers; + + public NodeForPreemption(float preemptionCost, + FiCaSchedulerNode schedulerNode, List<RMContainer> selectedContainers) { + this.preemptionCost = preemptionCost; + this.schedulerNode = schedulerNode; + this.selectedContainers = selectedContainers; + } + } + + ReservedContainerCandidatesSelector( + CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + preemptableAmountCalculator = new PreemptableResourceCalculator( + preemptionContext, true); + } + + @Override + public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, + Resource clusterResource, + Resource totalPreemptedResourceAllowed) { + // Calculate how much resources we need to preempt + preemptableAmountCalculator.computeIdealAllocation(clusterResource, + totalPreemptedResourceAllowed); + + // Get queue to preemptable resource by partition + Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition = + new HashMap<>(); + for (String leafQueue : preemptionContext.getLeafQueueNames()) { + queueToPreemptableResourceByPartition.put(leafQueue, + CapacitySchedulerPreemptionUtils + .getResToObtainByPartitionForLeafQueue(preemptionContext, + leafQueue, clusterResource)); + } + + // Get list of nodes for preemption, ordered by preemption cost + List<NodeForPreemption> nodesForPreemption = getNodesForPreemption( + clusterResource, queueToPreemptableResourceByPartition, + selectedCandidates, totalPreemptedResourceAllowed); + + for (NodeForPreemption nfp : nodesForPreemption) { + RMContainer reservedContainer = nfp.schedulerNode.getReservedContainer(); + if (null == reservedContainer) { + continue; + } + + NodeForPreemption preemptionResult = getPreemptionCandidatesOnNode( + nfp.schedulerNode, clusterResource, + queueToPreemptableResourceByPartition, selectedCandidates, + totalPreemptedResourceAllowed, false); + if (null != preemptionResult) { + for (RMContainer c : preemptionResult.selectedContainers) { + ApplicationAttemptId appId = c.getApplicationAttemptId(); + Set<RMContainer> containers = selectedCandidates.get(appId); + if (null == containers) { + containers = new HashSet<>(); + selectedCandidates.put(appId, containers); + } + + 
containers.add(c); + if (LOG.isDebugEnabled()) { + LOG.debug(this.getClass().getName() + " Marked container=" + c .getContainerId() + " from queue=" + c.getQueueName() + + " to be preemption candidates"); + } + } + } + } + + return selectedCandidates; + } + + private Resource getPreemptableResource(String queueName, + String partitionName, + Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition) { + Map<String, Resource> partitionToPreemptable = + queueToPreemptableResourceByPartition.get(queueName); + if (null == partitionToPreemptable) { + return null; + } + + Resource preemptable = partitionToPreemptable.get(partitionName); + return preemptable; + } + + private boolean tryToPreemptFromQueue(Resource cluster, String queueName, + String partitionName, + Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition, + Resource required, Resource totalPreemptionAllowed, boolean readOnly) { + Resource preemptable = getPreemptableResource(queueName, partitionName, + queueToPreemptableResourceByPartition); + if (null == preemptable) { + return false; + } + + if (!Resources.fitsIn(rc, cluster, required, preemptable)) { + return false; + } + + if (!Resources.fitsIn(rc, cluster, required, totalPreemptionAllowed)) { + return false; + } + + if (!readOnly) { + Resources.subtractFrom(preemptable, required); + Resources.subtractFrom(totalPreemptionAllowed, required); + } + return true; + } + + + + /** + * Check whether we can preempt enough resources for the reserved container on a given node + * @param node + * @param cluster + * @param queueToPreemptableResourceByPartition it's a map of + * <queueName, <partition, preemptable-resource>> + * @param readOnly do we want to modify preemptable resource after we selected + * candidates + * @return NodeForPreemption if it's possible to preempt containers on the node + * to satisfy reserved resource + */ + private NodeForPreemption getPreemptionCandidatesOnNode( + FiCaSchedulerNode node, Resource cluster, + Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition, + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, + Resource totalPreemptionAllowed, boolean readOnly) { + RMContainer reservedContainer = node.getReservedContainer(); + Resource available = Resources.clone(node.getUnallocatedResource()); + Resource totalSelected = Resources.createResource(0); + List<RMContainer> sortedRunningContainers = + node.getCopiedListOfRunningContainers(); + List<RMContainer> selectedContainers = new ArrayList<>(); + Map<ContainerId, RMContainer> killableContainers = + node.getKillableContainers(); + + // Sort running containers by launch time; we prefer to preempt recently + // launched containers + Collections.sort(sortedRunningContainers, new Comparator<RMContainer>() { + @Override public int compare(RMContainer o1, RMContainer o2) { + return -1 * o1.getContainerId().compareTo(o2.getContainerId()); + } + }); + + // First check: can we preempt containers to allocate the + // reservedContainer?
+ boolean canAllocateReservedContainer = false; + + // At least, we can get available + killable resources from this node + Resource cur = Resources.add(available, node.getTotalKillableResources()); + String partition = node.getPartition(); + + // Avoid preempt any container if required <= available + killable + if (Resources.fitsIn(rc, cluster, reservedContainer.getReservedResource(), + cur)) { + return null; + } + + // Extra cost of am container preemption + float amPreemptionCost = 0f; + + for (RMContainer c : sortedRunningContainers) { + String containerQueueName = c.getQueueName(); + + // Skip container if it is already marked killable + if (killableContainers.containsKey(c.getContainerId())) { + continue; + } + + // An alternative approach is add a "penalty cost" if AM container is + // selected. Here for safety, avoid preempt AM container in any cases + if (c.isAMContainer()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip selecting AM container on host=" + node.getNodeID() + + " AM container=" + c.getContainerId()); + } + continue; + } + + // Can we preempt container c? + // Check if we have quota to preempt this container + boolean canPreempt = tryToPreemptFromQueue(cluster, containerQueueName, + partition, queueToPreemptableResourceByPartition, + c.getAllocatedResource(), totalPreemptionAllowed, readOnly); + + // If we can, add to selected container, and change resource accordingly. + if (canPreempt) { + if (!CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedCandidates)) { + if (!readOnly) { + selectedContainers.add(c); + } + Resources.addTo(totalSelected, c.getAllocatedResource()); + } + Resources.addTo(cur, c.getAllocatedResource()); + if (Resources.fitsIn(rc, cluster, + reservedContainer.getReservedResource(), cur)) { + canAllocateReservedContainer = true; + break; + } + } + } + + if (!canAllocateReservedContainer) { + if (!readOnly) { + // Revert queue preemption quotas + for (RMContainer c : selectedContainers) { + Resource res = getPreemptableResource(c.getQueueName(), partition, + queueToPreemptableResourceByPartition); + if (null == res) { + // This shouldn't happen in normal cases, one possible cause is + // container moved to different queue while executing preemption logic. + // Ignore such failures. 
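Note the pattern the readOnly flag enables here: getNodesForPreemption() first calls this method read-only to score nodes without touching quotas, and the committing pass then charges per-queue quota as it selects, rolling the charges back (as in the revert block above) when the reservation still cannot be satisfied. A condensed sketch of that dry-run/commit/rollback shape on a single node, with plain MB longs and hypothetical names:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DryRunCommitSketch {
  // Try to free `required` MB on one node; charge per-queue quota unless
  // readOnly, and roll the charges back if the reservation still won't fit.
  static List<Long> select(long required, long available, List<Long> running,
      Map<String, Long> quota, String queue, boolean readOnly) {
    List<Long> picked = new ArrayList<>();
    long freed = available;
    for (long c : running) {
      if (freed >= required) {
        break;
      }
      long q = quota.getOrDefault(queue, 0L);
      if (q < c) {
        continue; // not enough quota left to preempt this container
      }
      if (!readOnly) {
        quota.put(queue, q - c); // charge the queue's preemptable quota
      }
      picked.add(c);
      freed += c;
    }
    if (freed < required) {
      if (!readOnly) { // revert: give the charged quota back
        for (long c : picked) {
          quota.merge(queue, c, Long::sum);
        }
      }
      return null; // caller skips this node
    }
    return picked;
  }

  public static void main(String[] args) {
    Map<String, Long> quota = new HashMap<>();
    quota.put("root.b", 6_144L);
    List<Long> dryRun = select(8_192, 2_048,
        List.of(4_096L, 2_048L, 1_024L), quota, "root.b", true);
    System.out.println(dryRun + " / quota untouched: " + quota);
  }
}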
+ continue; + } + Resources.addTo(res, c.getAllocatedResource()); + } + } + return null; + } + + float ratio = Resources.ratio(rc, totalSelected, + reservedContainer.getReservedResource()); + + // Compute preemption score + NodeForPreemption nfp = new NodeForPreemption(ratio + amPreemptionCost, + node, selectedContainers); + + return nfp; + } + + private List<NodeForPreemption> getNodesForPreemption(Resource cluster, + Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition, + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, + Resource totalPreemptionAllowed) { + List<NodeForPreemption> nfps = new ArrayList<>(); + + // get nodes have reserved container + for (FiCaSchedulerNode node : preemptionContext.getScheduler() + .getAllNodes()) { + if (node.getReservedContainer() != null) { + NodeForPreemption nfp = getPreemptionCandidatesOnNode(node, cluster, + queueToPreemptableResourceByPartition, selectedCandidates, + totalPreemptionAllowed, true); + if (null != nfp) { + // Null means we cannot preempt containers on the node to satisfy + // reserved container + nfps.add(nfp); + } + } + } + + // Return sorted node-for-preemptions (by cost) + Collections.sort(nfps, new Comparator<NodeForPreemption>() { + @Override + public int compare(NodeForPreemption o1, NodeForPreemption o2) { + return Float.compare(o1.preemptionCost, o2.preemptionCost); + } + }); + + return nfps; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 8b01a73..116cd22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -32,20 +33,22 @@ import java.util.ArrayList; public class TempQueuePerPartition { // Following fields are copied from scheduler final String queueName; - final Resource current; - final Resource pending; - final Resource guaranteed; - final Resource maxCapacity; - final Resource killable; final String partition; + final Resource pending; + + private final Resource current; + private final Resource killable; + private final Resource reserved; + private final float absCapacity; + private final float absMaxCapacity; + final Resource 
totalPartitionResource; // Following fields are setted and used by candidate selection policies Resource idealAssigned; Resource toBePreempted; Resource untouchableExtra; Resource preemptableExtra; - // For logging purpose - Resource actuallyToBePreempted; + private Resource actuallyToBePreempted; double normalizedGuarantee; @@ -53,14 +56,22 @@ public class TempQueuePerPartition { LeafQueue leafQueue; boolean preemptionDisabled; - TempQueuePerPartition(String queueName, Resource current, Resource pending, - Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, - String partition, Resource killable) { + TempQueuePerPartition(String queueName, Resource current, + boolean preemptionDisabled, String partition, Resource killable, + float absCapacity, float absMaxCapacity, Resource totalPartitionResource, + Resource reserved, CSQueue queue) { this.queueName = queueName; this.current = current; - this.pending = pending; - this.guaranteed = guaranteed; - this.maxCapacity = maxCapacity; + + if (queue instanceof LeafQueue) { + LeafQueue l = (LeafQueue) queue; + pending = l.getTotalPendingResourcesConsideringUserLimit( + totalPartitionResource, partition); + leafQueue = l; + } else { + pending = Resources.createResource(0); + } + this.idealAssigned = Resource.newInstance(0, 0); this.actuallyToBePreempted = Resource.newInstance(0, 0); this.toBePreempted = Resource.newInstance(0, 0); @@ -71,6 +82,10 @@ public class TempQueuePerPartition { this.preemptionDisabled = preemptionDisabled; this.partition = partition; this.killable = killable; + this.absCapacity = absCapacity; + this.absMaxCapacity = absMaxCapacity; + this.totalPartitionResource = totalPartitionResource; + this.reserved = reserved; } public void setLeafQueue(LeafQueue l) { @@ -92,31 +107,101 @@ public class TempQueuePerPartition { return children; } + public Resource getUsed() { + return current; + } + + public Resource getUsedDeductReservd() { + return Resources.subtract(current, reserved); + } + // This function "accepts" all the resources it can (pending) and return // the unused ones Resource offer(Resource avail, ResourceCalculator rc, - Resource clusterResource) { + Resource clusterResource, boolean considersReservedResource) { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( - Resources.subtract(maxCapacity, idealAssigned), + Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) - Resource accepted = - Resources.min(rc, clusterResource, - absMaxCapIdealAssignedDelta, - Resources.min(rc, clusterResource, avail, Resources.subtract( - Resources.add(current, pending), idealAssigned))); + Resource accepted = Resources.min(rc, clusterResource, + absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, + Resources + /* + * When we're using FifoPreemptionSelector + * (considerReservedResource = false). + * + * We should deduct reserved resource to avoid excessive preemption: + * + * For example, if an under-utilized queue has used = reserved = 20. + * Preemption policy will try to preempt 20 containers + * (which is not satisfied) from different hosts. + * + * In FifoPreemptionSelector, there's no guarantee that preempted + * resource can be used by pending request, so policy will preempt + * resources repeatly. + */ + .subtract(Resources.add( + (considersReservedResource ? 
getUsed() : + getUsedDeductReservd()), + pending), idealAssigned))); Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; } + public Resource getGuaranteed() { + return Resources.multiply(totalPartitionResource, absCapacity); + } + + public Resource getMax() { + return Resources.multiply(totalPartitionResource, absMaxCapacity); + } + + public void updatePreemptableExtras(ResourceCalculator rc) { + // Reset untouchableExtra and preemptableExtra + untouchableExtra = Resources.none(); + preemptableExtra = Resources.none(); + + Resource extra = Resources.subtract(getUsed(), + getGuaranteed()); + if (Resources.lessThan(rc, totalPartitionResource, extra, + Resources.none())) { + extra = Resources.none(); + } + + if (null == children || children.isEmpty()) { + // If it is a leaf queue + if (preemptionDisabled) { + untouchableExtra = extra; + } else { + preemptableExtra = extra; + } + } else { + // If it is a parent queue + Resource childrensPreemptable = Resource.newInstance(0, 0); + for (TempQueuePerPartition child : children) { + Resources.addTo(childrensPreemptable, child.preemptableExtra); + } + // untouchableExtra = max(extra - childrenPreemptable, 0) + if (Resources.greaterThanOrEqual(rc, totalPartitionResource, + childrensPreemptable, extra)) { + untouchableExtra = Resource.newInstance(0, 0); + } else { + untouchableExtra = Resources.subtract(extra, childrensPreemptable); + } + preemptableExtra = Resources.min(rc, totalPartitionResource, + childrensPreemptable, extra); + } + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(" NAME: " + queueName) .append(" CUR: ").append(current) .append(" PEN: ").append(pending) - .append(" GAR: ").append(guaranteed) + .append(" RESERVED: ").append(reserved) + .append(" GAR: ").append(getGuaranteed()) .append(" NORM: ").append(normalizedGuarantee) .append(" IDEAL_ASSIGNED: ").append(idealAssigned) .append(" IDEAL_PREEMPT: ").append(toBePreempted) @@ -130,14 +215,45 @@ public class TempQueuePerPartition { public void assignPreemption(float scalingFactor, ResourceCalculator rc, Resource clusterResource) { - if (Resources.greaterThan(rc, clusterResource, - Resources.subtract(current, killable), idealAssigned)) { - toBePreempted = Resources.multiply(Resources - .subtract(Resources.subtract(current, killable), idealAssigned), - scalingFactor); + Resource usedDeductKillable = Resources.subtract( + getUsed(), killable); + Resource totalResource = Resources.add(getUsed(), pending); + + // The minimum resource that we need to keep for a queue is: + // max(idealAssigned, min(used + pending, guaranteed)). + // + // Doing this because when we calculate ideal allocation doesn't consider + // reserved resource, ideal-allocation calculated could be less than + // guaranteed and total. We should avoid preempt from a queue if it is already + // <= its guaranteed resource. 
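The comment above fixes the floor a queue may be squeezed to; spelled out as arithmetic, toBePreempted = scalingFactor * max(0, (used - killable) - max(idealAssigned, min(used + pending, guaranteed))). A direct sketch of that formula with plain MB longs and hypothetical names:

class AssignPreemptionSketch {
  static long toBePreempted(long used, long killable, long pending,
      long guaranteed, long idealAssigned, double scalingFactor) {
    // keep = max(idealAssigned, min(used + pending, guaranteed))
    long keep = Math.max(idealAssigned, Math.min(used + pending, guaranteed));
    long excess = (used - killable) - keep;
    return excess > 0 ? Math.round(excess * scalingFactor) : 0;
  }

  public static void main(String[] args) {
    // Over capacity: used 30 GB vs 20 GB guaranteed -> preempt the 10 GB excess.
    System.out.println(toBePreempted(30_720, 0, 0, 20_480, 20_480, 1.0)); // 10240
    // At guarantee: nothing to preempt even though idealAssigned is lower.
    System.out.println(toBePreempted(20_480, 0, 0, 20_480, 10_240, 1.0)); // 0
  }
}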
+    Resource minimumQueueResource = Resources.max(rc, clusterResource,
+        Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
+        idealAssigned);
+
+    if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
+        minimumQueueResource)) {
+      toBePreempted = Resources.multiply(
+          Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor);
     } else {
-      toBePreempted = Resource.newInstance(0, 0);
+      toBePreempted = Resources.none();
+    }
+  }
+
+  public Resource getActuallyToBePreempted() {
+    return actuallyToBePreempted;
+  }
+
+  public void setActuallyToBePreempted(Resource res) {
+    this.actuallyToBePreempted = res;
+  }
+
+  public void deductActuallyToBePreempted(ResourceCalculator rc,
+      Resource cluster, Resource toBeDeduct) {
+    if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) {
+      Resources.subtractFrom(actuallyToBePreempted, toBeDeduct);
     }
+    actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted,
+        Resources.none());
   }
 
   void appendLogString(StringBuilder sb) {
@@ -146,8 +262,8 @@ public class TempQueuePerPartition {
         .append(current.getVirtualCores()).append(", ")
         .append(pending.getMemory()).append(", ")
         .append(pending.getVirtualCores()).append(", ")
-        .append(guaranteed.getMemory()).append(", ")
-        .append(guaranteed.getVirtualCores()).append(", ")
+        .append(getGuaranteed().getMemory()).append(", ")
+        .append(getGuaranteed().getVirtualCores()).append(", ")
         .append(idealAssigned.getMemory()).append(", ")
        .append(idealAssigned.getVirtualCores()).append(", ")
        .append(toBePreempted.getMemory()).append(", ")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index dfe0886..f37923f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -89,4 +89,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   boolean hasIncreaseReservation();
 
   void cancelIncreaseReservation();
+
+  String getQueueName();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 919f12d..973a7db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -181,6 +181,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   // Only used for container resource increase and decrease. This is the
   // resource to rollback to should container resource increase token expires.
   private Resource lastConfirmedResource;
+  private volatile String queueName;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -817,4 +818,13 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   public void cancelIncreaseReservation() {
     hasIncreaseReservation = false;
   }
+
+  public void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 0f79a57..8f03de2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -659,7 +659,7 @@ public abstract class AbstractYarnScheduler
       nodeTracker.removeNode(nm.getNodeID());
 
       // update resource to node
-      node.setTotalResource(newResource);
+      node.updateTotalResource(newResource);
 
       nodeTracker.addNode((N) node);
     } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 2542009..7308e22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -404,6 +404,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
           node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
       attemptResourceUsage.incReserved(node.getPartition(),
           container.getResource());
+      ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
 
       // Reset the re-reservation count
       resetReReservations(priority);
@@ -748,14 +749,17 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public synchronized void move(Queue newQueue) {
     QueueMetrics oldMetrics = queue.getMetrics();
     QueueMetrics newMetrics = newQueue.getMetrics();
+    String newQueueName = newQueue.getQueueName();
     String user = getUser();
     for (RMContainer liveContainer : liveContainers.values()) {
       Resource resource = liveContainer.getContainer().getResource();
+      ((RMContainerImpl)liveContainer).setQueueName(newQueueName);
       oldMetrics.releaseResources(user, 1, resource);
       newMetrics.allocateResources(user, 1, resource, false);
     }
     for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
       for (RMContainer reservedContainer : map.values()) {
+        ((RMContainerImpl)reservedContainer).setQueueName(newQueueName);
         Resource resource = reservedContainer.getReservedResource();
         oldMetrics.unreserveResource(user, resource);
         newMetrics.reserveResource(user, resource);
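The TempQueuePerPartition hunk above is the heart of the per-queue preemption target: a queue may keep max(min(total, guaranteed), idealAssigned), and only its usage above that, scaled by the natural-termination factor, becomes toBePreempted. A minimal single-resource sketch of that arithmetic, using plain longs and hypothetical names in place of the Resource/ResourceCalculator types:

    // Single-resource sketch of the toBePreempted computation in
    // TempQueuePerPartition above. All names here are illustrative, not
    // the actual Hadoop APIs; the real code works on multi-dimensional
    // Resources via a ResourceCalculator.
    public final class PreemptionTargetSketch {

      /**
       * @param usedDeductKillable queue usage minus already-marked-killable
       * @param total              total resource of the partition
       * @param guaranteed         the queue's guaranteed capacity
       * @param idealAssigned      the policy's ideal assignment for the queue
       * @param scalingFactor      natural-termination factor in [0, 1]
       */
      static long toBePreempted(long usedDeductKillable, long total,
          long guaranteed, long idealAssigned, double scalingFactor) {
        // A queue may keep the larger of its ideal assignment and its
        // guarantee, with the guarantee capped by the partition total.
        long minimumQueueResource =
            Math.max(Math.min(total, guaranteed), idealAssigned);

        if (usedDeductKillable > minimumQueueResource) {
          // Only the excess above the protected minimum is preemptable,
          // scaled back so naturally completing containers get a chance.
          return Math.round(
              (usedDeductKillable - minimumQueueResource) * scalingFactor);
        }
        return 0; // mirrors Resources.none()
      }

      public static void main(String[] args) {
        // used=60, total=100, guaranteed=40, ideal=45, factor=0.2:
        // minimum = max(min(100, 40), 45) = 45; excess = 15; target = 3.
        System.out.println(toBePreempted(60, 100, 40, 45, 0.2));
      }
    }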
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index e219494..1f57e07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -98,12 +98,12 @@ public abstract class SchedulerNode {
    * Set total resources on the node.
    * @param resource Total resources on the node.
    */
-  public synchronized void setTotalResource(Resource resource){
+  public synchronized void updateTotalResource(Resource resource){
     this.totalResource = resource;
     this.unallocatedResource = Resources.subtract(totalResource,
-      this.allocatedResource);
+        this.allocatedResource);
   }
-  
+
   /**
    * Get the ID of the node which contains both its hostname and port.
    * @return The ID of the node.
@@ -185,7 +185,7 @@ public abstract class SchedulerNode {
           + getUnallocatedResource() + " available after allocation");
     }
   }
-  
+
   /**
    * Increase the resources allocated to a container.
    * @param containerId Identifier of the container to change.
@@ -195,7 +195,7 @@ public abstract class SchedulerNode {
       Resource deltaResource) {
     changeContainerResource(containerId, deltaResource, true);
   }
-  
+
   /**
    * Decrease the resources allocated to a container.
    * @param containerId Identifier of the container to change.
@@ -242,10 +242,11 @@ public abstract class SchedulerNode {
   }
 
   /**
-   * Update the resources of the node when allocating a new container.
-   * @param container Container to allocate.
+   * Update the resources of the node when releasing a container.
+   * @param container Container to release.
    */
-  protected synchronized void updateResource(Container container) {
+  protected synchronized void updateResourceForReleasedContainer(
+      Container container) {
     addUnallocatedResource(container.getResource());
     --numContainers;
   }
@@ -262,7 +263,7 @@ public abstract class SchedulerNode {
 
     // Remove the containers from the nodemanger
     if (null != launchedContainers.remove(container.getId())) {
-      updateResource(container);
+      updateResourceForReleasedContainer(container);
     }
 
     if (LOG.isDebugEnabled()) {
@@ -338,7 +339,7 @@ public abstract class SchedulerNode {
    * Get the running containers in the node.
    * @return List of running containers in the node.
    */
-  public synchronized List<RMContainer> getRunningContainers() {
+  public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
     return new ArrayList<RMContainer>(launchedContainers.values());
   }
 
@@ -355,7 +356,7 @@ public abstract class SchedulerNode {
    * @param reservedContainer Reserved container in the node.
    */
   protected synchronized void
-  setReservedContainer(RMContainer reservedContainer) {
+      setReservedContainer(RMContainer reservedContainer) {
     this.reservedContainer = reservedContainer;
   }
 
@@ -385,7 +386,7 @@ public abstract class SchedulerNode {
   public void updateLabels(Set<String> labels) {
     this.labels = labels;
   }
-  
+
   /**
    * Get partition of which the node belongs to, if node-labels of this node is
    * empty or null, it belongs to NO_LABEL partition. And since we only support
@@ -394,7 +395,7 @@ public abstract class SchedulerNode {
    */
   public String getPartition() {
     if (this.labels == null || this.labels.isEmpty()) {
-      return RMNodeLabelsManager.NO_LABEL; 
+      return RMNodeLabelsManager.NO_LABEL;
     } else {
       return this.labels.iterator().next();
     }
@@ -432,4 +433,4 @@ public abstract class SchedulerNode {
   public ResourceUtilization getNodeUtilization() {
     return this.nodeUtilization;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 34a9829..920e983 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -1160,7 +1159,7 @@ public class CapacityScheduler extends
     String oldPartition = node.getPartition();
 
     // Update resources of these containers
-    for (RMContainer rmContainer : node.getRunningContainers()) {
+    for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
       FiCaSchedulerApp application =
           getApplicationAttempt(rmContainer.getApplicationAttemptId());
       if (null != application) {
@@ -1509,7 +1508,7 @@ public class CapacityScheduler extends
     }
 
     // Remove running containers
-    List<RMContainer> runningContainers = node.getRunningContainers();
+    List<RMContainer> runningContainers = node.getCopiedListOfRunningContainers();
     for (RMContainer container : runningContainers) {
       super.completedContainer(container,
           SchedulerUtils.createAbnormalContainerStatus(
@@ -1633,6 +1632,11 @@ public class CapacityScheduler extends
   public FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodeTracker.getNode(nodeId);
   }
+
+  @Lock(Lock.NoLock.class)
+  public List<FiCaSchedulerNode> getAllNodes() {
+    return nodeTracker.getAllNodes();
+  }
 
   @Override
   @Lock(Lock.NoLock.class)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 88e39de..d5bca66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1021,21 +1021,24 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
   }
 
+  private static final String PREEMPTION_CONFIG_PREFIX =
+      "yarn.resourcemanager.monitor.capacity.preemption.";
+
   /** If true, run the policy but do not affect the cluster with preemption and
    * kill events. */
   public static final String PREEMPTION_OBSERVE_ONLY =
-      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+      PREEMPTION_CONFIG_PREFIX + "observe_only";
   public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
 
   /** Time in milliseconds between invocations of this policy */
   public static final String PREEMPTION_MONITORING_INTERVAL =
-      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+      PREEMPTION_CONFIG_PREFIX + "monitoring_interval";
   public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
 
   /** Time in milliseconds between requesting a preemption from an application
    * and killing the container. */
   public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
-      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+      PREEMPTION_CONFIG_PREFIX + "max_wait_before_kill";
   public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
 
   /** Maximum percentage of resources preemptionCandidates in a single round. By
@@ -1043,7 +1046,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
    * reclaimed from the cluster. After computing the total desired preemption,
    * the policy scales it back within this limit. */
   public static final String TOTAL_PREEMPTION_PER_ROUND =
-      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+      PREEMPTION_CONFIG_PREFIX + "total_preemption_per_round";
   public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
 
   /** Maximum amount of resources above the target capacity ignored for
@@ -1052,7 +1055,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
    * High values would slow the time to capacity and (absent natural
    * completions) it might prevent convergence to guaranteed capacity. */
   public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
-      "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+      PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity";
   public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f;
   /**
    * Given a computed preemption target, account for containers naturally
@@ -1062,7 +1065,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
    * will reclaim almost 95% of resources within 5 * {@link
    * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
   public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
-      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+      PREEMPTION_CONFIG_PREFIX + "natural_termination_factor";
   public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
       0.2f;
+
+  /**
+   * When calculating which containers to be preempted, we will try to preempt
+   * containers for reserved containers first. By default is false.
+   */
+  public static final String PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
+      PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
+  public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
+      false;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f764cac..8009580 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -199,6 +199,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         new RMContainerImpl(container, this.getApplicationAttemptId(),
             node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
             request.getNodeLabelExpression());
+    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
 
     updateAMContainerDiagnostics(AMState.ASSIGNED, null);
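The CapacitySchedulerConfiguration hunk above only refactors the existing key names onto PREEMPTION_CONFIG_PREFIX (the fully-qualified strings are unchanged, so existing configs keep working) and adds one new knob, select_based_on_reserved_containers. A hedged sketch of setting these keys through Hadoop's Configuration API; in practice they would live in yarn-site.xml, and the setup around them here is illustrative:

    import org.apache.hadoop.conf.Configuration;

    // Illustrative only: shows the fully-qualified key names produced
    // by PREEMPTION_CONFIG_PREFIX after this change.
    public class PreemptionConfigSketch {
      private static final String PREFIX =
          "yarn.resourcemanager.monitor.capacity.preemption.";

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);

        // New in this patch: prefer preempting on behalf of reserved
        // containers when picking candidates (defaults to false).
        conf.setBoolean(PREFIX + "select_based_on_reserved_containers", true);

        // Pre-existing knobs; their key strings are unchanged by the
        // prefix refactor above.
        conf.setBoolean(PREFIX + "observe_only", false);
        conf.setLong(PREFIX + "monitoring_interval", 3000L);
        conf.setLong(PREFIX + "max_wait_before_kill", 15000L);
        conf.setFloat(PREFIX + "total_preemption_per_round", 0.1f);

        System.out.println(conf.getBoolean(
            PREFIX + "select_based_on_reserved_containers", false));
      }
    }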
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 1d0e78a..f90a53c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -142,9 +143,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
   }
 
   @Override
-  protected synchronized void updateResource(
+  protected synchronized void updateResourceForReleasedContainer(
       Container container) {
-    super.updateResource(container);
+    super.updateResourceForReleasedContainer(container);
     if (killableContainers.containsKey(container.getId())) {
       Resources.subtractFrom(totalKillableResources, container.getResource());
       killableContainers.remove(container.getId());
@@ -170,6 +171,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
   }
 
   public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
-    return killableContainers;
+    return Collections.unmodifiableMap(killableContainers);
   }
 }
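getKillableContainers() above now hands out Collections.unmodifiableMap(killableContainers) instead of the internal map, so callers can still read live state but any write through the returned view fails fast. A small self-contained sketch of that idiom (String keys and values stand in for the real ContainerId and RMContainer types):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the getKillableContainers() idiom: expose a live but
    // read-only view of an internal map.
    public class UnmodifiableViewSketch {
      private final Map<String, String> killableContainers = new HashMap<>();

      public synchronized Map<String, String> getKillableContainers() {
        // Readers see current contents; writes through the view fail fast.
        return Collections.unmodifiableMap(killableContainers);
      }

      public static void main(String[] args) {
        UnmodifiableViewSketch node = new UnmodifiableViewSketch();
        node.killableContainers.put("container_01_000002", "8192MB");

        Map<String, String> view = node.getKillableContainers();
        System.out.println(view.get("container_01_000002")); // reads are fine

        try {
          view.clear(); // any mutation attempt through the view...
        } catch (UnsupportedOperationException e) {
          System.out.println("...throws, protecting scheduler state");
        }
      }
    }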
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 95144a1..482751f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -378,6 +378,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     RMContainer rmContainer = new RMContainerImpl(container,
         getApplicationAttemptId(), node.getNodeID(),
         appSchedulingInfo.getUser(), rmContext);
+    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 6a14c4c..c59ba12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -904,7 +904,8 @@ public class FairScheduler extends
     }
 
     // Remove running containers
-    List<RMContainer> runningContainers = node.getRunningContainers();
+    List<RMContainer> runningContainers =
+        node.getCopiedListOfRunningContainers();
     for (RMContainer container : runningContainers) {
       super.completedContainer(container,
           SchedulerUtils.createAbnormalContainerStatus(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index fba4c13..3e6225f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -944,7 +944,7 @@ public class FifoScheduler extends
       return;
     }
     // Kill running containers
-    for(RMContainer container : node.getRunningContainers()) {
+    for(RMContainer container : node.getCopiedListOfRunningContainers()) {
       super.completedContainer(container,
           SchedulerUtils.createAbnormalContainerStatus(
               container.getContainerId(),
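The getRunningContainers() to getCopiedListOfRunningContainers() rename threaded through CapacityScheduler, FairScheduler, and FifoScheduler makes the contract explicit: callers get an ArrayList snapshot, so they can iterate while the node's launched-container map keeps changing. A simplified sketch of why the snapshot matters (hypothetical types; the real code maps ContainerId to RMContainer under the node lock):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the copied-list idiom: the snapshot can be iterated
    // while the underlying map keeps mutating. No Hadoop classes here.
    public class CopiedListSketch {
      private final Map<String, String> launchedContainers = new HashMap<>();

      public synchronized List<String> getCopiedListOfRunningContainers() {
        // Copying under the lock yields a stable snapshot; handing out
        // launchedContainers.values() directly would risk
        // ConcurrentModificationException for callers.
        return new ArrayList<>(launchedContainers.values());
      }

      public synchronized void containerCompleted(String containerId) {
        launchedContainers.remove(containerId);
      }

      public static void main(String[] args) {
        CopiedListSketch node = new CopiedListSketch();
        node.launchedContainers.put("c1", "app_1");
        node.launchedContainers.put("c2", "app_2");

        // Safe: releasing containers mid-iteration cannot invalidate
        // the snapshot we are walking.
        for (String app : node.getCopiedListOfRunningContainers()) {
          node.containerCompleted("c1");
          System.out.println(app);
        }
      }
    }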