YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue. 
(Wei Yan via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1dcaba9a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1dcaba9a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1dcaba9a

Branch: refs/heads/MR-2841
Commit: 1dcaba9a7aa27f7ca4ba693e3abb56ab3c59c8a7
Parents: ce04621
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Sep 3 10:27:36 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Sep 3 10:27:36 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/AllocationConfiguration.java |  22 ++-
 .../fair/AllocationFileLoaderService.java       |  48 ++++-
 .../scheduler/fair/FSLeafQueue.java             |  54 ++++-
 .../scheduler/fair/FSParentQueue.java           |   6 +-
 .../resourcemanager/scheduler/fair/FSQueue.java |  23 ++-
 .../scheduler/fair/FairScheduler.java           |  60 ++----
 .../scheduler/fair/QueueManager.java            |  32 +--
 .../fair/TestAllocationFileLoaderService.java   |  45 ++++-
 .../scheduler/fair/TestFSLeafQueue.java         | 198 ++++++++++++++++---
 .../scheduler/fair/TestFairScheduler.java       | 158 ++++-----------
 .../src/site/apt/FairScheduler.apt.vm           |  10 +
 12 files changed, 412 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a6a1b9b3..64ccd28 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -61,6 +61,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2395. FairScheduler: Preemption timeout should be configurable per 
     queue. (Wei Yan via kasha)
 
+    YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue.
+    (Wei Yan via kasha)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index 228a761..de5a999 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -70,6 +70,12 @@ public class AllocationConfiguration {
   // allowed to preempt other jobs' tasks.
   private final Map<String, Long> fairSharePreemptionTimeouts;
 
+  // The fair share preemption threshold for each queue. If a queue waits
+  // fairSharePreemptionTimeout without receiving
+  // fairshare * fairSharePreemptionThreshold resources, it is allowed to
+  // preempt other queues' tasks.
+  private final Map<String, Float> fairSharePreemptionThresholds;
+
   private final Map<String, SchedulingPolicy> schedulingPolicies;
   
   private final SchedulingPolicy defaultSchedulingPolicy;
@@ -92,6 +98,7 @@ public class AllocationConfiguration {
       SchedulingPolicy defaultSchedulingPolicy,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Long> fairSharePreemptionTimeouts,
+      Map<String, Float> fairSharePreemptionThresholds,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls,
       QueuePlacementPolicy placementPolicy,
       Map<FSQueueType, Set<String>> configuredQueues) {
@@ -108,6 +115,7 @@ public class AllocationConfiguration {
     this.schedulingPolicies = schedulingPolicies;
     this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
     this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
+    this.fairSharePreemptionThresholds = fairSharePreemptionThresholds;
     this.queueAcls = queueAcls;
     this.placementPolicy = placementPolicy;
     this.configuredQueues = configuredQueues;
@@ -126,6 +134,7 @@ public class AllocationConfiguration {
     queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
     minSharePreemptionTimeouts = new HashMap<String, Long>();
     fairSharePreemptionTimeouts = new HashMap<String, Long>();
+    fairSharePreemptionThresholds = new HashMap<String, Float>();
     schedulingPolicies = new HashMap<String, SchedulingPolicy>();
     defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
     configuredQueues = new HashMap<FSQueueType, Set<String>>();
@@ -171,7 +180,18 @@ public class AllocationConfiguration {
     return (fairSharePreemptionTimeout == null) ?
         -1 : fairSharePreemptionTimeout;
   }
-  
+
+  /**
+   * Get a queue's fair share preemption threshold in the allocation file.
+   * Return -1f if not set.
+   */
+  public float getFairSharePreemptionThreshold(String queueName) {
+    Float fairSharePreemptionThreshold =
+        fairSharePreemptionThresholds.get(queueName);
+    return (fairSharePreemptionThreshold == null) ?
+        -1f : fairSharePreemptionThreshold;
+  }
+
   public ResourceWeights getQueueWeight(String queue) {
     ResourceWeights weight = queueWeights.get(queue);
     return (weight == null) ? ResourceWeights.NEUTRAL : weight;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 970ee99..c2dfc84 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -218,6 +218,8 @@ public class AllocationFileLoaderService extends 
AbstractService {
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, 
SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, 
Long>();
+    Map<String, Float> fairSharePreemptionThresholds =
+        new HashMap<String, Float>();
     Map<String, Map<QueueACL, AccessControlList>> queueAcls =
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
@@ -225,6 +227,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
     float queueMaxAMShareDefault = -1.0f;
     long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+    float defaultFairSharePreemptionThreshold = 0.5f;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
 
     QueuePlacementPolicy newPlacementPolicy = null;
@@ -277,7 +280,8 @@ public class AllocationFileLoaderService extends 
AbstractService {
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
           userMaxAppsDefault = val;
-        } else if 
("defaultFairSharePreemptionTimeout".equals(element.getTagName())) {
+        } else if ("defaultFairSharePreemptionTimeout"
+            .equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
           long val = Long.parseLong(text) * 1000L;
           defaultFairSharePreemptionTimeout = val;
@@ -287,10 +291,17 @@ public class AllocationFileLoaderService extends 
AbstractService {
             long val = Long.parseLong(text) * 1000L;
             defaultFairSharePreemptionTimeout = val;
           }
-        } else if 
("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+        } else if ("defaultMinSharePreemptionTimeout"
+            .equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
           long val = Long.parseLong(text) * 1000L;
           defaultMinSharePreemptionTimeout = val;
+        } else if ("defaultFairSharePreemptionThreshold"
+            .equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.max(Math.min(val, 1.0f), 0.0f);
+          defaultFairSharePreemptionThreshold = val;
         } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
@@ -326,7 +337,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
       loadQueue(parent, element, minQueueResources, maxQueueResources,
           queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
           queuePolicies, minSharePreemptionTimeouts, 
fairSharePreemptionTimeouts,
-          queueAcls, configuredQueues);
+          fairSharePreemptionThresholds, queueAcls, configuredQueues);
     }
 
     // Load placement policy and pass it configured queues
@@ -349,11 +360,18 @@ public class AllocationFileLoaderService extends 
AbstractService {
           defaultFairSharePreemptionTimeout);
     }
 
+    // Set the fair share preemption threshold for the root queue
+    if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) {
+      fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE,
+          defaultFairSharePreemptionThreshold);
+    }
+
     AllocationConfiguration info = new 
AllocationConfiguration(minQueueResources,
         maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
         queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
         queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
-        minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls,
+        minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
+        fairSharePreemptionThresholds, queueAcls,
         newPlacementPolicy, configuredQueues);
     
     lastSuccessfulReload = clock.getTime();
@@ -365,13 +383,15 @@ public class AllocationFileLoaderService extends 
AbstractService {
   /**
    * Loads a queue from a queue element in the configuration file
    */
-  private void loadQueue(String parentName, Element element, Map<String, 
Resource> minQueueResources,
+  private void loadQueue(String parentName, Element element,
+      Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> 
queueMaxApps,
       Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
       Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Long> fairSharePreemptionTimeouts,
+      Map<String, Float> fairSharePreemptionThresholds,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, 
       Map<FSQueueType, Set<String>> configuredQueues) 
       throws AllocationConfigurationException {
@@ -418,6 +438,11 @@ public class AllocationFileLoaderService extends 
AbstractService {
         String text = ((Text)field.getFirstChild()).getData().trim();
         long val = Long.parseLong(text) * 1000L;
         fairSharePreemptionTimeouts.put(queueName, val);
+      } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.max(Math.min(val, 1.0f), 0.0f);
+        fairSharePreemptionThresholds.put(queueName, val);
       } else if ("schedulingPolicy".equals(field.getTagName())
           || "schedulingMode".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
@@ -434,7 +459,8 @@ public class AllocationFileLoaderService extends 
AbstractService {
         loadQueue(queueName, field, minQueueResources, maxQueueResources,
             queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
             queuePolicies, minSharePreemptionTimeouts,
-            fairSharePreemptionTimeouts, queueAcls, configuredQueues);
+            fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
+            queueAcls, configuredQueues);
         configuredQueues.get(FSQueueType.PARENT).add(queueName);
         isLeaf = false;
       }
@@ -449,11 +475,15 @@ public class AllocationFileLoaderService extends 
AbstractService {
       }
     }
     queueAcls.put(queueName, acls);
-    if (maxQueueResources.containsKey(queueName) && 
minQueueResources.containsKey(queueName)
+    if (maxQueueResources.containsKey(queueName) &&
+        minQueueResources.containsKey(queueName)
         && !Resources.fitsIn(minQueueResources.get(queueName),
             maxQueueResources.get(queueName))) {
-      LOG.warn(String.format("Queue %s has max resources %s less than min 
resources %s",
-          queueName, maxQueueResources.get(queueName), 
minQueueResources.get(queueName)));
+      LOG.warn(
+          String.format(
+              "Queue %s has max resources %s less than min resources %s",
+          queueName, maxQueueResources.get(queueName),
+              minQueueResources.get(queueName)));
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 49e8ef0..345ea8b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -54,7 +55,7 @@ public class FSLeafQueue extends FSQueue {
   
   // Variables used for preemption
   private long lastTimeAtMinShare;
-  private long lastTimeAtHalfFairShare;
+  private long lastTimeAtFairShareThreshold;
   
   // Track the AM resource usage for this queue
   private Resource amResourceUsage;
@@ -65,7 +66,7 @@ public class FSLeafQueue extends FSQueue {
       FSParentQueue parent) {
     super(name, scheduler, parent);
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
-    this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
+    this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
     amResourceUsage = Resource.newInstance(0, 0);
   }
@@ -275,16 +276,17 @@ public class FSLeafQueue extends FSQueue {
     return lastTimeAtMinShare;
   }
 
-  public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
+  private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
     this.lastTimeAtMinShare = lastTimeAtMinShare;
   }
 
-  public long getLastTimeAtHalfFairShare() {
-    return lastTimeAtHalfFairShare;
+  public long getLastTimeAtFairShareThreshold() {
+    return lastTimeAtFairShareThreshold;
   }
 
-  public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
-    this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
+  private void setLastTimeAtFairShareThreshold(
+      long lastTimeAtFairShareThreshold) {
+    this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
   }
 
   @Override
@@ -329,6 +331,20 @@ public class FSLeafQueue extends FSQueue {
   }
 
   /**
+   * Update the preemption fields for the queue, i.e. the times at which it
+   * was last at its guaranteed share and over its fair share threshold.
+   */
+  public void updateStarvationStats() {
+    long now = scheduler.getClock().getTime();
+    if (!isStarvedForMinShare()) {
+      setLastTimeAtMinShare(now);
+    }
+    if (!isStarvedForFairShare()) {
+      setLastTimeAtFairShareThreshold(now);
+    }
+  }
+
+  /**
    * Helper method to check if the queue should preempt containers
    *
    * @return true if check passes (can preempt) or false otherwise
@@ -337,4 +353,28 @@ public class FSLeafQueue extends FSQueue {
     return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
         getFairShare());
   }
+
+  /**
+   * Is a queue being starved for its min share.
+   */
+  @VisibleForTesting
+  boolean isStarvedForMinShare() {
+    return isStarved(getMinShare());
+  }
+
+  /**
+   * Is a queue being starved for its fair share threshold.
+   */
+  @VisibleForTesting
+  boolean isStarvedForFairShare() {
+    return isStarved(
+        Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
+  }
+
+  private boolean isStarved(Resource share) {
+    Resource desiredShare = 
Resources.min(FairScheduler.getResourceCalculator(),
+        scheduler.getClusterResource(), share, getDemand());
+    return Resources.lessThan(FairScheduler.getResourceCalculator(),
+        scheduler.getClusterResource(), getResourceUsage(), desiredShare);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 1209970..f74106a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -78,11 +78,11 @@ public class FSParentQueue extends FSQueue {
   }
 
   @Override
-  public void updatePreemptionTimeouts() {
-    super.updatePreemptionTimeouts();
+  public void updatePreemptionVariables() {
+    super.updatePreemptionVariables();
     // For child queues
     for (FSQueue childQueue : childQueues) {
-      childQueue.updatePreemptionTimeouts();
+      childQueue.updatePreemptionVariables();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index b9fcc4b..d4e043d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -54,6 +54,7 @@ public abstract class FSQueue implements Queue, Schedulable {
 
   private long fairSharePreemptionTimeout = Long.MAX_VALUE;
   private long minSharePreemptionTimeout = Long.MAX_VALUE;
+  private float fairSharePreemptionThreshold = 0.5f;
 
   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
@@ -186,6 +187,14 @@ public abstract class FSQueue implements Queue, 
Schedulable {
     this.minSharePreemptionTimeout = minSharePreemptionTimeout;
   }
 
+  public float getFairSharePreemptionThreshold() {
+    return fairSharePreemptionThreshold;
+  }
+
+  public void setFairSharePreemptionThreshold(float 
fairSharePreemptionThreshold) {
+    this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
+  }
+
   /**
    * Recomputes the shares for all child queues and applications based on this
    * queue's current share
@@ -193,21 +202,27 @@ public abstract class FSQueue implements Queue, 
Schedulable {
   public abstract void recomputeShares();
 
   /**
-   * Update the min/fair share preemption timeouts for this queue.
+   * Update the min/fair share preemption timeouts and threshold for this queue.
    */
-  public void updatePreemptionTimeouts() {
-    // For min share
+  public void updatePreemptionVariables() {
+    // For min share timeout
     minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
         .getMinSharePreemptionTimeout(getName());
     if (minSharePreemptionTimeout == -1 && parent != null) {
       minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
     }
-    // For fair share
+    // For fair share timeout
     fairSharePreemptionTimeout = scheduler.getAllocationConfiguration()
         .getFairSharePreemptionTimeout(getName());
     if (fairSharePreemptionTimeout == -1 && parent != null) {
       fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
     }
+    // For fair share preemption threshold
+    fairSharePreemptionThreshold = scheduler.getAllocationConfiguration()
+        .getFairSharePreemptionThreshold(getName());
+    if (fairSharePreemptionThreshold < 0 && parent != null) {
+      fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 2798b8d..a35e49f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -299,7 +299,7 @@ public class FairScheduler extends
    */
   protected synchronized void update() {
     long start = getClock().getTime();
-    updatePreemptionVariables(); // Determine if any queues merit preemption
+    updateStarvationStats(); // Determine if any queues merit preemption
 
     FSQueue rootQueue = queueMgr.getRootQueue();
 
@@ -329,48 +329,20 @@ public class FairScheduler extends
 
   /**
   * Update the preemption fields for all QueueScheduables, i.e. the times since
-   * each queue last was at its guaranteed share and at > 1/2 of its fair share
-   * for each type of task.
+   * each queue last was at its guaranteed share and over its fair share
+   * threshold for each type of task.
    */
-  private void updatePreemptionVariables() {
-    long now = getClock().getTime();
-    lastPreemptionUpdateTime = now;
+  private void updateStarvationStats() {
+    lastPreemptionUpdateTime = clock.getTime();
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      if (!isStarvedForMinShare(sched)) {
-        sched.setLastTimeAtMinShare(now);
-      }
-      if (!isStarvedForFairShare(sched)) {
-        sched.setLastTimeAtHalfFairShare(now);
-      }
+      sched.updateStarvationStats();
     }
   }
 
   /**
-   * Is a queue below its min share for the given task type?
-   */
-  boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
-      sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
-        sched.getResourceUsage(), desiredShare);
-  }
-
-  /**
-   * Is a queue being starved for fair share for the given task type? This is
-   * defined as being below half its fair share.
-   */
-  boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR,
-        clusterResource,
-        Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
-        sched.getResourceUsage(), desiredFairShare);
-  }
-
-  /**
    * Check for queues that need tasks preempted, either because they have been
    * below their guaranteed share for minSharePreemptionTimeout or they have
-   * been below half their fair share for the fairSharePreemptionTimeout. If
+   * been below their fair share threshold for the fairSharePreemptionTimeout. If
    * such queues exist, compute how many tasks of each type need to be preempted
    * and then select the right ones using preemptTasks.
    */
@@ -499,11 +471,11 @@ public class FairScheduler extends
    * Return the resource amount that this queue is allowed to preempt, if any.
    * If the queue has been below its min share for at least its preemption
    * timeout, it should preempt the difference between its current share and
-   * this min share. If it has been below half its fair share for at least the
-   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
-   * full fair share. If both conditions hold, we preempt the max of the two
-   * amounts (this shouldn't happen unless someone sets the timeouts to be
-   * identical for some reason).
+   * this min share. If it has been below its fair share preemption threshold
+   * for at least the fairSharePreemptionTimeout, it should preempt enough tasks
+   * to get up to its full fair share. If both conditions hold, we preempt the
+   * max of the two amounts (this shouldn't happen unless someone sets the
+   * timeouts to be identical for some reason).
    */
   protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
     long minShareTimeout = sched.getMinSharePreemptionTimeout();
@@ -516,7 +488,7 @@ public class FairScheduler extends
       resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
           Resources.none(), Resources.subtract(target, 
sched.getResourceUsage()));
     }
-    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+    if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
       Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
           sched.getFairShare(), sched.getDemand());
       resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
@@ -1094,7 +1066,11 @@ public class FairScheduler extends
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
   }
-  
+
+  public static ResourceCalculator getResourceCalculator() {
+    return RESOURCE_CALCULATOR;
+  }
+
   /**
    * Subqueue metrics might be a little out of date because fair shares are
    * recalculated at the update interval, but the root queue metrics needs to

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 2444ba4..61b3b6c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -181,7 +181,7 @@ public class QueueManager {
         parent.addChildQueue(leafQueue);
         queues.put(leafQueue.getName(), leafQueue);
         leafQueues.add(leafQueue);
-        setPreemptionTimeout(leafQueue, parent, queueConf);
+        leafQueue.updatePreemptionVariables();
         return leafQueue;
       } else {
         FSParentQueue newParent = new FSParentQueue(queueName, scheduler, 
parent);
@@ -193,7 +193,7 @@ public class QueueManager {
         }
         parent.addChildQueue(newParent);
         queues.put(newParent.getName(), newParent);
-        setPreemptionTimeout(newParent, parent, queueConf);
+        newParent.updatePreemptionVariables();
         parent = newParent;
       }
     }
@@ -202,29 +202,6 @@ public class QueueManager {
   }
 
   /**
-   * Set the min/fair share preemption timeouts for the given queue.
-   * If the timeout is configured in the allocation file, the queue will use
-   * that value; otherwise, the queue inherits the value from its parent queue.
-   */
-  private void setPreemptionTimeout(FSQueue queue,
-      FSParentQueue parentQueue, AllocationConfiguration queueConf) {
-    // For min share
-    long minSharePreemptionTimeout =
-        queueConf.getMinSharePreemptionTimeout(queue.getQueueName());
-    if (minSharePreemptionTimeout == -1) {
-      minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout();
-    }
-    queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout);
-    // For fair share
-    long fairSharePreemptionTimeout =
-        queueConf.getFairSharePreemptionTimeout(queue.getQueueName());
-    if (fairSharePreemptionTimeout == -1) {
-      fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout();
-    }
-    queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout);
-  }
-
-  /**
    * Make way for the given queue if possible, by removing incompatible
    * queues with no apps in them. Incompatibility could be due to
    * (1) queueToCreate being currently a parent but needs to change to leaf
@@ -409,7 +386,8 @@ public class QueueManager {
 
     // Update steady fair shares for all queues
     rootQueue.recomputeSteadyShares();
-    // Update the fair share preemption timeouts for all queues recursively
-    rootQueue.updatePreemptionTimeouts();
+    // Update the min/fair share preemption timeouts and the fair share
+    // preemption thresholds for all queues recursively
+    rootQueue.updatePreemptionVariables();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index 14b3111..656e20d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -187,13 +187,15 @@ public class TestAllocationFileLoaderService {
     out.println("<queue name=\"queueF\" type=\"parent\" >");
     out.println("</queue>");
     // Create hierarchical queues G,H, with different min/fair share preemption
-    // timeouts
+    // timeouts and preemption thresholds
     out.println("<queue name=\"queueG\">");
     
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
     out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
+    
out.println("<fairSharePreemptionThreshold>0.6</fairSharePreemptionThreshold>");
     out.println("   <queue name=\"queueH\">");
     out.println("   
<fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>");
     out.println("   
<minSharePreemptionTimeout>40</minSharePreemptionTimeout>");
+    out.println("   
<fairSharePreemptionThreshold>0.7</fairSharePreemptionThreshold>");
     out.println("   </queue>");
     out.println("</queue>");
     // Set default limit of apps per queue to 15
@@ -211,6 +213,8 @@ public class TestAllocationFileLoaderService {
         + "</defaultMinSharePreemptionTimeout>");
     // Set default fair share preemption timeout to 5 minutes
     
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
+    // Set default fair share preemption threshold to 0.4
+    
out.println("<defaultFairSharePreemptionThreshold>0.4</defaultFairSharePreemptionThreshold>");
     // Set default scheduling policy to DRF
     
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
     out.println("</allocations>");
@@ -299,6 +303,26 @@ public class TestAllocationFileLoaderService {
     assertEquals(120000, 
queueConf.getFairSharePreemptionTimeout("root.queueG"));
     assertEquals(180000, 
queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));
 
+    assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
+    assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." +
+        YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01);
+    assertEquals(.6f,
+        queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01);
+    assertEquals(.7f,
+        queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01);
+
     assertTrue(queueConf.getConfiguredQueues()
         .get(FSQueueType.PARENT)
         .contains("root.queueF"));
@@ -346,9 +370,10 @@ public class TestAllocationFileLoaderService {
     out.println("<pool name=\"queueD\">");
     out.println("<maxRunningApps>3</maxRunningApps>");
     out.println("</pool>");
-    // Give queue E a preemption timeout of one minute
+    // Give queue E a preemption timeout of one minute and 0.3f threshold
     out.println("<pool name=\"queueE\">");
     out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    
out.println("<fairSharePreemptionThreshold>0.3</fairSharePreemptionThreshold>");
     out.println("</pool>");
     // Set default limit of apps per queue to 15
     out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
@@ -363,6 +388,8 @@ public class TestAllocationFileLoaderService {
         + "</defaultMinSharePreemptionTimeout>");
     // Set fair share preemption timeout to 5 minutes
     
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+    // Set default fair share preemption threshold to 0.6f
+    
out.println("<defaultFairSharePreemptionThreshold>0.6</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
     
@@ -429,6 +456,20 @@ public class TestAllocationFileLoaderService {
     assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
     assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
     assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
+
+    assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
+    assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root."
+        + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
+    assertEquals(.3f,
+        queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 7323b6a..97736be 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -18,50 +18,66 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-public class TestFSLeafQueue {
-  private FSLeafQueue schedulable = null;
-  private Resource maxResource = Resources.createResource(10);
+public class TestFSLeafQueue extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE = new File(TEST_DIR,
+      TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath();
+  private Resource maxResource = Resources.createResource(1024 * 8);
 
   @Before
   public void setup() throws IOException {
-    FairScheduler scheduler = new FairScheduler();
-    Configuration conf = createConfiguration();
-    // All tests assume only one assignment per node update
-    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
-    ResourceManager resourceManager = new ResourceManager();
-    resourceManager.init(conf);
-    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
-    String queueName = "root.queue1";
-    scheduler.allocConf = mock(AllocationConfiguration.class);
-    
when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
-    
when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
+    conf = createConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+  }
 
-    schedulable = new FSLeafQueue(queueName, scheduler, null);
+  @After
+  public void teardown() {
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    conf = null;
   }
 
   @Test
   public void testUpdateDemand() {
+    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.allocConf = mock(AllocationConfiguration.class);
+
+    String queueName = "root.queue1";
+    
when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
+    
when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
+    FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
+
     FSAppAttempt app = mock(FSAppAttempt.class);
     Mockito.when(app.getDemand()).thenReturn(maxResource);
 
@@ -73,11 +89,137 @@ public class TestFSLeafQueue {
     assertTrue("Demand is greater than max allowed ",
         Resources.equals(schedulable.getDemand(), maxResource));
   }
-  
-  private Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
-        ResourceScheduler.class);
-    return conf;
+
+  @Test (timeout = 5000)
+  public void test() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    scheduler.update();
+
+    // Queue A wants 3 * 1024. Node update gives this all to A
+    createSchedulingRequest(3 * 1024, "queueA", "user1");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(nodeEvent2);
+
+    // Queue B arrives and wants 1 * 1024
+    createSchedulingRequest(1 * 1024, "queueB", "user1");
+    scheduler.update();
+    Collection<FSLeafQueue> queues = 
scheduler.getQueueManager().getLeafQueues();
+    assertEquals(3, queues.size());
+
+    // Queue A should be above min share, B below.
+    FSLeafQueue queueA =
+        scheduler.getQueueManager().getLeafQueue("queueA", false);
+    FSLeafQueue queueB =
+        scheduler.getQueueManager().getLeafQueue("queueB", false);
+    assertFalse(queueA.isStarvedForMinShare());
+    assertTrue(queueB.isStarvedForMinShare());
+
+    // Node checks in again, should allocate for B
+    scheduler.handle(nodeEvent2);
+    // Now B should have min share ( = demand here)
+    assertFalse(queueB.isStarvedForMinShare());
+  }
+
+  @Test (timeout = 5000)
+  public void testIsStarvedForFairShare() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.2</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.8</weight>");
+    
out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
+    out.println("<queue name=\"queueB1\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB2\">");
+    
out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+    out.println("</queue>");
+    
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    scheduler.update();
+
+    // Queue A wants 4 * 1024. Node update gives this all to A
+    createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < 4; i ++) {
+      scheduler.handle(nodeEvent2);
+    }
+
+    QueueManager queueMgr = scheduler.getQueueManager();
+    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
+    assertEquals(4 * 1024, queueA.getResourceUsage().getMemory());
+
+    // Both queue B1 and queue B2 want 3 * 1024
+    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
+    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
+    scheduler.update();
+    for (int i = 0; i < 4; i ++) {
+      scheduler.handle(nodeEvent2);
+    }
+
+    FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
+    FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
+    assertEquals(2 * 1024, queueB1.getResourceUsage().getMemory());
+    assertEquals(2 * 1024, queueB2.getResourceUsage().getMemory());
+
+    // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair 
share
+    // threshold is 1.6 * 1024
+    assertFalse(queueB1.isStarvedForFairShare());
+
+    // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair 
share
+    // threshold is 2.4 * 1024
+    assertTrue(queueB2.isStarvedForFairShare());
+
+    // Node checks in again
+    scheduler.handle(nodeEvent2);
+    scheduler.handle(nodeEvent2);
+    assertEquals(3 * 1024, queueB1.getResourceUsage().getMemory());
+    assertEquals(3 * 1024, queueB2.getResourceUsage().getMemory());
+
+    // Both queue B1 and queue B2 usages go to 3 * 1024
+    assertFalse(queueB1.isStarvedForFairShare());
+    assertFalse(queueB2.isStarvedForFairShare());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 6e0127d..05b1925 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -1061,9 +1061,11 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
     out.println("  </queue>");
     out.println("  
<fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
     out.println("  
<minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
+    out.println("  
<fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
     out.println("</queue>");
     
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
     
out.println("<defaultMinSharePreemptionTimeout>200</defaultMinSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionThreshold>.6</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
 
@@ -1080,125 +1082,7 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
 
     assertEquals(100000, root.getFairSharePreemptionTimeout());
     assertEquals(120000, root.getMinSharePreemptionTimeout());
-  }
-  
-  @Test (timeout = 5000)
-  public void testIsStarvedForMinShare() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    // Queue A wants 3 * 1024. Node update gives this all to A
-    createSchedulingRequest(3 * 1024, "queueA", "user1");
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
-
-    // Queue B arrives and wants 1 * 1024
-    createSchedulingRequest(1 * 1024, "queueB", "user1");
-    scheduler.update();
-    Collection<FSLeafQueue> queues = 
scheduler.getQueueManager().getLeafQueues();
-    assertEquals(3, queues.size());
-
-    // Queue A should be above min share, B below.
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueA")) {
-        assertEquals(false, scheduler.isStarvedForMinShare(p));
-      }
-      else if (p.getName().equals("root.queueB")) {
-        assertEquals(true, scheduler.isStarvedForMinShare(p));
-      }
-    }
-
-    // Node checks in again, should allocate for B
-    scheduler.handle(nodeEvent2);
-    // Now B should have min share ( = demand here)
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueB")) {
-        assertEquals(false, scheduler.isStarvedForMinShare(p));
-      }
-    }
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShare() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.75</weight>");
-    out.println("</queue>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    // Queue A wants 3 * 1024. Node update gives this all to A
-    createSchedulingRequest(3 * 1024, "queueA", "user1");
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
-
-    // Queue B arrives and wants 1 * 1024
-    createSchedulingRequest(1 * 1024, "queueB", "user1");
-    scheduler.update();
-    Collection<FSLeafQueue> queues = 
scheduler.getQueueManager().getLeafQueues();
-    assertEquals(3, queues.size());
-
-    // Queue A should be above fair share, B below.
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueA")) {
-        assertEquals(false, scheduler.isStarvedForFairShare(p));
-      }
-      else if (p.getName().equals("root.queueB")) {
-        assertEquals(true, scheduler.isStarvedForFairShare(p));
-      }
-    }
-
-    // Node checks in again, should allocate for B
-    scheduler.handle(nodeEvent2);
-    // B should not be starved for fair share, since entire demand is
-    // satisfied.
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueB")) {
-        assertEquals(false, scheduler.isStarvedForFairShare(p));
-      }
-    }
+    assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01);
   }
 
   @Test (timeout = 5000)
@@ -1385,7 +1269,8 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
     out.println("<queue name=\"queueB\">");
     out.println("<weight>2</weight>");
     out.println("</queue>");
-    
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
 
@@ -1468,8 +1353,9 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
     out.println("<weight>.25</weight>");
     out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
-    
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
 
@@ -1753,8 +1639,6 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
   @Test
   public void testBackwardsCompatiblePreemptionConfiguration() throws 
Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    MockClock clock = new MockClock();
-    scheduler.setClock(clock);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -1842,6 +1726,32 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
         .getFairSharePreemptionTimeout());
   }
 
+  @Test
+  public void testPreemptionVariablesForQueueCreatedRuntime() throws Exception 
{
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Set preemption variables for the root queue
+    FSParentQueue root = scheduler.getQueueManager().getRootQueue();
+    root.setMinSharePreemptionTimeout(10000);
+    root.setFairSharePreemptionTimeout(15000);
+    root.setFairSharePreemptionThreshold(.6f);
+
+    // User1 submits one application
+    ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(appAttemptId, "default", "user1", null);
+
+    // The user1 queue should inherit the configurations from the root queue
+    FSLeafQueue userQueue =
+        scheduler.getQueueManager().getLeafQueue("user1", true);
+    assertEquals(1, userQueue.getRunnableAppSchedulables().size());
+    assertEquals(10000, userQueue.getMinSharePreemptionTimeout());
+    assertEquals(15000, userQueue.getFairSharePreemptionTimeout());
+    assertEquals(.6f, userQueue.getFairSharePreemptionThreshold(), 0.001);
+  }
+
   @Test (timeout = 5000)
   public void testMultipleContainersWaitingForReservation() throws IOException 
{
     scheduler.init(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dcaba9a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
index bd28bff..df61422 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
@@ -277,6 +277,12 @@ Allocation file format
      threshold before it will try to preempt containers to take resources from 
other
      queues. If not set, the queue will inherit the value from its parent 
queue.
 
+   * fairSharePreemptionThreshold: the fair share preemption threshold for the
+     queue. If the queue waits fairSharePreemptionTimeout without receiving
+     fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt
+     containers to take resources from other queues. If not set, the queue will
+     inherit the value from its parent queue.
+
  * <<User elements>>, which represent settings governing the behavior of 
individual 
      users. They can contain a single property: maxRunningApps, a limit on the 
      number of running apps for a particular user.
@@ -292,6 +298,10 @@ Allocation file format
    preemption timeout for the root queue; overridden by 
minSharePreemptionTimeout
    element in root queue.
 
+ * <<A defaultFairSharePreemptionThreshold element>>, which sets the fair share
+   preemption threshold for the root queue; overridden by 
fairSharePreemptionThreshold
+   element in root queue.
+
  * <<A queueMaxAppsDefault element>>, which sets the default running app limit
    for queues; overriden by maxRunningApps element in each queue.
 

Reply via email to