http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 9b499c8..7e668b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -17,26 +17,13 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.TreeSet; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -50,7 +37,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; @@ -58,8 +44,16 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * This class implement a {@link 
SchedulingEditPolicy} that is designed to be @@ -80,79 +74,59 @@ import com.google.common.collect.ImmutableSet; * this policy will trigger forced termination of containers (again by generating * {@link ContainerPreemptEvent}). */ -public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy { - +public class ProportionalCapacityPreemptionPolicy + implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext { private static final Log LOG = LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class); - /** If true, run the policy but do not affect the cluster with preemption and - * kill events. */ - public static final String OBSERVE_ONLY = - "yarn.resourcemanager.monitor.capacity.preemption.observe_only"; - /** Time in milliseconds between invocations of this policy */ - public static final String MONITORING_INTERVAL = - "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval"; - /** Time in milliseconds between requesting a preemption from an application - * and killing the container. */ - public static final String WAIT_TIME_BEFORE_KILL = - "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill"; - /** Maximum percentage of resources preempted in a single round. By - * controlling this value one can throttle the pace at which containers are - * reclaimed from the cluster. After computing the total desired preemption, - * the policy scales it back within this limit. */ - public static final String TOTAL_PREEMPTION_PER_ROUND = - "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round"; - /** Maximum amount of resources above the target capacity ignored for - * preemption. This defines a deadzone around the target capacity that helps - * prevent thrashing and oscillations around the computed target balance. - * High values would slow the time to capacity and (absent natural - * completions) it might prevent convergence to guaranteed capacity. */ - public static final String MAX_IGNORED_OVER_CAPACITY = - "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity"; - /** - * Given a computed preemption target, account for containers naturally - * expiring and preempt only this percentage of the delta. This determines - * the rate of geometric convergence into the deadzone ({@link - * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5 - * will reclaim almost 95% of resources within 5 * {@link - * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. 
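To make the convergence claim above concrete: with a natural termination factor f, each policy round reclaims only f of the still-outstanding over-capacity delta, so the fraction left after n rounds is (1 - f)^n. The following standalone sketch (not part of the patch; the class name and the factor value 0.5 are illustrative assumptions taken from the example in the comment) prints the reclaimed percentage per round.

/** Illustrative only: geometric convergence driven by the natural termination factor. */
public class NaturalTerminationFactorSketch {
  public static void main(String[] args) {
    double factor = 0.5;      // example value from the javadoc above
    double remaining = 1.0;   // fraction of the over-capacity delta not yet reclaimed
    for (int round = 1; round <= 5; round++) {
      remaining *= (1.0 - factor);
      System.out.printf("after round %d: %.1f%% reclaimed%n",
          round, (1.0 - remaining) * 100.0);
    }
    // After 5 rounds roughly 96.9% of the delta has been reclaimed, consistent
    // with the "almost 95% within 5 * WAIT_TIME_BEFORE_KILL" statement above.
  }
}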
*/ - public static final String NATURAL_TERMINATION_FACTOR = - "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; - - private RMContext rmContext; - private final Clock clock; + + // Configurable fields private double maxIgnoredOverCapacity; private long maxWaitTime; - private CapacityScheduler scheduler; private long monitoringInterval; - private final Map<RMContainer, Long> preempted = new HashMap<>(); - - private ResourceCalculator rc; private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; private boolean observeOnly; + private boolean lazyPreempionEnabled; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + private RMNodeLabelsManager nlm; + + // Internal properties to make decisions of what to preempt + private final Map<RMContainer,Long> preemptionCandidates = + new HashMap<>(); private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions = new HashMap<>(); - private RMNodeLabelsManager nlm; + private List<PreemptionCandidatesSelector> + candidatesSelectionPolicies = new ArrayList<>(); + private Set<String> allPartitions; + private Set<String> leafQueueNames; // Preemptable Entities, synced from scheduler at every run - private Map<String, PreemptableQueue> preemptableEntities = null; + private Map<String, PreemptableQueue> preemptableQueues; private Set<ContainerId> killableContainers; + @SuppressWarnings("unchecked") public ProportionalCapacityPreemptionPolicy() { clock = SystemClock.getInstance(); + allPartitions = Collections.EMPTY_SET; + leafQueueNames = Collections.EMPTY_SET; + preemptableQueues = Collections.EMPTY_MAP; } - public ProportionalCapacityPreemptionPolicy(Configuration config, - RMContext context, CapacityScheduler scheduler) { - this(config, context, scheduler, SystemClock.getInstance()); - } - - public ProportionalCapacityPreemptionPolicy(Configuration config, - RMContext context, CapacityScheduler scheduler, Clock clock) { - init(config, context, scheduler); + @SuppressWarnings("unchecked") + @VisibleForTesting + public ProportionalCapacityPreemptionPolicy(RMContext context, + CapacityScheduler scheduler, Clock clock) { + init(context.getYarnConfiguration(), context, scheduler); this.clock = clock; + allPartitions = Collections.EMPTY_SET; + leafQueueNames = Collections.EMPTY_SET; + preemptableQueues = Collections.EMPTY_MAP; } public void init(Configuration config, RMContext context, @@ -166,19 +140,45 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } rmContext = context; scheduler = (CapacityScheduler) sched; - maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1); - naturalTerminationFactor = - config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2); - maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000); - monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000); - percentageClusterPreemptionAllowed = - config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); - observeOnly = config.getBoolean(OBSERVE_ONLY, false); + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + maxIgnoredOverCapacity = csConfig.getDouble( + CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY); + + naturalTerminationFactor = csConfig.getDouble( + CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, + 
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR); + + maxWaitTime = csConfig.getLong( + CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL); + + percentageClusterPreemptionAllowed = csConfig.getFloat( + CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND); + + observeOnly = csConfig.getBoolean( + CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY); + + lazyPreempionEnabled = csConfig.getBoolean( + CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); + rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); + + // initialize candidates preemption selection policies + candidatesSelectionPolicies.add( + new FifoCandidatesSelector(this)); } - @VisibleForTesting + @Override public ResourceCalculator getResourceCalculator() { return rc; } @@ -191,42 +191,37 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } @SuppressWarnings("unchecked") - private void cleanupStaledKillableContainers(Resource cluster, - Set<String> leafQueueNames) { - for (String q : leafQueueNames) { - for (TempQueuePerPartition tq : getQueuePartitions(q)) { - // When queue's used - killable <= guaranteed and, killable > 0, we need - // to check if any of killable containers needs to be reverted - if (Resources.lessThanOrEqual(rc, cluster, - Resources.subtract(tq.current, tq.killable), tq.idealAssigned) - && Resources.greaterThan(rc, cluster, tq.killable, Resources.none())) { - // How many killable resources need to be reverted - // need-to-revert = already-marked-killable - (current - ideal) - Resource toBeRevertedFromKillable = Resources.subtract(tq.killable, - Resources.subtract(tq.current, tq.idealAssigned)); - - Resource alreadyReverted = Resources.createResource(0); - - for (RMContainer c : preemptableEntities.get(q).getKillableContainers( - tq.partition).values()) { - if (Resources.greaterThanOrEqual(rc, cluster, alreadyReverted, - toBeRevertedFromKillable)) { - break; - } - - if (Resources.greaterThan(rc, cluster, - Resources.add(alreadyReverted, c.getAllocatedResource()), - toBeRevertedFromKillable)) { - continue; - } else { - // This container need to be marked to unkillable - Resources.addTo(alreadyReverted, c.getAllocatedResource()); - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(c.getApplicationAttemptId(), c, - SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE)); - } + private void preemptOrkillSelectedContainerAfterWait( + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) { + // preempt (or kill) the selected containers + for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates + .entrySet()) { + ApplicationAttemptId appAttemptId = e.getKey(); + if (LOG.isDebugEnabled()) { + LOG.debug("Send to scheduler: in app=" + appAttemptId + + " #containers-to-be-preemptionCandidates=" + e.getValue().size()); + } + for (RMContainer container : e.getValue()) { + // if we tried to preempt this for more than maxWaitTime + if (preemptionCandidates.get(container) != null + && 
preemptionCandidates.get(container) + maxWaitTime < clock + .getTime()) { + // kill it + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); + preemptionCandidates.remove(container); + } else { + if (preemptionCandidates.get(container) != null) { + // We already updated the information to scheduler earlier, we need + // not have to raise another event. + continue; } - + //otherwise just send preemption events + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); + preemptionCandidates.put(container, clock.getTime()); } } } @@ -234,11 +229,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic private void syncKillableContainersFromScheduler() { // sync preemptable entities from scheduler - preemptableEntities = - scheduler.getPreemptionManager().getShallowCopyOfPreemptableEntities(); + preemptableQueues = + scheduler.getPreemptionManager().getShallowCopyOfPreemptableQueues(); killableContainers = new HashSet<>(); - for (Map.Entry<String, PreemptableQueue> entry : preemptableEntities + for (Map.Entry<String, PreemptableQueue> entry : preemptableQueues .entrySet()) { PreemptableQueue entity = entry.getValue(); for (Map<ContainerId, RMContainer> map : entity.getKillableContainers() @@ -247,9 +242,34 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } } } + + private void cleanupStaledPreemptionCandidates() { + // Keep the preemptionCandidates list clean + for (Iterator<RMContainer> i = preemptionCandidates.keySet().iterator(); + i.hasNext(); ) { + RMContainer id = i.next(); + // garbage collect containers that are irrelevant for preemption + if (preemptionCandidates.get(id) + 2 * maxWaitTime < clock.getTime()) { + i.remove(); + } + } + } + + private Set<String> getLeafQueueNames(TempQueuePerPartition q) { + if (q.children == null || q.children.isEmpty()) { + return ImmutableSet.of(q.queueName); + } + + Set<String> leafQueueNames = new HashSet<>(); + for (TempQueuePerPartition child : q.children) { + leafQueueNames.addAll(getLeafQueueNames(child)); + } + + return leafQueueNames; + } /** - * This method selects and tracks containers to be preempted. If a container + * This method selects and tracks containers to be preemptionCandidates. If a container * is in the target list for more than maxWaitTime it is killed. 
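The wait-then-kill hand-off implemented by preemptOrkillSelectedContainerAfterWait and cleanupStaledPreemptionCandidates above boils down to timestamp bookkeeping on the preemptionCandidates map: ask politely first, kill only if the container is still selected after maxWaitTime, and garbage-collect entries older than twice that. The self-contained sketch below mirrors that logic with plain strings in place of RMContainer/ApplicationAttemptId; the class, enum and method names are invented for illustration and are not part of the patch.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical illustration of the wait-then-kill bookkeeping described above. */
public class PreemptionTimelineSketch {
  enum Action { MARK_FOR_PREEMPTION, ALREADY_REQUESTED, MARK_FOR_KILLABLE }

  private final Map<String, Long> preemptionCandidates = new HashMap<>();
  private final long maxWaitTime;

  PreemptionTimelineSketch(long maxWaitTimeMs) {
    this.maxWaitTime = maxWaitTimeMs;
  }

  /** Mirrors the branch structure of preemptOrkillSelectedContainerAfterWait. */
  Action decide(String containerId, long now) {
    Long firstAsked = preemptionCandidates.get(containerId);
    if (firstAsked != null && firstAsked + maxWaitTime < now) {
      preemptionCandidates.remove(containerId);   // grace period expired: kill
      return Action.MARK_FOR_KILLABLE;
    }
    if (firstAsked != null) {
      return Action.ALREADY_REQUESTED;            // preemption event already sent
    }
    preemptionCandidates.put(containerId, now);   // first time: request preemption
    return Action.MARK_FOR_PREEMPTION;
  }

  /** Mirrors cleanupStaledPreemptionCandidates(): drop entries older than 2x the wait. */
  void cleanupStale(long now) {
    for (Iterator<Map.Entry<String, Long>> it =
        preemptionCandidates.entrySet().iterator(); it.hasNext();) {
      if (it.next().getValue() + 2 * maxWaitTime < now) {
        it.remove();
      }
    }
  }
}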
* * @param root the root of the CapacityScheduler queue hierarchy @@ -258,13 +278,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic @SuppressWarnings("unchecked") private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) { + // Sync killable containers from scheduler when lazy preemption enabled + if (lazyPreempionEnabled) { + syncKillableContainersFromScheduler(); + } + // All partitions to look at - Set<String> allPartitions = new HashSet<>(); - allPartitions.addAll(scheduler.getRMContext() + Set<String> partitions = new HashSet<>(); + partitions.addAll(scheduler.getRMContext() .getNodeLabelManager().getClusterNodeLabelNames()); - allPartitions.add(RMNodeLabelsManager.NO_LABEL); - - syncKillableContainersFromScheduler(); + partitions.add(RMNodeLabelsManager.NO_LABEL); + this.allPartitions = ImmutableSet.copyOf(partitions); // extract a summary of the queues from scheduler synchronized (scheduler) { @@ -277,30 +301,22 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } } + this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames( + getQueueByPartition(CapacitySchedulerConfiguration.ROOT, + RMNodeLabelsManager.NO_LABEL))); + // compute total preemption allowed Resource totalPreemptionAllowed = Resources.multiply(clusterResources, percentageClusterPreemptionAllowed); - Set<String> leafQueueNames = null; - for (String partition : allPartitions) { - TempQueuePerPartition tRoot = - getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition); - // compute the ideal distribution of resources among queues - // updates cloned queues state accordingly - tRoot.idealAssigned = tRoot.guaranteed; - - leafQueueNames = - recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); - } - - // remove containers from killable list when we want to preempt less resources - // from queue. 
- cleanupStaledKillableContainers(clusterResources, leafQueueNames); - - // based on ideal allocation select containers to be preempted from each + // based on ideal allocation select containers to be preemptionCandidates from each // queue and each application - Map<ApplicationAttemptId,Set<RMContainer>> toPreempt = - getContainersToPreempt(leafQueueNames, clusterResources); + Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = null; + for (PreemptionCandidatesSelector selector : + candidatesSelectionPolicies) { + toPreempt = selector.selectCandidates(toPreempt, + clusterResources, totalPreemptionAllowed); + } if (LOG.isDebugEnabled()) { logToCSV(new ArrayList<>(leafQueueNames)); @@ -311,582 +327,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic return; } - // preempt (or kill) the selected containers - for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e - : toPreempt.entrySet()) { - ApplicationAttemptId appAttemptId = e.getKey(); - if (LOG.isDebugEnabled()) { - LOG.debug("Send to scheduler: in app=" + appAttemptId - + " #containers-to-be-preempted=" + e.getValue().size()); - } - for (RMContainer container : e.getValue()) { - // if we tried to preempt this for more than maxWaitTime - if (preempted.get(container) != null && - preempted.get(container) + maxWaitTime < clock.getTime()) { - // mark container killable - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); - preempted.remove(container); - } else { - if (preempted.get(container) != null) { - // We already updated the information to scheduler earlier, we need - // not have to raise another event. - continue; - } - //otherwise just send preemption events - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); - preempted.put(container, clock.getTime()); - } - } - } - - // Keep the preempted list clean - for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){ - RMContainer id = i.next(); - // garbage collect containers that are irrelevant for preemption - if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) { - i.remove(); - } - } - } - - /** - * This method recursively computes the ideal assignment of resources to each - * level of the hierarchy. This ensures that leafs that are over-capacity but - * with parents within capacity will not be preempted. Preemptions are allowed - * within each subtree according to local over/under capacity. 
- * - * @param root the root of the cloned queue hierachy - * @param totalPreemptionAllowed maximum amount of preemption allowed - * @return a list of leaf queues updated with preemption targets - */ - private Set<String> recursivelyComputeIdealAssignment( - TempQueuePerPartition root, Resource totalPreemptionAllowed) { - Set<String> leafQueueNames = new HashSet<>(); - if (root.getChildren() != null && - root.getChildren().size() > 0) { - // compute ideal distribution at this level - computeIdealResourceDistribution(rc, root.getChildren(), - totalPreemptionAllowed, root.idealAssigned); - // compute recursively for lower levels and build list of leafs - for(TempQueuePerPartition t : root.getChildren()) { - leafQueueNames.addAll(recursivelyComputeIdealAssignment(t, - totalPreemptionAllowed)); - } - } else { - // we are in a leaf nothing to do, just return yourself - return ImmutableSet.of(root.queueName); - } - return leafQueueNames; - } + // TODO: need consider revert killable containers when no more demandings. + // Since we could have several selectors to make decisions concurrently. + // So computed ideal-allocation varies between different selectors. + // + // We may need to "score" killable containers and revert the most preferred + // containers. The bottom line is, we shouldn't preempt a queue which is already + // below its guaranteed resource. - /** - * This method computes (for a single level in the tree, passed as a {@code - * List<TempQueue>}) the ideal assignment of resources. This is done - * recursively to allocate capacity fairly across all queues with pending - * demands. It terminates when no resources are left to assign, or when all - * demand is satisfied. - * - * @param rc resource calculator - * @param queues a list of cloned queues to be assigned capacity to (this is - * an out param) - * @param totalPreemptionAllowed total amount of preemption we allow - * @param tot_guarant the amount of capacity assigned to this pool of queues - */ - private void computeIdealResourceDistribution(ResourceCalculator rc, - List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed, - Resource tot_guarant) { - - // qAlloc tracks currently active queues (will decrease progressively as - // demand is met) - List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues); - // unassigned tracks how much resources are still to assign, initialized - // with the total capacity for this set of queues - Resource unassigned = Resources.clone(tot_guarant); - - // group queues based on whether they have non-zero guaranteed capacity - Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>(); - Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>(); - - for (TempQueuePerPartition q : qAlloc) { - if (Resources - .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) { - nonZeroGuarQueues.add(q); - } else { - zeroGuarQueues.add(q); - } - } - - // first compute the allocation as a fixpoint based on guaranteed capacity - computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned, - false); - - // if any capacity is left unassigned, distributed among zero-guarantee - // queues uniformly (i.e., not based on guaranteed capacity, as this is zero) - if (!zeroGuarQueues.isEmpty() - && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) { - computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned, - true); - } - - // based on ideal assignment computed above and current assignment we derive - // how much preemption is required overall - 
Resource totPreemptionNeeded = Resource.newInstance(0, 0); - for (TempQueuePerPartition t:queues) { - if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) { - Resources.addTo(totPreemptionNeeded, - Resources.subtract(t.current, t.idealAssigned)); - } - } - - // if we need to preempt more than is allowed, compute a factor (0<f<1) - // that is used to scale down how much we ask back from each queue - float scalingFactor = 1.0F; - if (Resources.greaterThan(rc, tot_guarant, - totPreemptionNeeded, totalPreemptionAllowed)) { - scalingFactor = Resources.divide(rc, tot_guarant, - totalPreemptionAllowed, totPreemptionNeeded); - } - - // assign to each queue the amount of actual preemption based on local - // information of ideal preemption and scaling factor - for (TempQueuePerPartition t : queues) { - t.assignPreemption(scalingFactor, rc, tot_guarant); - } - if (LOG.isDebugEnabled()) { - long time = clock.getTime(); - for (TempQueuePerPartition t : queues) { - LOG.debug(time + ": " + t); - } - } - - } - - /** - * Given a set of queues compute the fix-point distribution of unassigned - * resources among them. As pending request of a queue are exhausted, the - * queue is removed from the set and remaining capacity redistributed among - * remaining queues. The distribution is weighted based on guaranteed - * capacity, unless asked to ignoreGuarantee, in which case resources are - * distributed uniformly. - */ - private void computeFixpointAllocation(ResourceCalculator rc, - Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc, - Resource unassigned, boolean ignoreGuarantee) { - // Prior to assigning the unused resources, process each queue as follows: - // If current > guaranteed, idealAssigned = guaranteed + untouchable extra - // Else idealAssigned = current; - // Subtract idealAssigned resources from unassigned. - // If the queue has all of its needs met (that is, if - // idealAssigned >= current + pending), remove it from consideration. - // Sort queues from most under-guaranteed to most over-guaranteed. - TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10, - tqComparator); - for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) { - TempQueuePerPartition q = i.next(); - if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) { - q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra); - } else { - q.idealAssigned = Resources.clone(q.current); - } - Resources.subtractFrom(unassigned, q.idealAssigned); - // If idealAssigned < (current + pending), q needs more resources, so - // add it to the list of underserved queues, ordered by need. - Resource curPlusPend = Resources.add(q.current, q.pending); - if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) { - orderedByNeed.add(q); - } - } - - //assign all cluster resources until no more demand, or no resources are left - while (!orderedByNeed.isEmpty() - && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) { - Resource wQassigned = Resource.newInstance(0, 0); - // we compute normalizedGuarantees capacity based on currently active - // queues - resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee); - - // For each underserved queue (or set of queues if multiple are equally - // underserved), offer its share of the unassigned resources based on its - // normalized guarantee. 
After the offer, if the queue is not satisfied, - // place it back in the ordered list of queues, recalculating its place - // in the order of most under-guaranteed to most over-guaranteed. In this - // way, the most underserved queue(s) are always given resources first. - Collection<TempQueuePerPartition> underserved = - getMostUnderservedQueues(orderedByNeed, tqComparator); - for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i - .hasNext();) { - TempQueuePerPartition sub = i.next(); - Resource wQavail = Resources.multiplyAndNormalizeUp(rc, - unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); - Resource wQidle = sub.offer(wQavail, rc, tot_guarant); - Resource wQdone = Resources.subtract(wQavail, wQidle); - - if (Resources.greaterThan(rc, tot_guarant, - wQdone, Resources.none())) { - // The queue is still asking for more. Put it back in the priority - // queue, recalculating its order based on need. - orderedByNeed.add(sub); - } - Resources.addTo(wQassigned, wQdone); - } - Resources.subtractFrom(unassigned, wQassigned); - } - } - - // Take the most underserved TempQueue (the one on the head). Collect and - // return the list of all queues that have the same idealAssigned - // percentage of guaranteed. - protected Collection<TempQueuePerPartition> getMostUnderservedQueues( - PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) { - ArrayList<TempQueuePerPartition> underserved = new ArrayList<>(); - while (!orderedByNeed.isEmpty()) { - TempQueuePerPartition q1 = orderedByNeed.remove(); - underserved.add(q1); - TempQueuePerPartition q2 = orderedByNeed.peek(); - // q1's pct of guaranteed won't be larger than q2's. If it's less, then - // return what has already been collected. Otherwise, q1's pct of - // guaranteed == that of q2, so add q2 to underserved list during the - // next pass. - if (q2 == null || tqComparator.compare(q1,q2) < 0) { - return underserved; - } - } - return underserved; - } - - /** - * Computes a normalizedGuaranteed capacity based on active queues - * @param rc resource calculator - * @param clusterResource the total amount of resources in the cluster - * @param queues the list of queues to consider - */ - private void resetCapacity(ResourceCalculator rc, Resource clusterResource, - Collection<TempQueuePerPartition> queues, boolean ignoreGuar) { - Resource activeCap = Resource.newInstance(0, 0); - - if (ignoreGuar) { - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = 1.0f / queues.size(); - } - } else { - for (TempQueuePerPartition q : queues) { - Resources.addTo(activeCap, q.guaranteed); - } - for (TempQueuePerPartition q : queues) { - q.normalizedGuarantee = Resources.divide(rc, clusterResource, - q.guaranteed, activeCap); - } - } - } - - private String getPartitionByRMContainer(RMContainer rmContainer) { - return scheduler.getSchedulerNode(rmContainer.getAllocatedNode()) - .getPartition(); - } - - /** - * Return should we preempt rmContainer. 
If we should, deduct from - * <code>resourceToObtainByPartition</code> - */ - private boolean tryPreemptContainerAndDeductResToObtain( - Map<String, Resource> resourceToObtainByPartitions, - RMContainer rmContainer, Resource clusterResource, - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) { - ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); - - // We will not account resource of a container twice or more - if (preemptMapContains(preemptMap, attemptId, rmContainer)) { - return false; - } - - String nodePartition = getPartitionByRMContainer(rmContainer); - Resource toObtainByPartition = - resourceToObtainByPartitions.get(nodePartition); - - if (null != toObtainByPartition - && Resources.greaterThan(rc, clusterResource, toObtainByPartition, - Resources.none())) { - Resources.subtractFrom(toObtainByPartition, - rmContainer.getAllocatedResource()); - // When we have no more resource need to obtain, remove from map. - if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, - Resources.none())) { - resourceToObtainByPartitions.remove(nodePartition); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Marked container=" + rmContainer.getContainerId() - + " in partition=" + nodePartition + " will be preempted"); - } - // Add to preemptMap - addToPreemptMap(preemptMap, attemptId, rmContainer); - return true; - } - - return false; - } - - private boolean preemptMapContains( - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, - ApplicationAttemptId attemptId, RMContainer rmContainer) { - Set<RMContainer> rmContainers; - if (null == (rmContainers = preemptMap.get(attemptId))) { - return false; - } - return rmContainers.contains(rmContainer); - } - - private void addToPreemptMap( - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, - ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { - Set<RMContainer> set; - if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet<>(); - preemptMap.put(appAttemptId, set); - } - set.add(containerToPreempt); - } - - /** - * Based a resource preemption target drop reservations of containers and - * if necessary select containers for preemption from applications in each - * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to - * account for containers that will naturally complete. 
- * - * @param leafQueueNames set of leaf queues to preempt from - * @param clusterResource total amount of cluster resources - * @return a map of applciationID to set of containers to preempt - */ - private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt( - Set<String> leafQueueNames, Resource clusterResource) { - - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap = - new HashMap<>(); - List<RMContainer> skippedAMContainerlist = new ArrayList<>(); - - // Loop all leaf queues - for (String queueName : leafQueueNames) { - // check if preemption disabled for the queue - if (getQueueByPartition(queueName, - RMNodeLabelsManager.NO_LABEL).preemptionDisabled) { - if (LOG.isDebugEnabled()) { - LOG.debug("skipping from queue=" + queueName - + " because it's a non-preemptable queue"); - } - continue; - } - - // compute resToObtainByPartition considered inter-queue preemption - LeafQueue leafQueue = null; - - Map<String, Resource> resToObtainByPartition = - new HashMap<>(); - for (TempQueuePerPartition qT : getQueuePartitions(queueName)) { - leafQueue = qT.leafQueue; - // we act only if we are violating balance by more than - // maxIgnoredOverCapacity - if (Resources.greaterThan(rc, clusterResource, qT.current, - Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { - // we introduce a dampening factor naturalTerminationFactor that - // accounts for natural termination of containers - Resource resToObtain = - Resources.multiply(qT.toBePreempted, naturalTerminationFactor); - // Only add resToObtain when it >= 0 - if (Resources.greaterThan(rc, clusterResource, resToObtain, - Resources.none())) { - resToObtainByPartition.put(qT.partition, resToObtain); - if (LOG.isDebugEnabled()) { - LOG.debug("Queue=" + queueName + " partition=" + qT.partition - + " resource-to-obtain=" + resToObtain); - } - } - qT.actuallyPreempted = Resources.clone(resToObtain); - } else { - qT.actuallyPreempted = Resources.none(); - } - } - - synchronized (leafQueue) { - // go through all ignore-partition-exclusivity containers first to make - // sure such containers will be preempted first - Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers = - leafQueue.getIgnoreExclusivityRMContainers(); - for (String partition : resToObtainByPartition.keySet()) { - if (ignorePartitionExclusivityContainers.containsKey(partition)) { - TreeSet<RMContainer> rmContainers = - ignorePartitionExclusivityContainers.get(partition); - // We will check container from reverse order, so latter submitted - // application's containers will be preempted first. 
- for (RMContainer c : rmContainers.descendingSet()) { - boolean preempted = - tryPreemptContainerAndDeductResToObtain( - resToObtainByPartition, c, clusterResource, preemptMap); - if (!preempted) { - break; - } - } - } - } - - // preempt other containers - Resource skippedAMSize = Resource.newInstance(0, 0); - Iterator<FiCaSchedulerApp> desc = - leafQueue.getOrderingPolicy().getPreemptionIterator(); - while (desc.hasNext()) { - FiCaSchedulerApp fc = desc.next(); - // When we complete preempt from one partition, we will remove from - // resToObtainByPartition, so when it becomes empty, we can get no - // more preemption is needed - if (resToObtainByPartition.isEmpty()) { - break; - } - - preemptFrom(fc, clusterResource, resToObtainByPartition, - skippedAMContainerlist, skippedAMSize, preemptMap); - } - - // Can try preempting AMContainers (still saving atmost - // maxAMCapacityForThisQueue AMResource's) if more resources are - // required to be preempted from this Queue. - Resource maxAMCapacityForThisQueue = Resources.multiply( - Resources.multiply(clusterResource, - leafQueue.getAbsoluteCapacity()), - leafQueue.getMaxAMResourcePerQueuePercent()); - - preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist, - resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue); - } - } - - return preemptMap; - } - - /** - * As more resources are needed for preemption, saved AMContainers has to be - * rescanned. Such AMContainers can be preempted based on resToObtain, but - * maxAMCapacityForThisQueue resources will be still retained. - * - * @param clusterResource - * @param preemptMap - * @param skippedAMContainerlist - * @param skippedAMSize - * @param maxAMCapacityForThisQueue - */ - private void preemptAMContainers(Resource clusterResource, - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, - List<RMContainer> skippedAMContainerlist, - Map<String, Resource> resToObtainByPartition, Resource skippedAMSize, - Resource maxAMCapacityForThisQueue) { - for (RMContainer c : skippedAMContainerlist) { - // Got required amount of resources for preemption, can stop now - if (resToObtainByPartition.isEmpty()) { - break; - } - // Once skippedAMSize reaches down to maxAMCapacityForThisQueue, - // container selection iteration for preemption will be stopped. - if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, - maxAMCapacityForThisQueue)) { - break; - } - - boolean preempted = - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap); - if (preempted) { - Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); - } - } - skippedAMContainerlist.clear(); - } - - /** - * Given a target preemption for a specific application, select containers - * to preempt (after unreserving all reservation for that app). 
- */ - @SuppressWarnings("unchecked") - private void preemptFrom(FiCaSchedulerApp app, - Resource clusterResource, Map<String, Resource> resToObtainByPartition, - List<RMContainer> skippedAMContainerlist, Resource skippedAMSize, - Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) { - ApplicationAttemptId appId = app.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Looking at application=" + app.getApplicationAttemptId() - + " resourceToObtain=" + resToObtainByPartition); - } - - // first drop reserved containers towards rsrcPreempt - List<RMContainer> reservedContainers = - new ArrayList<>(app.getReservedContainers()); - for (RMContainer c : reservedContainers) { - if (resToObtainByPartition.isEmpty()) { - return; - } - - // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap); - - if (!observeOnly) { - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent( - appId, c, SchedulerEventType.KILL_RESERVED_CONTAINER)); - } - } - - // if more resources are to be freed go through all live containers in - // reverse priority and reverse allocation order and mark them for - // preemption - List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers()); - - sortContainers(liveContainers); - - for (RMContainer c : liveContainers) { - if (resToObtainByPartition.isEmpty()) { - return; - } - - // Skip AM Container from preemption for now. - if (c.isAMContainer()) { - skippedAMContainerlist.add(c); - Resources.addTo(skippedAMSize, c.getAllocatedResource()); - continue; - } - - // Skip already marked to killable containers - if (killableContainers.contains(c.getContainerId())) { - continue; - } - - // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap); - } - } + // preempt (or kill) the selected containers + preemptOrkillSelectedContainerAfterWait(toPreempt); - /** - * Compare by reversed priority order first, and then reversed containerId - * order - * @param containers - */ - @VisibleForTesting - static void sortContainers(List<RMContainer> containers){ - Collections.sort(containers, new Comparator<RMContainer>() { - @Override - public int compare(RMContainer a, RMContainer b) { - Comparator<Priority> c = new org.apache.hadoop.yarn.server - .resourcemanager.resource.Priority.Comparator(); - int priorityComp = c.compare(b.getContainer().getPriority(), - a.getContainer().getPriority()); - if (priorityComp != 0) { - return priorityComp; - } - return b.getContainerId().compareTo(a.getContainerId()); - } - }); + // cleanup staled preemption candidates + cleanupStaledPreemptionCandidates(); } @Override @@ -901,7 +354,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic @VisibleForTesting public Map<RMContainer, Long> getToPreemptContainers() { - return preempted; + return preemptionCandidates; } /** @@ -929,8 +382,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic Resource guaranteed = Resources.multiply(partitionResource, absCap); Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap); Resource killable = Resources.none(); - if (null != preemptableEntities.get(queueName)) { - killable = preemptableEntities.get(queueName) + if (null != preemptableQueues.get(queueName)) { + killable = preemptableQueues.get(queueName) .getKillableResource(partitionToLookAt); } @@ -1023,9 +476,10 @@ public class 
ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic /** * Get queue partition by given queueName and partitionName */ - private TempQueuePerPartition getQueueByPartition(String queueName, + @Override + public TempQueuePerPartition getQueueByPartition(String queueName, String partition) { - Map<String, TempQueuePerPartition> partitionToQueues = null; + Map<String, TempQueuePerPartition> partitionToQueues; if (null == (partitionToQueues = queueToPartitions.get(queueName))) { return null; } @@ -1035,180 +489,56 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic /** * Get all queue partitions by given queueName */ - private Collection<TempQueuePerPartition> getQueuePartitions(String queueName) { + @Override + public Collection<TempQueuePerPartition> getQueuePartitions(String queueName) { if (!queueToPartitions.containsKey(queueName)) { return null; } return queueToPartitions.get(queueName).values(); } - /** - * Temporary data-structure tracking resource availability, pending resource - * need, current utilization. This is per-queue-per-partition data structure - */ - static class TempQueuePerPartition { - final String queueName; - final Resource current; - final Resource pending; - final Resource guaranteed; - final Resource maxCapacity; - final String partition; - final Resource killable; - Resource idealAssigned; - Resource toBePreempted; - - // For logging purpose - Resource actuallyPreempted; - Resource untouchableExtra; - Resource preemptableExtra; - - double normalizedGuarantee; - - final ArrayList<TempQueuePerPartition> children; - LeafQueue leafQueue; - boolean preemptionDisabled; - - TempQueuePerPartition(String queueName, Resource current, Resource pending, - Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, - String partition, Resource killableResource) { - this.queueName = queueName; - this.current = current; - this.pending = pending; - this.guaranteed = guaranteed; - this.maxCapacity = maxCapacity; - this.idealAssigned = Resource.newInstance(0, 0); - this.actuallyPreempted = Resource.newInstance(0, 0); - this.toBePreempted = Resource.newInstance(0, 0); - this.normalizedGuarantee = Float.NaN; - this.children = new ArrayList<>(); - this.untouchableExtra = Resource.newInstance(0, 0); - this.preemptableExtra = Resource.newInstance(0, 0); - this.preemptionDisabled = preemptionDisabled; - this.partition = partition; - this.killable = killableResource; - } - - public void setLeafQueue(LeafQueue l){ - assert children.size() == 0; - this.leafQueue = l; - } - - /** - * When adding a child we also aggregate its pending resource needs. 
- * @param q the child queue to add to this queue - */ - public void addChild(TempQueuePerPartition q) { - assert leafQueue == null; - children.add(q); - Resources.addTo(pending, q.pending); - } - - public ArrayList<TempQueuePerPartition> getChildren(){ - return children; - } - - // This function "accepts" all the resources it can (pending) and return - // the unused ones - Resource offer(Resource avail, ResourceCalculator rc, - Resource clusterResource) { - Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( - Resources.subtract(maxCapacity, idealAssigned), - Resource.newInstance(0, 0)); - // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) - Resource accepted = - Resources.min(rc, clusterResource, - absMaxCapIdealAssignedDelta, - Resources.min(rc, clusterResource, avail, Resources.subtract( - Resources.add(current, pending), idealAssigned))); - Resource remain = Resources.subtract(avail, accepted); - Resources.addTo(idealAssigned, accepted); - return remain; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(" NAME: " + queueName) - .append(" CUR: ").append(current) - .append(" PEN: ").append(pending) - .append(" GAR: ").append(guaranteed) - .append(" NORM: ").append(normalizedGuarantee) - .append(" IDEAL_ASSIGNED: ").append(idealAssigned) - .append(" IDEAL_PREEMPT: ").append(toBePreempted) - .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted) - .append(" UNTOUCHABLE: ").append(untouchableExtra) - .append(" PREEMPTABLE: ").append(preemptableExtra) - .append("\n"); - - return sb.toString(); - } + @Override + public CapacityScheduler getScheduler() { + return scheduler; + } - public void assignPreemption(float scalingFactor, - ResourceCalculator rc, Resource clusterResource) { - if (Resources.greaterThan(rc, clusterResource, - Resources.subtract(current, killable), idealAssigned)) { - toBePreempted = Resources.multiply(Resources.subtract( - Resources.subtract(current, killable), idealAssigned), - scalingFactor); - } else { - toBePreempted = Resource.newInstance(0, 0); - } - } + @Override + public RMContext getRMContext() { + return rmContext; + } - void appendLogString(StringBuilder sb) { - sb.append(queueName).append(", ") - .append(current.getMemory()).append(", ") - .append(current.getVirtualCores()).append(", ") - .append(pending.getMemory()).append(", ") - .append(pending.getVirtualCores()).append(", ") - .append(guaranteed.getMemory()).append(", ") - .append(guaranteed.getVirtualCores()).append(", ") - .append(idealAssigned.getMemory()).append(", ") - .append(idealAssigned.getVirtualCores()).append(", ") - .append(toBePreempted.getMemory()).append(", ") - .append(toBePreempted.getVirtualCores() ).append(", ") - .append(actuallyPreempted.getMemory()).append(", ") - .append(actuallyPreempted.getVirtualCores()); - } + @Override + public boolean isObserveOnly() { + return observeOnly; + } + @Override + public Set<ContainerId> getKillableContainers() { + return killableContainers; } - static class TQComparator implements Comparator<TempQueuePerPartition> { - private ResourceCalculator rc; - private Resource clusterRes; + @Override + public double getMaxIgnoreOverCapacity() { + return maxIgnoredOverCapacity; + } - TQComparator(ResourceCalculator rc, Resource clusterRes) { - this.rc = rc; - this.clusterRes = clusterRes; - } + @Override + public double getNaturalTerminationFactor() { + return naturalTerminationFactor; + } - @Override - public int compare(TempQueuePerPartition tq1, 
TempQueuePerPartition tq2) { - if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { - return -1; - } - if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { - return 1; - } - return 0; - } + @Override + public Set<String> getLeafQueueNames() { + return leafQueueNames; + } - // Calculates idealAssigned / guaranteed - // TempQueues with 0 guarantees are always considered the most over - // capacity and therefore considered last for resources. - private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { - double pctOver = Integer.MAX_VALUE; - if (q != null && Resources.greaterThan( - rc, clusterRes, q.guaranteed, Resources.none())) { - pctOver = - Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed); - } - return (pctOver); - } + @Override + public Set<String> getAllPartitions() { + return allPartitions; } @VisibleForTesting - public Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() { + Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() { return queueToPartitions; } }
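The structural change running through the file above is that the policy no longer computes and applies preemption targets inline; it now threads a running candidate map through a list of PreemptionCandidatesSelector objects (only FifoCandidatesSelector is registered in init()) and hands the result to preemptOrkillSelectedContainerAfterWait. The sketch below shows that chaining pattern with a deliberately simplified, hypothetical selector interface and String keys; it is not the real selector API introduced elsewhere in this patch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified illustration of the selector-chain pattern. */
public class SelectorChainSketch {
  /** Stand-in for PreemptionCandidatesSelector; not the real Hadoop API. */
  interface CandidatesSelector {
    // Each selector may add to the picks made by earlier selectors.
    Map<String, Set<String>> selectCandidates(Map<String, Set<String>> alreadySelected);
  }

  public static void main(String[] args) {
    List<CandidatesSelector> chain = new ArrayList<>();
    chain.add(alreadySelected -> {
      Map<String, Set<String>> out =
          alreadySelected == null ? new HashMap<>() : alreadySelected;
      out.computeIfAbsent("appattempt_1", k -> new HashSet<>()).add("container_42");
      return out;
    });

    // Mirrors the loop in containerBasedPreemptOrKill(): the output of one
    // selector becomes the input of the next.
    Map<String, Set<String>> toPreempt = null;
    for (CandidatesSelector selector : chain) {
      toPreempt = selector.selectCandidates(toPreempt);
    }
    System.out.println("selected: " + toPreempt);
  }
}

Keeping the kill/preempt event plumbing outside the selectors is what lets additional selection policies be appended to candidatesSelectionPolicies later (the TODO above already anticipates several selectors running over the same ideal-allocation state) without touching the event handling.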
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java new file mode 100644 index 0000000..8b01a73 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; + +/** + * Temporary data-structure tracking resource availability, pending resource + * need, current utilization. 
This is per-queue-per-partition data structure + */ +public class TempQueuePerPartition { + // Following fields are copied from scheduler + final String queueName; + final Resource current; + final Resource pending; + final Resource guaranteed; + final Resource maxCapacity; + final Resource killable; + final String partition; + + // Following fields are setted and used by candidate selection policies + Resource idealAssigned; + Resource toBePreempted; + Resource untouchableExtra; + Resource preemptableExtra; + // For logging purpose + Resource actuallyToBePreempted; + + double normalizedGuarantee; + + final ArrayList<TempQueuePerPartition> children; + LeafQueue leafQueue; + boolean preemptionDisabled; + + TempQueuePerPartition(String queueName, Resource current, Resource pending, + Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, + String partition, Resource killable) { + this.queueName = queueName; + this.current = current; + this.pending = pending; + this.guaranteed = guaranteed; + this.maxCapacity = maxCapacity; + this.idealAssigned = Resource.newInstance(0, 0); + this.actuallyToBePreempted = Resource.newInstance(0, 0); + this.toBePreempted = Resource.newInstance(0, 0); + this.normalizedGuarantee = Float.NaN; + this.children = new ArrayList<>(); + this.untouchableExtra = Resource.newInstance(0, 0); + this.preemptableExtra = Resource.newInstance(0, 0); + this.preemptionDisabled = preemptionDisabled; + this.partition = partition; + this.killable = killable; + } + + public void setLeafQueue(LeafQueue l) { + assert children.size() == 0; + this.leafQueue = l; + } + + /** + * When adding a child we also aggregate its pending resource needs. + * @param q the child queue to add to this queue + */ + public void addChild(TempQueuePerPartition q) { + assert leafQueue == null; + children.add(q); + Resources.addTo(pending, q.pending); + } + + public ArrayList<TempQueuePerPartition> getChildren(){ + return children; + } + + // This function "accepts" all the resources it can (pending) and return + // the unused ones + Resource offer(Resource avail, ResourceCalculator rc, + Resource clusterResource) { + Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( + Resources.subtract(maxCapacity, idealAssigned), + Resource.newInstance(0, 0)); + // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) + Resource accepted = + Resources.min(rc, clusterResource, + absMaxCapIdealAssignedDelta, + Resources.min(rc, clusterResource, avail, Resources.subtract( + Resources.add(current, pending), idealAssigned))); + Resource remain = Resources.subtract(avail, accepted); + Resources.addTo(idealAssigned, accepted); + return remain; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" NAME: " + queueName) + .append(" CUR: ").append(current) + .append(" PEN: ").append(pending) + .append(" GAR: ").append(guaranteed) + .append(" NORM: ").append(normalizedGuarantee) + .append(" IDEAL_ASSIGNED: ").append(idealAssigned) + .append(" IDEAL_PREEMPT: ").append(toBePreempted) + .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted) + .append(" UNTOUCHABLE: ").append(untouchableExtra) + .append(" PREEMPTABLE: ").append(preemptableExtra) + .append("\n"); + + return sb.toString(); + } + + public void assignPreemption(float scalingFactor, ResourceCalculator rc, + Resource clusterResource) { + if (Resources.greaterThan(rc, clusterResource, + Resources.subtract(current, killable), idealAssigned)) { + toBePreempted = 
Resources.multiply(Resources + .subtract(Resources.subtract(current, killable), idealAssigned), + scalingFactor); + } else { + toBePreempted = Resource.newInstance(0, 0); + } + } + + void appendLogString(StringBuilder sb) { + sb.append(queueName).append(", ") + .append(current.getMemory()).append(", ") + .append(current.getVirtualCores()).append(", ") + .append(pending.getMemory()).append(", ") + .append(pending.getVirtualCores()).append(", ") + .append(guaranteed.getMemory()).append(", ") + .append(guaranteed.getVirtualCores()).append(", ") + .append(idealAssigned.getMemory()).append(", ") + .append(idealAssigned.getVirtualCores()).append(", ") + .append(toBePreempted.getMemory()).append(", ") + .append(toBePreempted.getVirtualCores() ).append(", ") + .append(actuallyToBePreempted.getMemory()).append(", ") + .append(actuallyToBePreempted.getVirtualCores()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3729264..88e39de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1020,4 +1020,49 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public boolean getLazyPreemptionEnabled() { return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); } + + /** If true, run the policy but do not affect the cluster with preemption and + * kill events. */ + public static final String PREEMPTION_OBSERVE_ONLY = + "yarn.resourcemanager.monitor.capacity.preemption.observe_only"; + public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false; + + /** Time in milliseconds between invocations of this policy */ + public static final String PREEMPTION_MONITORING_INTERVAL = + "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval"; + public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L; + + /** Time in milliseconds between requesting a preemption from an application + * and killing the container. */ + public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL = + "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill"; + public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L; + + /** Maximum percentage of resources preemptionCandidates in a single round. By + * controlling this value one can throttle the pace at which containers are + * reclaimed from the cluster. After computing the total desired preemption, + * the policy scales it back within this limit. 
*/ + public static final String TOTAL_PREEMPTION_PER_ROUND = + "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round"; + public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f; + + /** Maximum amount of resources above the target capacity ignored for + * preemption. This defines a deadzone around the target capacity that helps + * prevent thrashing and oscillations around the computed target balance. + * High values would slow the time to capacity and (absent natural + * completions) it might prevent convergence to guaranteed capacity. */ + public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY = + "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity"; + public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f; + /** + * Given a computed preemption target, account for containers naturally + * expiring and preempt only this percentage of the delta. This determines + * the rate of geometric convergence into the deadzone ({@link + * #PREEMPTION_MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5 + * will reclaim almost 95% of resources within 5 * {@link + * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */ + public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR = + "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; + public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR = + 0.2f; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java index 19148d7..fefb56a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java @@ -86,12 +86,6 @@ public class PreemptableQueue { return res == null ? Resources.none() : res; } - @SuppressWarnings("unchecked") - public Map<ContainerId, RMContainer> getKillableContainers(String partition) { - Map<ContainerId, RMContainer> map = killableContainers.get(partition); - return map == null ? 
Collections.EMPTY_MAP : map; - } - public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() { return killableContainers; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java index a9f02a5..76fcd4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -146,7 +146,7 @@ public class PreemptionManager { } } - public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() { + public Map<String, PreemptableQueue> getShallowCopyOfPreemptableQueues() { try { readLock.lock(); Map<String, PreemptableQueue> map = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index e9129de..3db4782 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -17,38 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; -import static 
org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Deque; -import java.util.LinkedList; -import java.util.List; -import java.util.NavigableSet; -import java.util.Random; -import java.util.StringTokenizer; -import java.util.TreeSet; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -63,7 +31,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -95,6 +62,32 @@ import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.TreeSet; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class TestProportionalCapacityPreemptionPolicy { static final long TS = 3141592653L; @@ -105,11 +98,10 @@ public class TestProportionalCapacityPreemptionPolicy { float setAMResourcePercent = 0.0f; Random rand 
= null; Clock mClock = null; - Configuration conf = null; + CapacitySchedulerConfiguration conf = null; CapacityScheduler mCS = null; RMContext rmContext = null; RMNodeLabelsManager lm = null; - CapacitySchedulerConfiguration schedConf = null; EventHandler<SchedulerEvent> mDisp = null; ResourceCalculator rc = new DefaultResourceCalculator(); Resource clusterResources = null; @@ -132,7 +124,7 @@ public class TestProportionalCapacityPreemptionPolicy { AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2); int value; - private priority(int value) { + priority(int value) { this.value = value; } @@ -146,12 +138,17 @@ public class TestProportionalCapacityPreemptionPolicy { @Before @SuppressWarnings("unchecked") public void setup() { - conf = new Configuration(false); - conf.setLong(WAIT_TIME_BEFORE_KILL, 10000); - conf.setLong(MONITORING_INTERVAL, 3000); + conf = new CapacitySchedulerConfiguration(new Configuration(false)); + conf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, + 3000); // report "ideal" preempt - conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0); - conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0); + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + 1.0f); + conf.setFloat( + CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, + 1.0f); conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); @@ -164,8 +161,7 @@ public class TestProportionalCapacityPreemptionPolicy { mCS = mock(CapacityScheduler.class); when(mCS.getResourceCalculator()).thenReturn(rc); lm = mock(RMNodeLabelsManager.class); - schedConf = new CapacitySchedulerConfiguration(); - when(mCS.getConfiguration()).thenReturn(schedConf); + when(mCS.getConfiguration()).thenReturn(conf); rmContext = mock(RMContext.class); when(mCS.getRMContext()).thenReturn(rmContext); when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager()); @@ -271,7 +267,9 @@ public class TestProportionalCapacityPreemptionPolicy { { -1, 1, 1, 1 }, // req granularity { 3, 0, 0, 0 }, // subqueues }; - conf.setLong(WAIT_TIME_BEFORE_KILL, killTime); + conf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, + killTime); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); // ensure all pending rsrc from A get preempted from other queues @@ -308,7 +306,9 @@ public class TestProportionalCapacityPreemptionPolicy { { -1, 1, 1, 1 }, // req granularity { 3, 0, 0, 0 }, // subqueues }; - conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1); + conf.setFloat( + CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, + (float) 0.1); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); // ignore 10% overcapacity to avoid jitter @@ -330,7 +330,7 @@ public class TestProportionalCapacityPreemptionPolicy { { 3, 0, 0, 0 }, // subqueues }; - schedConf.setPreemptionDisabled("root.queueB", true); + conf.setPreemptionDisabled("root.queueB", true); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); @@ -343,7 +343,7 @@ public class TestProportionalCapacityPreemptionPolicy { // event handler will count only events from the following test and not the // previous one. 
setup(); - schedConf.setPreemptionDisabled("root.queueB", false); + conf.setPreemptionDisabled("root.queueB", false); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); @@ -382,7 +382,7 @@ public class TestProportionalCapacityPreemptionPolicy { // Need to call setup() again to reset mDisp setup(); // Turn off preemption for queueB and it's children - schedConf.setPreemptionDisabled("root.queueA.queueB", true); + conf.setPreemptionDisabled("root.queueA.queueB", true); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); ApplicationAttemptId expectedAttemptOnQueueC = @@ -429,7 +429,7 @@ public class TestProportionalCapacityPreemptionPolicy { // Need to call setup() again to reset mDisp setup(); // Turn off preemption for queueB(appA) - schedConf.setPreemptionDisabled("root.queueA.queueB", true); + conf.setPreemptionDisabled("root.queueA.queueB", true); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); // Now that queueB(appA) is not preemptable, verify that resources come @@ -439,8 +439,8 @@ public class TestProportionalCapacityPreemptionPolicy { setup(); // Turn off preemption for two of the 3 queues with over-capacity. - schedConf.setPreemptionDisabled("root.queueD.queueE", true); - schedConf.setPreemptionDisabled("root.queueA.queueB", true); + conf.setPreemptionDisabled("root.queueD.queueE", true); + conf.setPreemptionDisabled("root.queueA.queueB", true); ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData); policy3.editSchedule(); @@ -481,7 +481,7 @@ public class TestProportionalCapacityPreemptionPolicy { // Turn off preemption for queueA and it's children. queueF(appC)'s request // should starve. setup(); // Call setup() to reset mDisp - schedConf.setPreemptionDisabled("root.queueA", true); + conf.setPreemptionDisabled("root.queueA", true); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC @@ -505,7 +505,7 @@ public class TestProportionalCapacityPreemptionPolicy { { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues }; - schedConf.setPreemptionDisabled("root.queueA.queueC", true); + conf.setPreemptionDisabled("root.queueA.queueC", true); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); // Although queueC(appB) is way over capacity and is untouchable, @@ -529,7 +529,7 @@ public class TestProportionalCapacityPreemptionPolicy { { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues }; - schedConf.setPreemptionDisabled("root", true); + conf.setPreemptionDisabled("root", true); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); // All queues should be non-preemptable, so request should starve. @@ -556,7 +556,7 @@ public class TestProportionalCapacityPreemptionPolicy { { 2, 2, 0, 0, 2, 0, 0 }, // subqueues }; // QueueE inherits non-preemption from QueueD - schedConf.setPreemptionDisabled("root.queueD", true); + conf.setPreemptionDisabled("root.queueD", true); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); // appC is running on QueueE. 
QueueE is over absMaxCap, but is not @@ -596,7 +596,10 @@ public class TestProportionalCapacityPreemptionPolicy { { -1, 1, 1, 0 }, // req granularity { 3, 0, 0, 0 }, // subqueues }; - conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1); + conf.setFloat( + CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, + (float) 0.1); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); // ignore 10% imbalance between over-capacity queues @@ -616,7 +619,10 @@ public class TestProportionalCapacityPreemptionPolicy { { -1, 1, 1, 0 }, // req granularity { 3, 0, 0, 0 }, // subqueues }; - conf.setBoolean(OBSERVE_ONLY, true); + conf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, + true); + when(mCS.getConfiguration()).thenReturn( + new CapacitySchedulerConfiguration(conf)); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); // verify even severe imbalance not affected @@ -735,7 +741,7 @@ public class TestProportionalCapacityPreemptionPolicy { containers.add(rm4); // sort them - ProportionalCapacityPreemptionPolicy.sortContainers(containers); + FifoCandidatesSelector.sortContainers(containers); // verify the "priority"-first, "reverse container-id"-second // ordering is enforced correctly @@ -957,7 +963,7 @@ public class TestProportionalCapacityPreemptionPolicy { ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy( - conf, rmContext, mCS, mClock); + rmContext, mCS, mClock); clusterResources = Resource.newInstance( leafAbsCapacities(qData[0], qData[7]), 0); ParentQueue mRoot = buildMockRootQueue(rand, qData); @@ -968,18 +974,13 @@ public class TestProportionalCapacityPreemptionPolicy { } ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, - String[][] resData) { - return buildPolicy(qData, resData, false); - } - - ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, String[][] resData, boolean useDominantResourceCalculator) { if (useDominantResourceCalculator) { when(mCS.getResourceCalculator()).thenReturn( new DominantResourceCalculator()); } ProportionalCapacityPreemptionPolicy policy = - new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock); + new ProportionalCapacityPreemptionPolicy(rmContext, mCS, mClock); clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]), qData[2]); ParentQueue mRoot = buildMockRootQueue(rand, resData, qData); @@ -1124,7 +1125,7 @@ public class TestProportionalCapacityPreemptionPolicy { String qName = ""; while(tokenizer.hasMoreTokens()) { qName += tokenizer.nextToken(); - preemptionDisabled = schedConf.getPreemptionDisabled(qName, preemptionDisabled); + preemptionDisabled = conf.getPreemptionDisabled(qName, preemptionDisabled); qName += "."; } return preemptionDisabled;
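
The helper above folds CapacitySchedulerConfiguration#getPreemptionDisabled over each level of the dotted queue path, so a queue inherits its parent's setting unless it overrides it. A self-contained sketch of that inheritance rule (the override map and class name are hypothetical, for illustration only):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.StringTokenizer;

    public class PreemptionDisabledSketch {
      // Hypothetical per-queue overrides; unset queues inherit from their parent.
      static Map<String, Boolean> overrides = new HashMap<>();

      // Same shape as CapacitySchedulerConfiguration#getPreemptionDisabled(queue, defaultVal).
      static boolean getPreemptionDisabled(String queue, boolean inherited) {
        return overrides.getOrDefault(queue, inherited);
      }

      static boolean effectivePreemptionDisabled(String queuePath) {
        boolean disabled = false; // preemption is assumed enabled at the root by default
        StringTokenizer tokenizer = new StringTokenizer(queuePath, ".");
        String qName = "";
        while (tokenizer.hasMoreTokens()) {
          qName += tokenizer.nextToken();
          disabled = getPreemptionDisabled(qName, disabled);
          qName += ".";
        }
        return disabled;
      }

      public static void main(String[] args) {
        overrides.put("root.queueD", true); // disable preemption for queueD and its children
        System.out.println(effectivePreemptionDisabled("root.queueD.queueE")); // true, inherited
        System.out.println(effectivePreemptionDisabled("root.queueA"));        // false
      }
    }

This mirrors the test cases above in which disabling preemption on root.queueD also protects root.queueD.queueE.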
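
Earlier in this patch, TempQueuePerPartition#offer() lets a queue accept as much of the resource offered by its parent as fits under both its maximum capacity and its outstanding demand (current + pending), returning the remainder. A single-resource sketch of the same arithmetic (plain ints for memory only, ignoring vcores and the ResourceCalculator; not part of the patch):

    public class OfferSketch {
      static int idealAssigned = 0; // accumulates across successive offers

      static int offer(int avail, int current, int pending, int maxCapacity) {
        int underMax = Math.max(maxCapacity - idealAssigned, 0); // headroom below max capacity
        int needed = current + pending - idealAssigned;          // unmet demand
        int accepted = Math.min(underMax, Math.min(avail, needed));
        idealAssigned += accepted;
        return avail - accepted;                                  // "remain" handed back to the parent
      }

      public static void main(String[] args) {
        // A queue using 40 with 30 more pending, capped at 60, offered 80 by its parent:
        int remain = offer(80, 40, 30, 60);
        // accepted = min(60, min(80, 70)) = 60, so idealAssigned = 60 and remain = 20
        System.out.println("idealAssigned=" + idealAssigned + ", remain=" + remain);
      }
    }

The componentwiseMax against a zero Resource in the real method plays the role of Math.max(..., 0) here: a queue already at or over its maximum capacity accepts nothing further.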
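
The comment on PREEMPTION_NATURAL_TERMINATION_FACTOR describes geometric convergence: each round preempts only that fraction of the remaining delta, so after n rounds the reclaimed share is 1 - (1 - factor)^n. A quick check of the 0.5 example quoted there (illustrative, not part of the patch):

    public class TerminationFactorSketch {
      public static void main(String[] args) {
        double factor = 0.5; // the example value used in the configuration comment
        for (int round = 1; round <= 5; round++) {
          double reclaimed = 1 - Math.pow(1 - factor, round);
          System.out.printf("after %d rounds: %.1f%% reclaimed%n", round, 100 * reclaimed);
        }
        // Round 5 prints 96.9%, consistent with the "almost 95% within
        // 5 * max_wait_before_kill" figure quoted above.
      }
    }

TOTAL_PREEMPTION_PER_ROUND separately caps how much may be reclaimed in any single round, as its comment notes.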
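
For completeness, the preemption keys this patch relocates into CapacitySchedulerConfiguration can be set programmatically the same way the updated test setup does. The values below are the defaults the patch defines; the sketch assumes the resourcemanager classes are on the classpath and is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

    public class PreemptionConfSketch {
      public static void main(String[] args) {
        CapacitySchedulerConfiguration conf =
            new CapacitySchedulerConfiguration(new Configuration(false));
        // Values shown are the defaults introduced by this patch.
        conf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, false);
        conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, 3000L);
        conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 15000L);
        conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, 0.1f);
        conf.setFloat(CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, 0.1f);
        conf.setFloat(CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, 0.2f);
        // Per-queue opt-out, as exercised throughout the updated tests:
        conf.setPreemptionDisabled("root.queueB", true);
      }
    }

Setting PREEMPTION_OBSERVE_ONLY to true runs the policy without issuing preemption or kill events, which the observe-only test above relies on.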