http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 9b499c8..7e668b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -17,26 +17,13 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -50,7 +37,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.Clock;
@@ -58,8 +44,16 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * This class implement a {@link SchedulingEditPolicy} that is designed to be
@@ -80,79 +74,59 @@ import com.google.common.collect.ImmutableSet;
  * this policy will trigger forced termination of containers (again by generating
  * {@link ContainerPreemptEvent}).
  */
-public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy {
-
+public class ProportionalCapacityPreemptionPolicy
+    implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext {
   private static final Log LOG =
     LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
 
-  /** If true, run the policy but do not affect the cluster with preemption and
-   * kill events. */
-  public static final String OBSERVE_ONLY =
-      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
-  /** Time in milliseconds between invocations of this policy */
-  public static final String MONITORING_INTERVAL =
-      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
-  /** Time in milliseconds between requesting a preemption from an application
-   * and killing the container. */
-  public static final String WAIT_TIME_BEFORE_KILL =
-      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
-  /** Maximum percentage of resources preempted in a single round. By
-   * controlling this value one can throttle the pace at which containers are
-   * reclaimed from the cluster. After computing the total desired preemption,
-   * the policy scales it back within this limit. */
-  public static final String TOTAL_PREEMPTION_PER_ROUND =
-      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
-  /** Maximum amount of resources above the target capacity ignored for
-   * preemption. This defines a deadzone around the target capacity that helps
-   * prevent thrashing and oscillations around the computed target balance.
-   * High values would slow the time to capacity and (absent natural
-   * completions) it might prevent convergence to guaranteed capacity. */
-  public static final String MAX_IGNORED_OVER_CAPACITY =
-    "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
-  /**
-   * Given a computed preemption target, account for containers naturally
-   * expiring and preempt only this percentage of the delta. This determines
-   * the rate of geometric convergence into the deadzone ({@link
-   * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
-   * will reclaim almost 95% of resources within 5 * {@link
-   * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
-  public static final String NATURAL_TERMINATION_FACTOR =
-      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
-
-  private RMContext rmContext;
-
   private final Clock clock;
+
+  // Configurable fields
   private double maxIgnoredOverCapacity;
   private long maxWaitTime;
-  private CapacityScheduler scheduler;
   private long monitoringInterval;
-  private final Map<RMContainer, Long> preempted = new HashMap<>();
-
-  private ResourceCalculator rc;
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private boolean lazyPreempionEnabled;
+
+  // Pointer to other RM components
+  private RMContext rmContext;
+  private ResourceCalculator rc;
+  private CapacityScheduler scheduler;
+  private RMNodeLabelsManager nlm;
+
+  // Internal properties to make decisions of what to preempt
+  private final Map<RMContainer,Long> preemptionCandidates =
+    new HashMap<>();
   private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
       new HashMap<>();
-  private RMNodeLabelsManager nlm;
+  private List<PreemptionCandidatesSelector>
+      candidatesSelectionPolicies = new ArrayList<>();
+  private Set<String> allPartitions;
+  private Set<String> leafQueueNames;
 
   // Preemptable Entities, synced from scheduler at every run
-  private Map<String, PreemptableQueue> preemptableEntities = null;
+  private Map<String, PreemptableQueue> preemptableQueues;
   private Set<ContainerId> killableContainers;
 
+  @SuppressWarnings("unchecked")
   public ProportionalCapacityPreemptionPolicy() {
     clock = SystemClock.getInstance();
+    allPartitions = Collections.EMPTY_SET;
+    leafQueueNames = Collections.EMPTY_SET;
+    preemptableQueues = Collections.EMPTY_MAP;
   }
 
-  public ProportionalCapacityPreemptionPolicy(Configuration config,
-      RMContext context, CapacityScheduler scheduler) {
-    this(config, context, scheduler, SystemClock.getInstance());
-  }
-
-  public ProportionalCapacityPreemptionPolicy(Configuration config,
-      RMContext context, CapacityScheduler scheduler, Clock clock) {
-    init(config, context, scheduler);
+  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  public ProportionalCapacityPreemptionPolicy(RMContext context,
+      CapacityScheduler scheduler, Clock clock) {
+    init(context.getYarnConfiguration(), context, scheduler);
     this.clock = clock;
+    allPartitions = Collections.EMPTY_SET;
+    leafQueueNames = Collections.EMPTY_SET;
+    preemptableQueues = Collections.EMPTY_MAP;
   }
 
   public void init(Configuration config, RMContext context,
@@ -166,19 +140,45 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     }
     rmContext = context;
     scheduler = (CapacityScheduler) sched;
-    maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
-    naturalTerminationFactor =
-      config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
-    maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
-    monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
-    percentageClusterPreemptionAllowed =
-      config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
-    observeOnly = config.getBoolean(OBSERVE_ONLY, false);
+    CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration();
+
+    maxIgnoredOverCapacity = csConfig.getDouble(
+        CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY);
+
+    naturalTerminationFactor = csConfig.getDouble(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR);
+
+    maxWaitTime = csConfig.getLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL);
+
+    monitoringInterval = csConfig.getLong(
+        CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL);
+
+    percentageClusterPreemptionAllowed = csConfig.getFloat(
+        CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND);
+
+    observeOnly = csConfig.getBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY);
+
+    lazyPreempionEnabled = csConfig.getBoolean(
+        CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+        CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
+
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();
+
+    // initialize candidates preemption selection policies
+    candidatesSelectionPolicies.add(
+        new FifoCandidatesSelector(this));
   }
   
-  @VisibleForTesting
+  @Override
   public ResourceCalculator getResourceCalculator() {
     return rc;
   }
@@ -191,42 +191,37 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   }
 
   @SuppressWarnings("unchecked")
-  private void cleanupStaledKillableContainers(Resource cluster,
-      Set<String> leafQueueNames) {
-    for (String q : leafQueueNames) {
-      for (TempQueuePerPartition tq : getQueuePartitions(q)) {
-        // When queue's used - killable <= guaranteed and, killable > 0, we need
-        // to check if any of killable containers needs to be reverted
-        if (Resources.lessThanOrEqual(rc, cluster,
-            Resources.subtract(tq.current, tq.killable), tq.idealAssigned)
-            && Resources.greaterThan(rc, cluster, tq.killable, Resources.none())) {
-          // How many killable resources need to be reverted
-          // need-to-revert = already-marked-killable - (current - ideal)
-          Resource toBeRevertedFromKillable = Resources.subtract(tq.killable,
-              Resources.subtract(tq.current, tq.idealAssigned));
-
-          Resource alreadyReverted = Resources.createResource(0);
-
-          for (RMContainer c : preemptableEntities.get(q).getKillableContainers(
-              tq.partition).values()) {
-            if (Resources.greaterThanOrEqual(rc, cluster, alreadyReverted,
-                toBeRevertedFromKillable)) {
-              break;
-            }
-
-            if (Resources.greaterThan(rc, cluster,
-                Resources.add(alreadyReverted, c.getAllocatedResource()),
-                toBeRevertedFromKillable)) {
-              continue;
-            } else {
-              // This container need to be marked to unkillable
-              Resources.addTo(alreadyReverted, c.getAllocatedResource());
-              rmContext.getDispatcher().getEventHandler().handle(
-                  new ContainerPreemptEvent(c.getApplicationAttemptId(), c,
-                      SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE));
-            }
+  private void preemptOrkillSelectedContainerAfterWait(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+    // preempt (or kill) the selected containers
+    for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
+        .entrySet()) {
+      ApplicationAttemptId appAttemptId = e.getKey();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Send to scheduler: in app=" + appAttemptId
+            + " #containers-to-be-preemptionCandidates=" + 
e.getValue().size());
+      }
+      for (RMContainer container : e.getValue()) {
+        // if we tried to preempt this for more than maxWaitTime
+        if (preemptionCandidates.get(container) != null
+            && preemptionCandidates.get(container) + maxWaitTime < clock
+            .getTime()) {
+          // kill it
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
+          preemptionCandidates.remove(container);
+        } else {
+          if (preemptionCandidates.get(container) != null) {
+            // We already updated the information to scheduler earlier, we need
+            // not have to raise another event.
+            continue;
           }
-
+          //otherwise just send preemption events
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
+          preemptionCandidates.put(container, clock.getTime());
         }
       }
     }
@@ -234,11 +229,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
   private void syncKillableContainersFromScheduler() {
     // sync preemptable entities from scheduler
-    preemptableEntities =
-        scheduler.getPreemptionManager().getShallowCopyOfPreemptableEntities();
+    preemptableQueues =
+        scheduler.getPreemptionManager().getShallowCopyOfPreemptableQueues();
 
     killableContainers = new HashSet<>();
-    for (Map.Entry<String, PreemptableQueue> entry : preemptableEntities
+    for (Map.Entry<String, PreemptableQueue> entry : preemptableQueues
         .entrySet()) {
       PreemptableQueue entity = entry.getValue();
       for (Map<ContainerId, RMContainer> map : entity.getKillableContainers()
@@ -247,9 +242,34 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       }
     }
   }
+
+  private void cleanupStaledPreemptionCandidates() {
+    // Keep the preemptionCandidates list clean
+    for (Iterator<RMContainer> i = preemptionCandidates.keySet().iterator();
+         i.hasNext(); ) {
+      RMContainer id = i.next();
+      // garbage collect containers that are irrelevant for preemption
+      if (preemptionCandidates.get(id) + 2 * maxWaitTime < clock.getTime()) {
+        i.remove();
+      }
+    }
+  }
+
+  private Set<String> getLeafQueueNames(TempQueuePerPartition q) {
+    if (q.children == null || q.children.isEmpty()) {
+      return ImmutableSet.of(q.queueName);
+    }
+
+    Set<String> leafQueueNames = new HashSet<>();
+    for (TempQueuePerPartition child : q.children) {
+      leafQueueNames.addAll(getLeafQueueNames(child));
+    }
+
+    return leafQueueNames;
+  }
   
   /**
-   * This method selects and tracks containers to be preempted. If a container
+   * This method selects and tracks containers that are preemption candidates. If a container
    * is in the target list for more than maxWaitTime it is killed.
    *
    * @param root the root of the CapacityScheduler queue hierarchy
@@ -258,13 +278,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   @SuppressWarnings("unchecked")
   private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
+    // Sync killable containers from scheduler when lazy preemption enabled
+    if (lazyPreempionEnabled) {
+      syncKillableContainersFromScheduler();
+    }
+
     // All partitions to look at
-    Set<String> allPartitions = new HashSet<>();
-    allPartitions.addAll(scheduler.getRMContext()
+    Set<String> partitions = new HashSet<>();
+    partitions.addAll(scheduler.getRMContext()
         .getNodeLabelManager().getClusterNodeLabelNames());
-    allPartitions.add(RMNodeLabelsManager.NO_LABEL);
-
-    syncKillableContainersFromScheduler();
+    partitions.add(RMNodeLabelsManager.NO_LABEL);
+    this.allPartitions = ImmutableSet.copyOf(partitions);
 
     // extract a summary of the queues from scheduler
     synchronized (scheduler) {
@@ -277,30 +301,22 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       }
     }
 
+    this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames(
+        getQueueByPartition(CapacitySchedulerConfiguration.ROOT,
+            RMNodeLabelsManager.NO_LABEL)));
+
     // compute total preemption allowed
     Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
         percentageClusterPreemptionAllowed);
 
-    Set<String> leafQueueNames = null;
-    for (String partition : allPartitions) {
-      TempQueuePerPartition tRoot =
-          getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
-      // compute the ideal distribution of resources among queues
-      // updates cloned queues state accordingly
-      tRoot.idealAssigned = tRoot.guaranteed;
-
-      leafQueueNames =
-          recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
-    }
-
-    // remove containers from killable list when we want to preempt less resources
-    // from queue.
-    cleanupStaledKillableContainers(clusterResources, leafQueueNames);
-
-    // based on ideal allocation select containers to be preempted from each
+    // based on ideal allocation select preemption candidates from each
     // queue and each application
-    Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
-        getContainersToPreempt(leafQueueNames, clusterResources);
+    Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = null;
+    for (PreemptionCandidatesSelector selector :
+        candidatesSelectionPolicies) {
+      toPreempt = selector.selectCandidates(toPreempt,
+          clusterResources, totalPreemptionAllowed);
+    }
 
     if (LOG.isDebugEnabled()) {
       logToCSV(new ArrayList<>(leafQueueNames));
@@ -311,582 +327,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       return;
     }
 
-    // preempt (or kill) the selected containers
-    for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
-         : toPreempt.entrySet()) {
-      ApplicationAttemptId appAttemptId = e.getKey();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Send to scheduler: in app=" + appAttemptId
-            + " #containers-to-be-preempted=" + e.getValue().size());
-      }
-      for (RMContainer container : e.getValue()) {
-        // if we tried to preempt this for more than maxWaitTime
-        if (preempted.get(container) != null &&
-            preempted.get(container) + maxWaitTime < clock.getTime()) {
-          // mark container killable
-          rmContext.getDispatcher().getEventHandler().handle(
-              new ContainerPreemptEvent(appAttemptId, container,
-                  SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
-          preempted.remove(container);
-        } else {
-          if (preempted.get(container) != null) {
-            // We already updated the information to scheduler earlier, we need
-            // not have to raise another event.
-            continue;
-          }
-          //otherwise just send preemption events
-          rmContext.getDispatcher().getEventHandler().handle(
-              new ContainerPreemptEvent(appAttemptId, container,
-                  SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
-          preempted.put(container, clock.getTime());
-        }
-      }
-    }
-
-    // Keep the preempted list clean
-    for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
-      RMContainer id = i.next();
-      // garbage collect containers that are irrelevant for preemption
-      if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
-        i.remove();
-      }
-    }
-  }
-
-  /**
-   * This method recursively computes the ideal assignment of resources to each
-   * level of the hierarchy. This ensures that leafs that are over-capacity but
-   * with parents within capacity will not be preempted. Preemptions are allowed
-   * within each subtree according to local over/under capacity.
-   *
-   * @param root the root of the cloned queue hierachy
-   * @param totalPreemptionAllowed maximum amount of preemption allowed
-   * @return a list of leaf queues updated with preemption targets
-   */
-  private Set<String> recursivelyComputeIdealAssignment(
-      TempQueuePerPartition root, Resource totalPreemptionAllowed) {
-    Set<String> leafQueueNames = new HashSet<>();
-    if (root.getChildren() != null &&
-        root.getChildren().size() > 0) {
-      // compute ideal distribution at this level
-      computeIdealResourceDistribution(rc, root.getChildren(),
-          totalPreemptionAllowed, root.idealAssigned);
-      // compute recursively for lower levels and build list of leafs
-      for(TempQueuePerPartition t : root.getChildren()) {
-        leafQueueNames.addAll(recursivelyComputeIdealAssignment(t,
-            totalPreemptionAllowed));
-      }
-    } else {
-      // we are in a leaf nothing to do, just return yourself
-      return ImmutableSet.of(root.queueName);
-    }
-    return leafQueueNames;
-  }
+    // TODO: need to consider reverting killable containers when there is no
+    // more demand, since we could have several selectors making decisions
+    // concurrently and the computed ideal-allocation varies between selectors.
+    //
+    // We may need to "score" killable containers and revert the most preferred
+    // containers. The bottom line is, we shouldn't preempt a queue which is already
+    // below its guaranteed resource.
 
-  /**
-   * This method computes (for a single level in the tree, passed as a {@code
-   * List<TempQueue>}) the ideal assignment of resources. This is done
-   * recursively to allocate capacity fairly across all queues with pending
-   * demands. It terminates when no resources are left to assign, or when all
-   * demand is satisfied.
-   *
-   * @param rc resource calculator
-   * @param queues a list of cloned queues to be assigned capacity to (this is
-   * an out param)
-   * @param totalPreemptionAllowed total amount of preemption we allow
-   * @param tot_guarant the amount of capacity assigned to this pool of queues
-   */
-  private void computeIdealResourceDistribution(ResourceCalculator rc,
-      List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
-      Resource tot_guarant) {
-
-    // qAlloc tracks currently active queues (will decrease progressively as
-    // demand is met)
-    List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues);
-    // unassigned tracks how much resources are still to assign, initialized
-    // with the total capacity for this set of queues
-    Resource unassigned = Resources.clone(tot_guarant);
-
-    // group queues based on whether they have non-zero guaranteed capacity
-    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
-    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
-
-    for (TempQueuePerPartition q : qAlloc) {
-      if (Resources
-          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
-        nonZeroGuarQueues.add(q);
-      } else {
-        zeroGuarQueues.add(q);
-      }
-    }
-
-    // first compute the allocation as a fixpoint based on guaranteed capacity
-    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
-        false);
-
-    // if any capacity is left unassigned, distributed among zero-guarantee 
-    // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
-    if (!zeroGuarQueues.isEmpty()
-        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
-      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
-          true);
-    }
-    
-    // based on ideal assignment computed above and current assignment we derive
-    // how much preemption is required overall
-    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
-    for (TempQueuePerPartition t:queues) {
-      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
-        Resources.addTo(totPreemptionNeeded,
-            Resources.subtract(t.current, t.idealAssigned));
-      }
-    }
-
-    // if we need to preempt more than is allowed, compute a factor (0<f<1)
-    // that is used to scale down how much we ask back from each queue
-    float scalingFactor = 1.0F;
-    if (Resources.greaterThan(rc, tot_guarant,
-          totPreemptionNeeded, totalPreemptionAllowed)) {
-       scalingFactor = Resources.divide(rc, tot_guarant,
-           totalPreemptionAllowed, totPreemptionNeeded);
-    }
-
-    // assign to each queue the amount of actual preemption based on local
-    // information of ideal preemption and scaling factor
-    for (TempQueuePerPartition t : queues) {
-      t.assignPreemption(scalingFactor, rc, tot_guarant);
-    }
-    if (LOG.isDebugEnabled()) {
-      long time = clock.getTime();
-      for (TempQueuePerPartition t : queues) {
-        LOG.debug(time + ": " + t);
-      }
-    }
-
-  }
-  
-  /**
-   * Given a set of queues compute the fix-point distribution of unassigned
-   * resources among them. As pending request of a queue are exhausted, the
-   * queue is removed from the set and remaining capacity redistributed among
-   * remaining queues. The distribution is weighted based on guaranteed
-   * capacity, unless asked to ignoreGuarantee, in which case resources are
-   * distributed uniformly.
-   */
-  private void computeFixpointAllocation(ResourceCalculator rc,
-      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
-      Resource unassigned, boolean ignoreGuarantee) {
-    // Prior to assigning the unused resources, process each queue as follows:
-    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
-    // Else idealAssigned = current;
-    // Subtract idealAssigned resources from unassigned.
-    // If the queue has all of its needs met (that is, if 
-    // idealAssigned >= current + pending), remove it from consideration.
-    // Sort queues from most under-guaranteed to most over-guaranteed.
-    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
-    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
-        tqComparator);
-    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
-      TempQueuePerPartition q = i.next();
-      if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
-        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
-      } else {
-        q.idealAssigned = Resources.clone(q.current);
-      }
-      Resources.subtractFrom(unassigned, q.idealAssigned);
-      // If idealAssigned < (current + pending), q needs more resources, so
-      // add it to the list of underserved queues, ordered by need.
-      Resource curPlusPend = Resources.add(q.current, q.pending);
-      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
-        orderedByNeed.add(q);
-      }
-    }
-
-    //assign all cluster resources until no more demand, or no resources are left
-    while (!orderedByNeed.isEmpty()
-       && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
-      Resource wQassigned = Resource.newInstance(0, 0);
-      // we compute normalizedGuarantees capacity based on currently active
-      // queues
-      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
-
-      // For each underserved queue (or set of queues if multiple are equally
-      // underserved), offer its share of the unassigned resources based on its
-      // normalized guarantee. After the offer, if the queue is not satisfied,
-      // place it back in the ordered list of queues, recalculating its place
-      // in the order of most under-guaranteed to most over-guaranteed. In this
-      // way, the most underserved queue(s) are always given resources first.
-      Collection<TempQueuePerPartition> underserved =
-          getMostUnderservedQueues(orderedByNeed, tqComparator);
-      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
-          .hasNext();) {
-        TempQueuePerPartition sub = i.next();
-        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
-            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
-        Resource wQdone = Resources.subtract(wQavail, wQidle);
-
-        if (Resources.greaterThan(rc, tot_guarant,
-              wQdone, Resources.none())) {
-          // The queue is still asking for more. Put it back in the priority
-          // queue, recalculating its order based on need.
-          orderedByNeed.add(sub);
-        }
-        Resources.addTo(wQassigned, wQdone);
-      }
-      Resources.subtractFrom(unassigned, wQassigned);
-    }
-  }
-
-  // Take the most underserved TempQueue (the one on the head). Collect and
-  // return the list of all queues that have the same idealAssigned
-  // percentage of guaranteed.
-  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
-      PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator 
tqComparator) {
-    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
-    while (!orderedByNeed.isEmpty()) {
-      TempQueuePerPartition q1 = orderedByNeed.remove();
-      underserved.add(q1);
-      TempQueuePerPartition q2 = orderedByNeed.peek();
-      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
-      // return what has already been collected. Otherwise, q1's pct of
-      // guaranteed == that of q2, so add q2 to underserved list during the
-      // next pass.
-      if (q2 == null || tqComparator.compare(q1,q2) < 0) {
-        return underserved;
-      }
-    }
-    return underserved;
-  }
-
-  /**
-   * Computes a normalizedGuaranteed capacity based on active queues
-   * @param rc resource calculator
-   * @param clusterResource the total amount of resources in the cluster
-   * @param queues the list of queues to consider
-   */
-  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
-    Resource activeCap = Resource.newInstance(0, 0);
-    
-    if (ignoreGuar) {
-      for (TempQueuePerPartition q : queues) {
-        q.normalizedGuarantee = 1.0f / queues.size();
-      }
-    } else {
-      for (TempQueuePerPartition q : queues) {
-        Resources.addTo(activeCap, q.guaranteed);
-      }
-      for (TempQueuePerPartition q : queues) {
-        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-            q.guaranteed, activeCap);
-      }
-    }
-  }
-
-  private String getPartitionByRMContainer(RMContainer rmContainer) {
-    return scheduler.getSchedulerNode(rmContainer.getAllocatedNode())
-        .getPartition();
-  }
-
-  /**
-   * Return should we preempt rmContainer. If we should, deduct from
-   * <code>resourceToObtainByPartition</code>
-   */
-  private boolean tryPreemptContainerAndDeductResToObtain(
-      Map<String, Resource> resourceToObtainByPartitions,
-      RMContainer rmContainer, Resource clusterResource,
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
-    ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
-
-    // We will not account resource of a container twice or more
-    if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
-      return false;
-    }
-
-    String nodePartition = getPartitionByRMContainer(rmContainer);
-    Resource toObtainByPartition =
-        resourceToObtainByPartitions.get(nodePartition);
-
-    if (null != toObtainByPartition
-        && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
-            Resources.none())) {
-      Resources.subtractFrom(toObtainByPartition,
-          rmContainer.getAllocatedResource());
-      // When we have no more resource need to obtain, remove from map.
-      if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
-          Resources.none())) {
-        resourceToObtainByPartitions.remove(nodePartition);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Marked container=" + rmContainer.getContainerId()
-            + " in partition=" + nodePartition + " will be preempted");
-      }
-      // Add to preemptMap
-      addToPreemptMap(preemptMap, attemptId, rmContainer);
-      return true;
-    }
-
-    return false;
-  }
-
-  private boolean preemptMapContains(
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      ApplicationAttemptId attemptId, RMContainer rmContainer) {
-    Set<RMContainer> rmContainers;
-    if (null == (rmContainers = preemptMap.get(attemptId))) {
-      return false;
-    }
-    return rmContainers.contains(rmContainer);
-  }
-
-  private void addToPreemptMap(
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
-    Set<RMContainer> set;
-    if (null == (set = preemptMap.get(appAttemptId))) {
-      set = new HashSet<>();
-      preemptMap.put(appAttemptId, set);
-    }
-    set.add(containerToPreempt);
-  }
-
-  /**
-   * Based a resource preemption target drop reservations of containers and
-   * if necessary select containers for preemption from applications in each
-   * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
-   * account for containers that will naturally complete.
-   *
-   * @param leafQueueNames set of leaf queues to preempt from
-   * @param clusterResource total amount of cluster resources
-   * @return a map of applciationID to set of containers to preempt
-   */
-  private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
-      Set<String> leafQueueNames, Resource clusterResource) {
-
-    Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
-        new HashMap<>();
-    List<RMContainer> skippedAMContainerlist = new ArrayList<>();
-
-    // Loop all leaf queues
-    for (String queueName : leafQueueNames) {
-      // check if preemption disabled for the queue
-      if (getQueueByPartition(queueName,
-          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("skipping from queue=" + queueName
-              + " because it's a non-preemptable queue");
-        }
-        continue;
-      }
-
-      // compute resToObtainByPartition considered inter-queue preemption
-      LeafQueue leafQueue = null;
-
-      Map<String, Resource> resToObtainByPartition =
-          new HashMap<>();
-      for (TempQueuePerPartition qT : getQueuePartitions(queueName)) {
-        leafQueue = qT.leafQueue;
-        // we act only if we are violating balance by more than
-        // maxIgnoredOverCapacity
-        if (Resources.greaterThan(rc, clusterResource, qT.current,
-            Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
-          // we introduce a dampening factor naturalTerminationFactor that
-          // accounts for natural termination of containers
-          Resource resToObtain =
-              Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
-          // Only add resToObtain when it >= 0
-          if (Resources.greaterThan(rc, clusterResource, resToObtain,
-              Resources.none())) {
-            resToObtainByPartition.put(qT.partition, resToObtain);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Queue=" + queueName + " partition=" + qT.partition
-                  + " resource-to-obtain=" + resToObtain);
-            }
-          }
-          qT.actuallyPreempted = Resources.clone(resToObtain);
-        } else {
-          qT.actuallyPreempted = Resources.none();
-        }
-      }
-
-      synchronized (leafQueue) {
-        // go through all ignore-partition-exclusivity containers first to make
-        // sure such containers will be preempted first
-        Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
-            leafQueue.getIgnoreExclusivityRMContainers();
-        for (String partition : resToObtainByPartition.keySet()) {
-          if (ignorePartitionExclusivityContainers.containsKey(partition)) {
-            TreeSet<RMContainer> rmContainers =
-                ignorePartitionExclusivityContainers.get(partition);
-            // We will check container from reverse order, so latter submitted
-            // application's containers will be preempted first.
-            for (RMContainer c : rmContainers.descendingSet()) {
-              boolean preempted =
-                  tryPreemptContainerAndDeductResToObtain(
-                      resToObtainByPartition, c, clusterResource, preemptMap);
-              if (!preempted) {
-                break;
-              }
-            }
-          }
-        }
-
-        // preempt other containers
-        Resource skippedAMSize = Resource.newInstance(0, 0);
-        Iterator<FiCaSchedulerApp> desc =
-            leafQueue.getOrderingPolicy().getPreemptionIterator();
-        while (desc.hasNext()) {
-          FiCaSchedulerApp fc = desc.next();
-          // When we complete preempt from one partition, we will remove from
-          // resToObtainByPartition, so when it becomes empty, we can get no
-          // more preemption is needed
-          if (resToObtainByPartition.isEmpty()) {
-            break;
-          }
-
-          preemptFrom(fc, clusterResource, resToObtainByPartition,
-              skippedAMContainerlist, skippedAMSize, preemptMap);
-        }
-
-        // Can try preempting AMContainers (still saving atmost
-        // maxAMCapacityForThisQueue AMResource's) if more resources are
-        // required to be preempted from this Queue.
-        Resource maxAMCapacityForThisQueue = Resources.multiply(
-            Resources.multiply(clusterResource,
-                leafQueue.getAbsoluteCapacity()),
-            leafQueue.getMaxAMResourcePerQueuePercent());
-
-        preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
-            resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue);
-      }
-    }
-
-    return preemptMap;
-  }
-
-  /**
-   * As more resources are needed for preemption, saved AMContainers has to be
-   * rescanned. Such AMContainers can be preempted based on resToObtain, but 
-   * maxAMCapacityForThisQueue resources will be still retained.
-   *  
-   * @param clusterResource
-   * @param preemptMap
-   * @param skippedAMContainerlist
-   * @param skippedAMSize
-   * @param maxAMCapacityForThisQueue
-   */
-  private void preemptAMContainers(Resource clusterResource,
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      List<RMContainer> skippedAMContainerlist,
-      Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
-      Resource maxAMCapacityForThisQueue) {
-    for (RMContainer c : skippedAMContainerlist) {
-      // Got required amount of resources for preemption, can stop now
-      if (resToObtainByPartition.isEmpty()) {
-        break;
-      }
-      // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
-      // container selection iteration for preemption will be stopped.
-      if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
-          maxAMCapacityForThisQueue)) {
-        break;
-      }
-
-      boolean preempted =
-          tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
-              clusterResource, preemptMap);
-      if (preempted) {
-        Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
-      }
-    }
-    skippedAMContainerlist.clear();
-  }
-
-  /**
-   * Given a target preemption for a specific application, select containers
-   * to preempt (after unreserving all reservation for that app).
-   */
-  @SuppressWarnings("unchecked")
-  private void preemptFrom(FiCaSchedulerApp app,
-      Resource clusterResource, Map<String, Resource> resToObtainByPartition,
-      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
-    ApplicationAttemptId appId = app.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Looking at application=" + app.getApplicationAttemptId()
-          + " resourceToObtain=" + resToObtainByPartition);
-    }
-
-    // first drop reserved containers towards rsrcPreempt
-    List<RMContainer> reservedContainers =
-        new ArrayList<>(app.getReservedContainers());
-    for (RMContainer c : reservedContainers) {
-      if (resToObtainByPartition.isEmpty()) {
-        return;
-      }
-
-      // Try to preempt this container
-      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
-          clusterResource, preemptMap);
-
-      if (!observeOnly) {
-        rmContext.getDispatcher().getEventHandler().handle(
-            new ContainerPreemptEvent(
-                appId, c, SchedulerEventType.KILL_RESERVED_CONTAINER));
-      }
-    }
-
-    // if more resources are to be freed go through all live containers in
-    // reverse priority and reverse allocation order and mark them for
-    // preemption
-    List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers());
-
-    sortContainers(liveContainers);
-
-    for (RMContainer c : liveContainers) {
-      if (resToObtainByPartition.isEmpty()) {
-        return;
-      }
-
-      // Skip AM Container from preemption for now.
-      if (c.isAMContainer()) {
-        skippedAMContainerlist.add(c);
-        Resources.addTo(skippedAMSize, c.getAllocatedResource());
-        continue;
-      }
-
-      // Skip already marked to killable containers
-      if (killableContainers.contains(c.getContainerId())) {
-        continue;
-      }
-
-      // Try to preempt this container
-      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
-          clusterResource, preemptMap);
-    }
-  }
+    // preempt (or kill) the selected containers
+    preemptOrkillSelectedContainerAfterWait(toPreempt);
 
-  /**
-   * Compare by reversed priority order first, and then reversed containerId
-   * order
-   * @param containers
-   */
-  @VisibleForTesting
-  static void sortContainers(List<RMContainer> containers){
-    Collections.sort(containers, new Comparator<RMContainer>() {
-      @Override
-      public int compare(RMContainer a, RMContainer b) {
-        Comparator<Priority> c = new org.apache.hadoop.yarn.server
-            .resourcemanager.resource.Priority.Comparator();
-        int priorityComp = c.compare(b.getContainer().getPriority(),
-                                     a.getContainer().getPriority());
-        if (priorityComp != 0) {
-          return priorityComp;
-        }
-        return b.getContainerId().compareTo(a.getContainerId());
-      }
-    });
+    // cleanup staled preemption candidates
+    cleanupStaledPreemptionCandidates();
   }
 
   @Override
@@ -901,7 +354,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
   @VisibleForTesting
   public Map<RMContainer, Long> getToPreemptContainers() {
-    return preempted;
+    return preemptionCandidates;
   }
 
   /**
@@ -929,8 +382,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       Resource guaranteed = Resources.multiply(partitionResource, absCap);
       Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
       Resource killable = Resources.none();
-      if (null != preemptableEntities.get(queueName)) {
-         killable = preemptableEntities.get(queueName)
+      if (null != preemptableQueues.get(queueName)) {
+         killable = preemptableQueues.get(queueName)
             .getKillableResource(partitionToLookAt);
       }
 
@@ -1023,9 +476,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   /**
    * Get queue partition by given queueName and partitionName
    */
-  private TempQueuePerPartition getQueueByPartition(String queueName,
+  @Override
+  public TempQueuePerPartition getQueueByPartition(String queueName,
       String partition) {
-    Map<String, TempQueuePerPartition> partitionToQueues = null;
+    Map<String, TempQueuePerPartition> partitionToQueues;
     if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
       return null;
     }
@@ -1035,180 +489,56 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   /**
    * Get all queue partitions by given queueName
    */
-  private Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
+  @Override
+  public Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
     if (!queueToPartitions.containsKey(queueName)) {
       return null;
     }
     return queueToPartitions.get(queueName).values();
   }
 
-  /**
-   * Temporary data-structure tracking resource availability, pending resource
-   * need, current utilization. This is per-queue-per-partition data structure
-   */
-  static class TempQueuePerPartition {
-    final String queueName;
-    final Resource current;
-    final Resource pending;
-    final Resource guaranteed;
-    final Resource maxCapacity;
-    final String partition;
-    final Resource killable;
-    Resource idealAssigned;
-    Resource toBePreempted;
-
-    // For logging purpose
-    Resource actuallyPreempted;
-    Resource untouchableExtra;
-    Resource preemptableExtra;
-
-    double normalizedGuarantee;
-
-    final ArrayList<TempQueuePerPartition> children;
-    LeafQueue leafQueue;
-    boolean preemptionDisabled;
-
-    TempQueuePerPartition(String queueName, Resource current, Resource pending,
-        Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
-        String partition, Resource killableResource) {
-      this.queueName = queueName;
-      this.current = current;
-      this.pending = pending;
-      this.guaranteed = guaranteed;
-      this.maxCapacity = maxCapacity;
-      this.idealAssigned = Resource.newInstance(0, 0);
-      this.actuallyPreempted = Resource.newInstance(0, 0);
-      this.toBePreempted = Resource.newInstance(0, 0);
-      this.normalizedGuarantee = Float.NaN;
-      this.children = new ArrayList<>();
-      this.untouchableExtra = Resource.newInstance(0, 0);
-      this.preemptableExtra = Resource.newInstance(0, 0);
-      this.preemptionDisabled = preemptionDisabled;
-      this.partition = partition;
-      this.killable = killableResource;
-    }
-
-    public void setLeafQueue(LeafQueue l){
-      assert children.size() == 0;
-      this.leafQueue = l;
-    }
-
-    /**
-     * When adding a child we also aggregate its pending resource needs.
-     * @param q the child queue to add to this queue
-     */
-    public void addChild(TempQueuePerPartition q) {
-      assert leafQueue == null;
-      children.add(q);
-      Resources.addTo(pending, q.pending);
-    }
-
-    public ArrayList<TempQueuePerPartition> getChildren(){
-      return children;
-    }
-
-    // This function "accepts" all the resources it can (pending) and return
-    // the unused ones
-    Resource offer(Resource avail, ResourceCalculator rc,
-        Resource clusterResource) {
-      Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
-                      Resources.subtract(maxCapacity, idealAssigned),
-                      Resource.newInstance(0, 0));
-      // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
-      Resource accepted = 
-          Resources.min(rc, clusterResource, 
-              absMaxCapIdealAssignedDelta,
-          Resources.min(rc, clusterResource, avail, Resources.subtract(
-              Resources.add(current, pending), idealAssigned)));
-      Resource remain = Resources.subtract(avail, accepted);
-      Resources.addTo(idealAssigned, accepted);
-      return remain;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(" NAME: " + queueName)
-        .append(" CUR: ").append(current)
-        .append(" PEN: ").append(pending)
-        .append(" GAR: ").append(guaranteed)
-        .append(" NORM: ").append(normalizedGuarantee)
-        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
-        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
-        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
-        .append(" UNTOUCHABLE: ").append(untouchableExtra)
-        .append(" PREEMPTABLE: ").append(preemptableExtra)
-        .append("\n");
-
-      return sb.toString();
-    }
+  @Override
+  public CapacityScheduler getScheduler() {
+    return scheduler;
+  }
 
-    public void assignPreemption(float scalingFactor,
-        ResourceCalculator rc, Resource clusterResource) {
-      if (Resources.greaterThan(rc, clusterResource,
-          Resources.subtract(current, killable), idealAssigned)) {
-        toBePreempted = Resources.multiply(Resources.subtract(
-            Resources.subtract(current, killable), idealAssigned),
-            scalingFactor);
-      } else {
-        toBePreempted = Resource.newInstance(0, 0);
-      }
-    }
+  @Override
+  public RMContext getRMContext() {
+    return rmContext;
+  }
 
-    void appendLogString(StringBuilder sb) {
-      sb.append(queueName).append(", ")
-        .append(current.getMemory()).append(", ")
-        .append(current.getVirtualCores()).append(", ")
-        .append(pending.getMemory()).append(", ")
-        .append(pending.getVirtualCores()).append(", ")
-        .append(guaranteed.getMemory()).append(", ")
-        .append(guaranteed.getVirtualCores()).append(", ")
-        .append(idealAssigned.getMemory()).append(", ")
-        .append(idealAssigned.getVirtualCores()).append(", ")
-        .append(toBePreempted.getMemory()).append(", ")
-        .append(toBePreempted.getVirtualCores() ).append(", ")
-        .append(actuallyPreempted.getMemory()).append(", ")
-        .append(actuallyPreempted.getVirtualCores());
-    }
+  @Override
+  public boolean isObserveOnly() {
+    return observeOnly;
+  }
 
+  @Override
+  public Set<ContainerId> getKillableContainers() {
+    return killableContainers;
   }
 
-  static class TQComparator implements Comparator<TempQueuePerPartition> {
-    private ResourceCalculator rc;
-    private Resource clusterRes;
+  @Override
+  public double getMaxIgnoreOverCapacity() {
+    return maxIgnoredOverCapacity;
+  }
 
-    TQComparator(ResourceCalculator rc, Resource clusterRes) {
-      this.rc = rc;
-      this.clusterRes = clusterRes;
-    }
+  @Override
+  public double getNaturalTerminationFactor() {
+    return naturalTerminationFactor;
+  }
 
-    @Override
-    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
-      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
-        return -1;
-      }
-      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
-        return 1;
-      }
-      return 0;
-    }
+  @Override
+  public Set<String> getLeafQueueNames() {
+    return leafQueueNames;
+  }
 
-    // Calculates idealAssigned / guaranteed
-    // TempQueues with 0 guarantees are always considered the most over
-    // capacity and therefore considered last for resources.
-    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
-      double pctOver = Integer.MAX_VALUE;
-      if (q != null && Resources.greaterThan(
-          rc, clusterRes, q.guaranteed, Resources.none())) {
-        pctOver =
-            Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
-      }
-      return (pctOver);
-    }
+  @Override
+  public Set<String> getAllPartitions() {
+    return allPartitions;
   }
 
   @VisibleForTesting
-  public Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
+  Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
     return queueToPartitions;
   }
 }
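
Note on the flow above: preemptOrkillSelectedContainerAfterWait() never kills a container on the first pass. It records when a container was first selected, sends MARK_CONTAINER_FOR_PREEMPTION, and only escalates to MARK_CONTAINER_FOR_KILLABLE if the same container is still selected after maxWaitTime; cleanupStaledPreemptionCandidates() then forgets entries older than 2 * maxWaitTime. The following is a minimal, self-contained sketch of that lifecycle; the class and method names (CandidateTracker, onSelected, Action) are illustrative only and are not YARN APIs.

// Simplified standalone sketch of the two-step preemption lifecycle:
// a container is first marked for preemption, and only killed if it is still
// selected after maxWaitTime; entries older than 2 * maxWaitTime are dropped.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class CandidateTracker {
  enum Action { MARK_FOR_PREEMPTION, MARK_FOR_KILL, NONE }

  private final Map<String, Long> firstMarkedTime = new HashMap<>();
  private final long maxWaitTimeMs;

  CandidateTracker(long maxWaitTimeMs) {
    this.maxWaitTimeMs = maxWaitTimeMs;
  }

  /** Decide what to do with a selected container id at time 'now'. */
  Action onSelected(String containerId, long now) {
    Long firstMarked = firstMarkedTime.get(containerId);
    if (firstMarked != null && firstMarked + maxWaitTimeMs < now) {
      firstMarkedTime.remove(containerId);   // grace period expired, kill it
      return Action.MARK_FOR_KILL;
    }
    if (firstMarked != null) {
      return Action.NONE;                    // scheduler was already notified
    }
    firstMarkedTime.put(containerId, now);   // start the grace period
    return Action.MARK_FOR_PREEMPTION;
  }

  /** Drop stale entries, mirroring cleanupStaledPreemptionCandidates(). */
  void cleanup(long now) {
    for (Iterator<Map.Entry<String, Long>> it =
        firstMarkedTime.entrySet().iterator(); it.hasNext();) {
      if (it.next().getValue() + 2 * maxWaitTimeMs < now) {
        it.remove();
      }
    }
  }
}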

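The removed NATURAL_TERMINATION_FACTOR javadoc (the setting now lives in CapacitySchedulerConfiguration as PREEMPTION_NATURAL_TERMINATION_FACTOR) describes a geometric convergence into the dead zone: each monitoring round only asks back a fraction f of the remaining over-capacity, so after k rounds roughly (1 - f)^k of the original delta is left, consistent with the old comment that f = 0.5 reclaims almost 95% of resources within 5 * WAIT_TIME_BEFORE_KILL. A small illustrative computation, not part of the patch:

// Demo of the geometric convergence behind natural_termination_factor.
public class NaturalTerminationDemo {
  public static void main(String[] args) {
    double factor = 0.5;          // natural_termination_factor
    double remaining = 1.0;       // fraction of over-capacity still held
    for (int round = 1; round <= 5; round++) {
      remaining *= (1.0 - factor);
      System.out.printf("after round %d: %.1f%% of the delta remains%n",
          round, remaining * 100);
    }
  }
}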
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
new file mode 100644
index 0000000..8b01a73
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+
+/**
+ * Temporary data-structure tracking resource availability, pending resource
+ * need, current utilization. This is per-queue-per-partition data structure
+ */
+public class TempQueuePerPartition {
+  // Following fields are copied from scheduler
+  final String queueName;
+  final Resource current;
+  final Resource pending;
+  final Resource guaranteed;
+  final Resource maxCapacity;
+  final Resource killable;
+  final String partition;
+
+  // Following fields are set and used by candidate selection policies
+  Resource idealAssigned;
+  Resource toBePreempted;
+  Resource untouchableExtra;
+  Resource preemptableExtra;
+  // For logging purposes
+  Resource actuallyToBePreempted;
+
+  double normalizedGuarantee;
+
+  final ArrayList<TempQueuePerPartition> children;
+  LeafQueue leafQueue;
+  boolean preemptionDisabled;
+
+  TempQueuePerPartition(String queueName, Resource current, Resource pending,
+      Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
+      String partition, Resource killable) {
+    this.queueName = queueName;
+    this.current = current;
+    this.pending = pending;
+    this.guaranteed = guaranteed;
+    this.maxCapacity = maxCapacity;
+    this.idealAssigned = Resource.newInstance(0, 0);
+    this.actuallyToBePreempted = Resource.newInstance(0, 0);
+    this.toBePreempted = Resource.newInstance(0, 0);
+    this.normalizedGuarantee = Float.NaN;
+    this.children = new ArrayList<>();
+    this.untouchableExtra = Resource.newInstance(0, 0);
+    this.preemptableExtra = Resource.newInstance(0, 0);
+    this.preemptionDisabled = preemptionDisabled;
+    this.partition = partition;
+    this.killable = killable;
+  }
+
+  public void setLeafQueue(LeafQueue l) {
+    assert children.size() == 0;
+    this.leafQueue = l;
+  }
+
+  /**
+   * When adding a child we also aggregate its pending resource needs.
+   * @param q the child queue to add to this queue
+   */
+  public void addChild(TempQueuePerPartition q) {
+    assert leafQueue == null;
+    children.add(q);
+    Resources.addTo(pending, q.pending);
+  }
+
+  public ArrayList<TempQueuePerPartition> getChildren() {
+    return children;
+  }
+
+  // This function "accepts" all the resources it can (pending) and returns
+  // the unused ones.
+  Resource offer(Resource avail, ResourceCalculator rc,
+      Resource clusterResource) {
+    Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
+        Resources.subtract(maxCapacity, idealAssigned),
+        Resource.newInstance(0, 0));
+    // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+    Resource accepted =
+        Resources.min(rc, clusterResource,
+            absMaxCapIdealAssignedDelta,
+            Resources.min(rc, clusterResource, avail, Resources.subtract(
+                Resources.add(current, pending), idealAssigned)));
+    Resource remain = Resources.subtract(avail, accepted);
+    Resources.addTo(idealAssigned, accepted);
+    return remain;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(" NAME: " + queueName)
+        .append(" CUR: ").append(current)
+        .append(" PEN: ").append(pending)
+        .append(" GAR: ").append(guaranteed)
+        .append(" NORM: ").append(normalizedGuarantee)
+        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
+        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
+        .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted)
+        .append(" UNTOUCHABLE: ").append(untouchableExtra)
+        .append(" PREEMPTABLE: ").append(preemptableExtra)
+        .append("\n");
+
+    return sb.toString();
+  }
+
+  public void assignPreemption(float scalingFactor, ResourceCalculator rc,
+      Resource clusterResource) {
+    if (Resources.greaterThan(rc, clusterResource,
+        Resources.subtract(current, killable), idealAssigned)) {
+      toBePreempted = Resources.multiply(Resources
+              .subtract(Resources.subtract(current, killable), idealAssigned),
+          scalingFactor);
+    } else {
+      toBePreempted = Resource.newInstance(0, 0);
+    }
+  }
+
+  void appendLogString(StringBuilder sb) {
+    sb.append(queueName).append(", ")
+        .append(current.getMemory()).append(", ")
+        .append(current.getVirtualCores()).append(", ")
+        .append(pending.getMemory()).append(", ")
+        .append(pending.getVirtualCores()).append(", ")
+        .append(guaranteed.getMemory()).append(", ")
+        .append(guaranteed.getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemory()).append(", ")
+        .append(idealAssigned.getVirtualCores()).append(", ")
+        .append(toBePreempted.getMemory()).append(", ")
+        .append(toBePreempted.getVirtualCores()).append(", ")
+        .append(actuallyToBePreempted.getMemory()).append(", ")
+        .append(actuallyToBePreempted.getVirtualCores());
+  }
+
+}
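
For readers skimming the new class above, here is a minimal usage sketch (not part of this
patch) of how offer() and assignPreemption() interact. It assumes it lives in the same
package, since the constructor, offer() and the idealAssigned/toBePreempted fields are
package-private, and the resource numbers are invented purely for illustration.

// Illustrative only -- not part of this patch. Same package as
// TempQueuePerPartition because its members are package-private.
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

public class TempQueuePerPartitionSketch {
  public static void main(String[] args) {
    ResourceCalculator rc = new DefaultResourceCalculator();
    Resource cluster = Resource.newInstance(100 * 1024, 100);

    // A queue currently using 80GB, guaranteed 60GB, asking for 20GB more,
    // capped at 90GB, with nothing already marked killable.
    TempQueuePerPartition q = new TempQueuePerPartition("root.a",
        Resource.newInstance(80 * 1024, 80),    // current
        Resource.newInstance(20 * 1024, 20),    // pending
        Resource.newInstance(60 * 1024, 60),    // guaranteed
        Resource.newInstance(90 * 1024, 90),    // maxCapacity
        false, "", Resource.newInstance(0, 0)); // preemptionDisabled, partition, killable

    // Offer 70GB: the queue accepts min(avail, max - idealAssigned,
    // current + pending - idealAssigned) and returns what it could not use.
    Resource remain = q.offer(Resource.newInstance(70 * 1024, 70), rc, cluster);
    System.out.println("idealAssigned=" + q.idealAssigned + " remain=" + remain);

    // With a scaling factor of 1.0, everything the queue holds above
    // idealAssigned (minus already-killable resources) is marked for preemption.
    q.assignPreemption(1.0f, rc, cluster);
    System.out.println("toBePreempted=" + q.toBePreempted);
  }
}

With these numbers the queue accepts the full 70GB offered (it is under both its
max-capacity headroom and current + pending), and assignPreemption then flags the 10GB it
currently holds above that ideal share for preemption.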

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 3729264..88e39de 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1020,4 +1020,49 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
   public boolean getLazyPreemptionEnabled() {
     return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
   }
+
+  /** If true, run the policy but do not affect the cluster with preemption and
+   * kill events. */
+  public static final String PREEMPTION_OBSERVE_ONLY =
+      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+  public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
+
+  /** Time in milliseconds between invocations of this policy */
+  public static final String PREEMPTION_MONITORING_INTERVAL =
+      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+  public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
+
+  /** Time in milliseconds between requesting a preemption from an application
+   * and killing the container. */
+  public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+  public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
+
+  /** Maximum percentage of resources preempted in a single round. By
+   * controlling this value one can throttle the pace at which containers are
+   * reclaimed from the cluster. After computing the total desired preemption,
+   * the policy scales it back within this limit. */
+  public static final String TOTAL_PREEMPTION_PER_ROUND =
+      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+  public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
+
+  /** Maximum amount of resources above the target capacity ignored for
+   * preemption. This defines a deadzone around the target capacity that helps
+   * prevent thrashing and oscillations around the computed target balance.
+   * High values slow convergence toward capacity and (absent natural
+   * completions) might prevent convergence to guaranteed capacity. */
+  public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+  public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f;
+  /**
+   * Given a computed preemption target, account for containers naturally
+   * expiring and preempt only this percentage of the delta. This determines
+   * the rate of geometric convergence into the deadzone ({@link
+   * #PREEMPTION_MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
+   * will reclaim almost 95% of resources within 5 * {@link
+   * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
+  public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
+      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+  public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
+      0.2f;
 }
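
The six properties added above were previously keyed off constants on
ProportionalCapacityPreemptionPolicy itself (see the static imports removed from the test
further down). Here is a sketch (not part of this patch) of how they might be populated
programmatically, mirroring the updated test; the values shown are simply the DEFAULT_*
constants above. Convergence under the natural-termination factor is geometric: with
factor f, roughly 1 - (1 - f)^n of the preemption target is reclaimed after n kill-wait
intervals, so f = 0.5 reclaims over 95% within five intervals, as the javadoc says.

// Illustrative only -- mirrors how the updated test below configures the policy.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class PreemptionConfigSketch {
  public static CapacitySchedulerConfiguration defaults() {
    CapacitySchedulerConfiguration conf =
        new CapacitySchedulerConfiguration(new Configuration(false));
    // Set to true to compute preemption without dispatching preempt/kill events.
    conf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, false);
    // Run the policy every 3s; wait 15s after asking an app before killing.
    conf.setLong(
        CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, 3000L);
    conf.setLong(
        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 15000L);
    // Reclaim at most 10% of cluster resources per round, ignore a 10% deadzone
    // around target capacity, and take only 20% of the remaining delta each round.
    conf.setFloat(
        CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, 0.1f);
    conf.setFloat(
        CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, 0.1f);
    conf.setFloat(
        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, 0.2f);
    return conf;
  }
}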

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
index 19148d7..fefb56a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
@@ -86,12 +86,6 @@ public class PreemptableQueue {
     return res == null ? Resources.none() : res;
   }
 
-  @SuppressWarnings("unchecked")
-  public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
-    Map<ContainerId, RMContainer> map = killableContainers.get(partition);
-    return map == null ? Collections.EMPTY_MAP : map;
-  }
-
   public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
     return killableContainers;
   }
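
With the single-partition accessor removed above, callers that still need a per-partition
view can derive it from the remaining map-of-maps getter. A hypothetical helper (not part
of the patch) sketching that lookup:

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;

import java.util.Collections;
import java.util.Map;

public final class KillablePerPartitionSketch {
  // Hypothetical replacement for the removed getKillableContainers(String):
  // look up one partition in the full partition -> (containerId -> container) map.
  static Map<ContainerId, RMContainer> killableIn(PreemptableQueue q,
      String partition) {
    Map<ContainerId, RMContainer> m = q.getKillableContainers().get(partition);
    return m == null
        ? Collections.<ContainerId, RMContainer>emptyMap()
        : m;
  }
}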

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
index a9f02a5..76fcd4a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
@@ -146,7 +146,7 @@ public class PreemptionManager {
     }
   }
 
-  public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
+  public Map<String, PreemptableQueue> getShallowCopyOfPreemptableQueues() {
     try {
       readLock.lock();
       Map<String, PreemptableQueue> map = new HashMap<>();
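
The rename above changes only the method name; the returned snapshot is unchanged. A small
sketch (not part of the patch) of how a consumer such as a candidate selector might walk
it, assuming a PreemptionManager instance obtained from the scheduler:

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;

import java.util.Map;

public final class KillableContainerCountSketch {
  // Count containers already marked killable, per queue, across all partitions.
  static int countKillable(PreemptionManager pm) {
    int total = 0;
    for (Map.Entry<String, PreemptableQueue> e
        : pm.getShallowCopyOfPreemptableQueues().entrySet()) {
      for (Map<ContainerId, RMContainer> perPartition
          : e.getValue().getKillableContainers().values()) {
        total += perPartition.size();
      }
    }
    return total;
  }
}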

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index e9129de..3db4782 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,38 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.StringTokenizer;
-import java.util.TreeSet;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -63,7 +31,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -95,6 +62,32 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class TestProportionalCapacityPreemptionPolicy {
 
   static final long TS = 3141592653L;
@@ -105,11 +98,10 @@ public class TestProportionalCapacityPreemptionPolicy {
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
-  Configuration conf = null;
+  CapacitySchedulerConfiguration conf = null;
   CapacityScheduler mCS = null;
   RMContext rmContext = null;
   RMNodeLabelsManager lm = null;
-  CapacitySchedulerConfiguration schedConf = null;
   EventHandler<SchedulerEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
   Resource clusterResources = null;
@@ -132,7 +124,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
     int value;
 
-    private priority(int value) {
+    priority(int value) {
       this.value = value;
     }
 
@@ -146,12 +138,17 @@ public class TestProportionalCapacityPreemptionPolicy {
   @Before
   @SuppressWarnings("unchecked")
   public void setup() {
-    conf = new Configuration(false);
-    conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
-    conf.setLong(MONITORING_INTERVAL, 3000);
+    conf = new CapacitySchedulerConfiguration(new Configuration(false));
+    conf.setLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        3000);
     // report "ideal" preempt
-    conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
-    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        1.0f);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        1.0f);
     conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
         ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
@@ -164,8 +161,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     mCS = mock(CapacityScheduler.class);
     when(mCS.getResourceCalculator()).thenReturn(rc);
     lm = mock(RMNodeLabelsManager.class);
-    schedConf = new CapacitySchedulerConfiguration();
-    when(mCS.getConfiguration()).thenReturn(schedConf);
+    when(mCS.getConfiguration()).thenReturn(conf);
     rmContext = mock(RMContext.class);
     when(mCS.getRMContext()).thenReturn(rmContext);
     when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -271,7 +267,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  1 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setLong(WAIT_TIME_BEFORE_KILL, killTime);
+    conf.setLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+        killTime);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
 
     // ensure all pending rsrc from A get preempted from other queues
@@ -308,7 +306,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  1 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
+        (float) 0.1);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // ignore 10% overcapacity to avoid jitter
@@ -330,7 +330,7 @@ public class TestProportionalCapacityPreemptionPolicy {
         {   3,   0,   0,  0 },  // subqueues
       };
 
-    schedConf.setPreemptionDisabled("root.queueB", true);
+    conf.setPreemptionDisabled("root.queueB", true);
 
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
@@ -343,7 +343,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // event handler will count only events from the following test and not the
     // previous one.
     setup();
-    schedConf.setPreemptionDisabled("root.queueB", false);
+    conf.setPreemptionDisabled("root.queueB", false);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
 
     policy2.editSchedule();
@@ -382,7 +382,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Need to call setup() again to reset mDisp
     setup();
     // Turn off preemption for queueB and it's children
-    schedConf.setPreemptionDisabled("root.queueA.queueB", true);
+    conf.setPreemptionDisabled("root.queueA.queueB", true);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
     policy2.editSchedule();
     ApplicationAttemptId expectedAttemptOnQueueC = 
@@ -429,7 +429,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Need to call setup() again to reset mDisp
     setup();
     // Turn off preemption for queueB(appA)
-    schedConf.setPreemptionDisabled("root.queueA.queueB", true);
+    conf.setPreemptionDisabled("root.queueA.queueB", true);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
     policy2.editSchedule();
     // Now that queueB(appA) is not preemptable, verify that resources come
@@ -439,8 +439,8 @@ public class TestProportionalCapacityPreemptionPolicy {
 
     setup();
     // Turn off preemption for two of the 3 queues with over-capacity.
-    schedConf.setPreemptionDisabled("root.queueD.queueE", true);
-    schedConf.setPreemptionDisabled("root.queueA.queueB", true);
+    conf.setPreemptionDisabled("root.queueD.queueE", true);
+    conf.setPreemptionDisabled("root.queueA.queueB", true);
     ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
     policy3.editSchedule();
 
@@ -481,7 +481,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Turn off preemption for queueA and it's children. queueF(appC)'s request
     // should starve.
     setup(); // Call setup() to reset mDisp
-    schedConf.setPreemptionDisabled("root.queueA", true);
+    conf.setPreemptionDisabled("root.queueA", true);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
     policy2.editSchedule();
     verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
@@ -505,7 +505,7 @@ public class TestProportionalCapacityPreemptionPolicy {
       {   -1,   -1,    1,    1,    1,   -1,    1,    1,    1 },  // req granularity
       {    2,    3,    0,    0,    0,    3,    0,    0,    0 },  // subqueues
     };
-    schedConf.setPreemptionDisabled("root.queueA.queueC", true);
+    conf.setPreemptionDisabled("root.queueA.queueC", true);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // Although queueC(appB) is way over capacity and is untouchable,
@@ -529,7 +529,7 @@ public class TestProportionalCapacityPreemptionPolicy {
         {   3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
    };
 
-    schedConf.setPreemptionDisabled("root", true);
+    conf.setPreemptionDisabled("root", true);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // All queues should be non-preemptable, so request should starve.
@@ -556,7 +556,7 @@ public class TestProportionalCapacityPreemptionPolicy {
         {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
     };
     // QueueE inherits non-preemption from QueueD
-    schedConf.setPreemptionDisabled("root.queueD", true);
+    conf.setPreemptionDisabled("root.queueD", true);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // appC is running on QueueE. QueueE is over absMaxCap, but is not
@@ -596,7 +596,10 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  0 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        (float) 0.1);
+
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // ignore 10% imbalance between over-capacity queues
@@ -616,7 +619,10 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  0 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setBoolean(OBSERVE_ONLY, true);
+    conf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
+        true);
+    when(mCS.getConfiguration()).thenReturn(
+        new CapacitySchedulerConfiguration(conf));
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // verify even severe imbalance not affected
@@ -735,7 +741,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     containers.add(rm4);
 
     // sort them
-    ProportionalCapacityPreemptionPolicy.sortContainers(containers);
+    FifoCandidatesSelector.sortContainers(containers);
 
     // verify the "priority"-first, "reverse container-id"-second
     // ordering is enforced correctly
@@ -957,7 +963,7 @@ public class TestProportionalCapacityPreemptionPolicy {
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
-        conf, rmContext, mCS, mClock);
+        rmContext, mCS, mClock);
     clusterResources = Resource.newInstance(
         leafAbsCapacities(qData[0], qData[7]), 0);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
@@ -968,18 +974,13 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
-      String[][] resData) {
-    return buildPolicy(qData, resData, false);
-  }
-
-  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
       String[][] resData, boolean useDominantResourceCalculator) {
     if (useDominantResourceCalculator) {
       when(mCS.getResourceCalculator()).thenReturn(
           new DominantResourceCalculator());
     }
     ProportionalCapacityPreemptionPolicy policy =
-        new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
+        new ProportionalCapacityPreemptionPolicy(rmContext, mCS, mClock);
     clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
         qData[2]);
     ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
@@ -1124,7 +1125,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     String qName = "";
     while(tokenizer.hasMoreTokens()) {
       qName += tokenizer.nextToken();
-      preemptionDisabled = schedConf.getPreemptionDisabled(qName, preemptionDisabled);
+      preemptionDisabled = conf.getPreemptionDisabled(qName, preemptionDisabled);
       qName += ".";
     }
     return preemptionDisabled;
