YARN-4359. Update LowCost agents logic to take advantage of YARN-4358. 
(Jonathan Yaniv and Ishai Menache via Subru).

(cherry picked from commit a3a615eeab8c14ccdc548311097e62a916963dc5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0eae1c63
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0eae1c63
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0eae1c63

Branch: refs/heads/branch-2
Commit: 0eae1c636862a13d89f23a3af113bab9dec5c651
Parents: 3e7f97f
Author: Subru Krishnan <su...@apache.org>
Authored: Mon May 1 16:01:07 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon May 1 18:30:58 2017 -0700

----------------------------------------------------------------------
 .../reservation/InMemoryPlan.java               |  11 +
 .../resourcemanager/reservation/PlanView.java   |   9 +
 .../planning/AlignedPlannerWithGreedy.java      |  15 +-
 .../planning/GreedyReservationAgent.java        |  13 +-
 .../reservation/planning/IterativePlanner.java  | 196 ++++-----
 .../reservation/planning/ReservationAgent.java  |  23 +-
 .../planning/SimpleCapacityReplanner.java       |   8 +-
 .../reservation/planning/StageAllocator.java    |  10 +-
 .../planning/StageAllocatorGreedy.java          |   4 +-
 .../planning/StageAllocatorGreedyRLE.java       |   4 +-
 .../planning/StageAllocatorLowCostAligned.java  | 279 +++++++++----
 .../planning/StageEarliestStart.java            |  46 ---
 .../planning/StageEarliestStartByDemand.java    | 106 -----
 .../StageEarliestStartByJobArrival.java         |  39 --
 .../planning/StageExecutionInterval.java        |  47 +++
 .../StageExecutionIntervalByDemand.java         | 144 +++++++
 .../StageExecutionIntervalUnconstrained.java    |  73 ++++
 .../planning/TestAlignedPlanner.java            | 413 +++++++++++++++++--
 .../planning/TestGreedyReservationAgent.java    |  10 +-
 .../planning/TestSimpleCapacityReplanner.java   |   4 +-
 20 files changed, 989 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 3afcd47..783fd09 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -687,4 +687,15 @@ public class InMemoryPlan implements Plan {
       readLock.unlock();
     }
   }
+
+  @Override
+  public RLESparseResourceAllocation getCumulativeLoadOverTime(
+      long start, long end) {
+    readLock.lock();
+    try {
+      return rleSparseVector.getRangeOverlapping(start, end);
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
index 699f461..2767993 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java
@@ -174,4 +174,13 @@ public interface PlanView extends PlanContext {
   public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
       long start, long end);
 
+  /**
+   * Get the cumulative load over a time interval.
+   *
+   * @param start Start of the time interval.
+   * @param end End of the time interval.
+   * @return RLE sparse allocation.
+   */
+  RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end);
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
index 00c2333..3853f41 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java
@@ -39,6 +39,8 @@ public class AlignedPlannerWithGreedy implements 
ReservationAgent {
   public static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
   public static final String SMOOTHNESS_FACTOR =
       "yarn.resourcemanager.reservation-system.smoothness-factor";
+  private boolean allocateLeft = false;
+
 
   // Log
   private static final Logger LOG = LoggerFactory
@@ -49,26 +51,31 @@ public class AlignedPlannerWithGreedy implements 
ReservationAgent {
 
   // Constructor
   public AlignedPlannerWithGreedy() {
+
   }
 
   @Override
   public void init(Configuration conf) {
     int smoothnessFactor =
         conf.getInt(SMOOTHNESS_FACTOR, DEFAULT_SMOOTHNESS_FACTOR);
+    allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
+            DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
 
     // List of algorithms
     List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
 
     // LowCostAligned planning algorithm
     ReservationAgent algAligned =
-        new IterativePlanner(new StageEarliestStartByDemand(),
-            new StageAllocatorLowCostAligned(smoothnessFactor), false);
+        new IterativePlanner(new StageExecutionIntervalByDemand(),
+            new StageAllocatorLowCostAligned(smoothnessFactor, allocateLeft),
+            allocateLeft);
+
     listAlg.add(algAligned);
 
     // Greedy planning algorithm
     ReservationAgent algGreedy =
-        new IterativePlanner(new StageEarliestStartByJobArrival(),
-            new StageAllocatorGreedy(), false);
+        new IterativePlanner(new StageExecutionIntervalUnconstrained(),
+            new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
     listAlg.add(algGreedy);
 
     // Set planner:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
index 1559b97..637a17b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java
@@ -47,9 +47,6 @@ public class GreedyReservationAgent implements 
ReservationAgent {
 
   // Greedy planner
   private ReservationAgent planner;
-  public final static String GREEDY_FAVOR_EARLY_ALLOCATION =
-      "yarn.resourcemanager.reservation-system.favor-early-allocation";
-  public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
   private boolean allocateLeft;
 
   public GreedyReservationAgent() {
@@ -57,20 +54,20 @@ public class GreedyReservationAgent implements 
ReservationAgent {
 
   @Override
   public void init(Configuration conf) {
-    allocateLeft = conf.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION,
+    allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
         DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
     if (allocateLeft) {
       LOG.info("Initializing the GreedyReservationAgent to favor \"early\""
           + " (left) allocations (controlled by parameter: "
-          + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+          + FAVOR_EARLY_ALLOCATION + ")");
     } else {
       LOG.info("Initializing the GreedyReservationAgent to favor \"late\""
           + " (right) allocations (controlled by parameter: "
-          + GREEDY_FAVOR_EARLY_ALLOCATION + ")");
+          + FAVOR_EARLY_ALLOCATION + ")");
     }
 
     planner =
-        new IterativePlanner(new StageEarliestStartByJobArrival(),
+        new IterativePlanner(new StageExecutionIntervalUnconstrained(),
             new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
   }
 
@@ -123,4 +120,4 @@ public class GreedyReservationAgent implements 
ReservationAgent {
 
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index 24d237a..83f272e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.ListIterator;
 import java.util.Map;
@@ -32,26 +31,24 @@ import 
org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
-import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * A planning algorithm consisting of two main phases. The algorithm iterates
- * over the job stages in descending order. For each stage, the algorithm: 1.
- * Determines an interval [stageArrivalTime, stageDeadline) in which the stage
- * is allocated. 2. Computes an allocation for the stage inside the interval.
- *
- * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
- * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
- * each stage is set as succcessorStartTime - the starting time of its
- * succeeding stage (or jobDeadline if it is the last stage).
- *
- * The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
- * setAlgComputeStageAllocation
+ * over the job stages in ascending/descending order, depending on the flag
+ * allocateLeft. For each stage, the algorithm: 1. Determines an interval
+ * [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes 
an
+ * allocation for the stage inside the interval. For ANY and ALL jobs, phase 1
+ * sets the allocation window of each stage to be [jobArrival, jobDeadline]. 
For
+ * ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as
+ * succcessorStartTime - the starting time of its succeeding stage (or
+ * jobDeadline if it is the last stage). The phases are set using the two
+ * functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator
  */
 public class IterativePlanner extends PlanningAlgorithm {
 
@@ -60,7 +57,7 @@ public class IterativePlanner extends PlanningAlgorithm {
   private RLESparseResourceAllocation planModifications;
 
   // Data extracted from plan
-  private Map<Long, Resource> planLoads;
+  private RLESparseResourceAllocation planLoads;
   private Resource capacity;
   private long step;
 
@@ -70,16 +67,16 @@ public class IterativePlanner extends PlanningAlgorithm {
   private long jobDeadline;
 
   // Phase algorithms
-  private StageEarliestStart algStageEarliestStart = null;
+  private StageExecutionInterval algStageExecutionInterval = null;
   private StageAllocator algStageAllocator = null;
   private final boolean allocateLeft;
 
   // Constructor
-  public IterativePlanner(StageEarliestStart algEarliestStartTime,
+  public IterativePlanner(StageExecutionInterval algStageExecutionInterval,
       StageAllocator algStageAllocator, boolean allocateLeft) {
 
     this.allocateLeft = allocateLeft;
-    setAlgStageEarliestStart(algEarliestStartTime);
+    setAlgStageExecutionInterval(algStageExecutionInterval);
     setAlgStageAllocator(algStageAllocator);
 
   }
@@ -101,12 +98,6 @@ public class IterativePlanner extends PlanningAlgorithm {
     // Current stage
     ReservationRequest currentReservationStage;
 
-    // Stage deadlines
-    long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
-    long successorStartingTime = -1;
-    long predecessorEndTime = stepRoundDown(reservation.getArrival(), step);
-    long stageArrivalTime = -1;
-
     // Iterate the stages in reverse order
     while (stageProvider.hasNext()) {
 
@@ -116,27 +107,17 @@ public class IterativePlanner extends PlanningAlgorithm {
       // Validate that the ReservationRequest respects basic constraints
       validateInputStage(plan, currentReservationStage);
 
-      // Compute an adjusted earliestStart for this resource
-      // (we need this to provision some space for the ORDER contracts)
+      // Set the stageArrival and stageDeadline
+      ReservationInterval stageInterval =
+          setStageExecutionInterval(plan, reservation, currentReservationStage,
+              allocations);
+      Long stageArrival = stageInterval.getStartTime();
+      Long stageDeadline = stageInterval.getEndTime();
 
-      if (allocateLeft) {
-        stageArrivalTime = predecessorEndTime;
-      } else {
-        stageArrivalTime = reservation.getArrival();
-        if (jobType == ReservationRequestInterpreter.R_ORDER
-            || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
-          stageArrivalTime =
-              computeEarliestStartingTime(plan, reservation,
-                  stageProvider.getCurrentIndex(), currentReservationStage,
-                  stageDeadline);
-        }
-        stageArrivalTime = stepRoundUp(stageArrivalTime, step);
-        stageArrivalTime = Math.max(stageArrivalTime, 
reservation.getArrival());
-      }
-      // Compute the allocation of a single stage
+      // Compute stage allocation
       Map<ReservationInterval, Resource> curAlloc =
-          computeStageAllocation(plan, currentReservationStage,
-              stageArrivalTime, stageDeadline, user, reservationId);
+          computeStageAllocation(plan, currentReservationStage, stageArrival,
+              stageDeadline, user, reservationId);
 
       // If we did not find an allocation, return NULL
       // (unless it's an ANY job, then we simply continue).
@@ -152,9 +133,13 @@ public class IterativePlanner extends PlanningAlgorithm {
 
       }
 
-      // Get the start & end time of the current allocation
-      Long stageStartTime = findEarliestTime(curAlloc);
-      Long stageEndTime = findLatestTime(curAlloc);
+      // Validate ORDER_NO_GAP
+      if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
+        if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) {
+          throw new PlanningException(
+              "The allocation found does not respect ORDER_NO_GAP");
+        }
+      }
 
       // If we did find an allocation for the stage, add it
       for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
@@ -165,33 +150,6 @@ public class IterativePlanner extends PlanningAlgorithm {
       if (jobType == ReservationRequestInterpreter.R_ANY) {
         break;
       }
-
-      // If ORDER job, set the stageDeadline of the next stage to be processed
-      if (jobType == ReservationRequestInterpreter.R_ORDER
-          || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
-
-        // CHECK ORDER_NO_GAP
-        // Verify that there is no gap, in case the job is ORDER_NO_GAP
-        // note that the test is different left-to-right and right-to-left
-        if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
-            && successorStartingTime != -1
-            && ((allocateLeft && predecessorEndTime < stageStartTime) ||
-                (!allocateLeft && (stageEndTime < successorStartingTime))
-               )
-            || (!isNonPreemptiveAllocation(curAlloc))) {
-          throw new PlanningException(
-              "The allocation found does not respect ORDER_NO_GAP");
-        }
-
-        if (allocateLeft) {
-          // Store the stageStartTime and set the new stageDeadline
-          predecessorEndTime = stageEndTime;
-        } else {
-          // Store the stageStartTime and set the new stageDeadline
-          successorStartingTime = stageStartTime;
-          stageDeadline = stageStartTime;
-        }
-      }
     }
 
     // If the allocation is empty, return an error
@@ -200,7 +158,39 @@ public class IterativePlanner extends PlanningAlgorithm {
     }
 
     return allocations;
+  }
 
+  protected static boolean validateOrderNoGap(
+      RLESparseResourceAllocation allocations,
+      Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) {
+
+    // Left to right
+    if (allocateLeft) {
+      Long stageStartTime = findEarliestTime(curAlloc);
+      Long allocationEndTime = allocations.getLatestNonNullTime();
+
+      // Check that there is no gap between stages
+      if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) {
+        return false;
+      }
+      // Right to left
+    } else {
+      Long stageEndTime = findLatestTime(curAlloc);
+      Long allocationStartTime = allocations.getEarliestStartTime();
+
+      // Check that there is no gap between stages
+      if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) 
{
+        return false;
+      }
+    }
+
+    // Check that the stage allocation does not violate ORDER_NO_GAP
+    if (!isNonPreemptiveAllocation(curAlloc)) {
+      return false;
+    }
+
+    // The allocation is legal
+    return true;
   }
 
   protected void initialize(Plan plan, ReservationId reservationId,
@@ -223,35 +213,15 @@ public class IterativePlanner extends PlanningAlgorithm {
 
     // planLoads are not used by other StageAllocators... and don't deal
     // well with huge reservation ranges
-    if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) {
-      planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
-      ReservationAllocation oldRes = plan.getReservationById(reservationId);
-      if (oldRes != null) {
-        planModifications =
-            RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-                plan.getTotalCapacity(), planModifications,
-                oldRes.getResourcesOverTime(), RLEOperator.subtract,
-                jobArrival, jobDeadline);
-      }
+    planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
+    ReservationAllocation oldRes = plan.getReservationById(reservationId);
+    if (oldRes != null) {
+      planLoads =
+          RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
+              plan.getTotalCapacity(), planLoads,
+              oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival,
+              jobDeadline);
     }
-
-  }
-
-  private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
-      long endTime) {
-
-    // Create map
-    Map<Long, Resource> loads = new HashMap<Long, Resource>();
-
-    // Calculate the load for every time slot between [start,end)
-    for (long t = startTime; t < endTime; t += step) {
-      Resource load = plan.getTotalCommittedResources(t);
-      loads.put(t, load);
-    }
-
-    // Return map
-    return loads;
-
   }
 
   private void validateInputStage(Plan plan, ReservationRequest rr)
@@ -286,7 +256,7 @@ public class IterativePlanner extends PlanningAlgorithm {
 
   }
 
-  private boolean isNonPreemptiveAllocation(
+  private static boolean isNonPreemptiveAllocation(
       Map<ReservationInterval, Resource> curAlloc) {
 
     // Checks whether a stage allocation is non preemptive or not.
@@ -329,14 +299,13 @@ public class IterativePlanner extends PlanningAlgorithm {
 
   }
 
-  // Call algEarliestStartTime()
-  protected long computeEarliestStartingTime(Plan plan,
-      ReservationDefinition reservation, int index,
-      ReservationRequest currentReservationStage, long stageDeadline) {
-
-    return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
-        currentReservationStage, stageDeadline);
-
+  // Call setStageExecutionInterval()
+  protected ReservationInterval setStageExecutionInterval(Plan plan,
+      ReservationDefinition reservation,
+      ReservationRequest currentReservationStage,
+      RLESparseResourceAllocation allocations) {
+    return algStageExecutionInterval.computeExecutionInterval(plan,
+        reservation, currentReservationStage, allocateLeft, allocations);
   }
 
   // Call algStageAllocator
@@ -350,10 +319,11 @@ public class IterativePlanner extends PlanningAlgorithm {
 
   }
 
-  // Set the algorithm: algStageEarliestStart
-  public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
+  // Set the algorithm: algStageExecutionInterval
+  public IterativePlanner setAlgStageExecutionInterval(
+      StageExecutionInterval alg) {
 
-    this.algStageEarliestStart = alg;
+    this.algStageExecutionInterval = alg;
     return this; // To allow concatenation of setAlg() functions
 
   }
@@ -375,7 +345,7 @@ public class IterativePlanner extends PlanningAlgorithm {
 
     private final boolean allocateLeft;
 
-    private ListIterator<ReservationRequest> li;
+    private final ListIterator<ReservationRequest> li;
 
     public StageProvider(boolean allocateLeft,
         ReservationDefinition reservation) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
index 52e7055..3c448b3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java
@@ -29,14 +29,25 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
 public interface ReservationAgent {
 
   /**
+   * Constant defining the preferential treatment of time for equally valid
+   * allocations.
+   */
+  final static String FAVOR_EARLY_ALLOCATION =
+      "yarn.resourcemanager.reservation-system.favor-early-allocation";
+  /**
+   * By default favor early allocations.
+   */
+  final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
+
+  /**
    * Create a reservation for the user that abides by the specified contract
-   * 
+   *
    * @param reservationId the identifier of the reservation to be created.
    * @param user the user who wants to create the reservation
    * @param plan the Plan to which the reservation must be fitted
    * @param contract encapsulates the resources the user requires for his
    *          session
-   * 
+   *
    * @return whether the create operation was successful or not
    * @throws PlanningException if the session cannot be fitted into the plan
    */
@@ -45,13 +56,13 @@ public interface ReservationAgent {
 
   /**
    * Update a reservation for the user that abides by the specified contract
-   * 
+   *
    * @param reservationId the identifier of the reservation to be updated
    * @param user the user who wants to create the session
    * @param plan the Plan to which the reservation must be fitted
    * @param contract encapsulates the resources the user requires for his
    *          reservation
-   * 
+   *
    * @return whether the update operation was successful or not
    * @throws PlanningException if the reservation cannot be fitted into the 
plan
    */
@@ -60,11 +71,11 @@ public interface ReservationAgent {
 
   /**
    * Delete an user reservation
-   * 
+   *
    * @param reservationId the identifier of the reservation to be deleted
    * @param user the user who wants to create the reservation
    * @param plan the Plan to which the session must be fitted
-   * 
+   *
    * @return whether the delete operation was successful or not
    * @throws PlanningException if the reservation cannot be fitted into the 
plan
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
index 7507783..7bfc730 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java
@@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
  * This (re)planner scan a period of time from now to a maximum time window (or
  * the end of the last session, whichever comes first) checking the overall
  * capacity is not violated.
- * 
+ *
  * It greedily removes sessions in reversed order of acceptance (latest 
accepted
  * is the first removed).
  */
@@ -90,8 +90,8 @@ public class SimpleCapacityReplanner implements Planner {
 
     // loop on all moment in time from now to the end of the check Zone
     // or the end of the planned sessions whichever comes first
-    for (long t = now; 
-         (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); 
+    for (long t = now;
+         (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
          t += plan.getStep()) {
       Resource excessCap =
           Resources.subtract(plan.getTotalCommittedResources(t), totCap);
@@ -102,7 +102,7 @@ public class SimpleCapacityReplanner implements Planner {
             new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
         for (Iterator<ReservationAllocation> resIter =
             curReservations.iterator(); resIter.hasNext()
-            && Resources.greaterThan(resCalc, totCap, excessCap, 
+            && Resources.greaterThan(resCalc, totCap, excessCap,
                 ZERO_RESOURCE);) {
           ReservationAllocation reservation = resIter.next();
           plan.deleteReservation(reservation.getReservationId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index b95f8d4..ec6d9c0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -41,19 +41,21 @@ public interface StageAllocator {
    * @param planModifications the allocations performed by the planning
    *          algorithm which are not yet reflected by plan
    * @param rr the stage
-   * @param stageEarliestStart the arrival time (earliest starting time) set 
for
+   * @param stageArrival the arrival time (earliest starting time) set for
    *          the stage by the two phase planning algorithm
    * @param stageDeadline the deadline of the stage set by the two phase
    *          planning algorithm
+   * @param user name of the user
+   * @param oldId identifier of the old reservation
    *
    * @return The computed allocation (or null if the stage could not be
    *         allocated)
    * @throws PlanningException
    */
   Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
-      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageEarliestStart, long stageDeadline, String user,
+      long stageArrival, long stageDeadline, String user,
       ReservationId oldId) throws PlanningException;
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index c836970..da04336 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
-import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -41,7 +41,7 @@ public class StageAllocatorGreedy implements StageAllocator {
 
   @Override
   public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
-      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
       long stageEarliestStart, long stageDeadline, String user,
       ReservationId oldId) throws PlanningException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
index 5e748fc..ec83e02 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
-import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -52,7 +52,7 @@ public class StageAllocatorGreedyRLE implements 
StageAllocator {
 
   @Override
   public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
-      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
       long stageEarliestStart, long stageDeadline, String user,
       ReservationId oldId) throws PlanningException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index b9fd8e1..e45f58c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -18,8 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
 
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.TreeSet;
 
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -27,46 +31,55 @@ import 
org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * A stage allocator that iteratively allocates containers in the
  * {@link DurationInterval} with lowest overall cost. The algorithm only
- * considers intervals of the form: [stageDeadline - (n+1)*duration,
- * stageDeadline - n*duration) for an integer n. This guarantees that the
- * allocations are aligned (as opposed to overlapping duration intervals).
- *
- * The smoothnessFactor parameter controls the number of containers that are
- * simultaneously allocated in each iteration of the algorithm.
+ * considers non-overlapping intervals of length 'duration'. This guarantees
+ * that the allocations are aligned. If 'allocateLeft == true', the intervals
+ * considered by the algorithm are aligned to stageArrival; otherwise, they are
+ * aligned to stageDeadline. The smoothnessFactor parameter controls the number
+ * of containers that are simultaneously allocated in each iteration of the
+ * algorithm.
  */
 
 public class StageAllocatorLowCostAligned implements StageAllocator {
 
+  private final boolean allocateLeft;
   // Smoothness factor
   private int smoothnessFactor = 10;
 
   // Constructor
-  public StageAllocatorLowCostAligned() {
+  public StageAllocatorLowCostAligned(boolean allocateLeft) {
+    this.allocateLeft = allocateLeft;
   }
 
   // Constructor
-  public StageAllocatorLowCostAligned(int smoothnessFactor) {
+  public StageAllocatorLowCostAligned(int smoothnessFactor,
+      boolean allocateLeft) {
+    this.allocateLeft = allocateLeft;
     this.smoothnessFactor = smoothnessFactor;
   }
 
-  // computeJobAllocation()
   @Override
-  public Map<ReservationInterval, Resource> computeStageAllocation(
-      Plan plan, Map<Long, Resource> planLoads,
+  public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
+      RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageEarliestStart, long stageDeadline, String user,
-      ReservationId oldId) {
+      long stageArrival, long stageDeadline, String user, ReservationId oldId)
+      throws PlanningException {
 
     // Initialize
     ResourceCalculator resCalc = plan.getResourceCalculator();
     Resource capacity = plan.getTotalCapacity();
+
+    RLESparseResourceAllocation netRLERes = plan
+        .getAvailableResourceOverTime(user, oldId, stageArrival, 
stageDeadline);
+
     long step = plan.getStep();
 
     // Create allocationRequestsearlies
@@ -76,16 +89,15 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
     // Initialize parameters
     long duration = stepRoundUp(rr.getDuration(), step);
     int windowSizeInDurations =
-        (int) ((stageDeadline - stageEarliestStart) / duration);
+        (int) ((stageDeadline - stageArrival) / duration);
     int totalGangs = rr.getNumContainers() / rr.getConcurrency();
     int numContainersPerGang = rr.getConcurrency();
     Resource gang =
         Resources.multiply(rr.getCapability(), numContainersPerGang);
 
     // Set maxGangsPerUnit
-    int maxGangsPerUnit =
-        (int) Math.max(
-            Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
+    int maxGangsPerUnit = (int) Math
+        .max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
     maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
 
     // If window size is too small, return null
@@ -93,6 +105,8 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
       return null;
     }
 
+    final int preferLeft = allocateLeft ? 1 : -1;
+
     // Initialize tree sorted by costs
     TreeSet<DurationInterval> durationIntervalsSortedByCost =
         new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
@@ -104,23 +118,26 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
               return cmp;
             }
 
-            return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
+            return preferLeft
+                * Long.compare(val1.getEndTime(), val2.getEndTime());
           }
         });
 
+    List<Long> intervalEndTimes =
+        computeIntervalEndTimes(stageArrival, stageDeadline, duration);
+
     // Add durationIntervals that end at (endTime - n*duration) for some n.
-    for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
-        + duration; intervalEnd -= duration) {
+    for (long intervalEnd : intervalEndTimes) {
 
       long intervalStart = intervalEnd - duration;
 
       // Get duration interval [intervalStart,intervalEnd)
       DurationInterval durationInterval =
           getDurationInterval(intervalStart, intervalEnd, planLoads,
-              planModifications, capacity, resCalc, step);
+              planModifications, capacity, netRLERes, resCalc, step, gang);
 
       // If the interval can fit a gang, add it to the tree
-      if (durationInterval.canAllocate(gang, capacity, resCalc)) {
+      if (durationInterval.canAllocate()) {
         durationIntervalsSortedByCost.add(durationInterval);
       }
     }
@@ -139,8 +156,7 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
           durationIntervalsSortedByCost.first();
       int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
       numGangsToAllocate =
-          Math.min(numGangsToAllocate,
-              bestDurationInterval.numCanFit(gang, capacity, resCalc));
+          Math.min(numGangsToAllocate, bestDurationInterval.numCanFit());
       // Add it
       remainingGangs -= numGangsToAllocate;
 
@@ -148,9 +164,8 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
           new ReservationInterval(bestDurationInterval.getStartTime(),
               bestDurationInterval.getEndTime());
 
-      Resource reservationRes =
-          Resources.multiply(rr.getCapability(), rr.getConcurrency()
-              * numGangsToAllocate);
+      Resource reservationRes = Resources.multiply(rr.getCapability(),
+          rr.getConcurrency() * numGangsToAllocate);
 
       planModifications.addInterval(reservationInt, reservationRes);
       allocationRequests.addInterval(reservationInt, reservationRes);
@@ -162,10 +177,10 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
       DurationInterval updatedDurationInterval =
           getDurationInterval(bestDurationInterval.getStartTime(),
               bestDurationInterval.getStartTime() + duration, planLoads,
-              planModifications, capacity, resCalc, step);
+              planModifications, capacity, netRLERes, resCalc, step, gang);
 
       // Add to tree, if possible
-      if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
+      if (updatedDurationInterval.canAllocate()) {
         durationIntervalsSortedByCost.add(updatedDurationInterval);
       }
 
@@ -180,10 +195,12 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
       return allocations;
     } else {
 
-      // If we are here is because we did not manage to satisfy this request.
-      // We remove unwanted side-effect from planModifications (needed for 
ANY).
-      for (Map.Entry<ReservationInterval, Resource> tempAllocation
-          : allocations.entrySet()) {
+      // If we are here is because we did not manage to satisfy this
+      // request.
+      // We remove unwanted side-effect from planModifications (needed for
+      // ANY).
+      for (Map.Entry<ReservationInterval, Resource> tempAllocation : 
allocations
+          .entrySet()) {
 
         planModifications.removeInterval(tempAllocation.getKey(),
             tempAllocation.getValue());
@@ -196,37 +213,144 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
 
   }
 
-  protected DurationInterval getDurationInterval(long startTime, long endTime,
-      Map<Long, Resource> planLoads,
+  private List<Long> computeIntervalEndTimes(long stageEarliestStart,
+      long stageDeadline, long duration) {
+
+    List<Long> intervalEndTimes = new ArrayList<Long>();
+    if (!allocateLeft) {
+      for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+          + duration; intervalEnd -= duration) {
+        intervalEndTimes.add(intervalEnd);
+      }
+    } else {
+      for (long intervalStart =
+          stageEarliestStart; intervalStart <= stageDeadline
+              - duration; intervalStart += duration) {
+        intervalEndTimes.add(intervalStart + duration);
+      }
+    }
+
+    return intervalEndTimes;
+  }
+
+  protected static DurationInterval getDurationInterval(long startTime,
+      long endTime, RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, Resource capacity,
-      ResourceCalculator resCalc, long step) {
+      RLESparseResourceAllocation netRLERes, ResourceCalculator resCalc,
+      long step, Resource requestedResources) throws PlanningException {
 
-    // Initialize the dominant loads structure
-    Resource dominantResources = Resource.newInstance(0, 0);
+    // Get the total cost associated with the duration interval
+    double totalCost = getDurationIntervalTotalCost(startTime, endTime,
+        planLoads, planModifications, capacity, resCalc, step);
 
-    // Calculate totalCost and maxLoad
-    double totalCost = 0.0;
-    for (long t = startTime; t < endTime; t += step) {
+    // Calculate how many gangs can fit, i.e., how many times can 'capacity'
+    // be allocated within the duration interval [startTime, endTime)
+    int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime,
+        planModifications, capacity, netRLERes, resCalc, requestedResources);
+
+    // Return the desired durationInterval
+    return new DurationInterval(startTime, endTime, totalCost, gangsCanFit);
+
+  }
+
+  protected static double getDurationIntervalTotalCost(long startTime,
+      long endTime, RLESparseResourceAllocation planLoads,
+      RLESparseResourceAllocation planModifications, Resource capacity,
+      ResourceCalculator resCalc, long step) throws PlanningException {
+
+    // Compute the current resource load within the interval 
[startTime,endTime)
+    // by adding planLoads (existing load) and planModifications (load that
+    // corresponds to the current job).
+    RLESparseResourceAllocation currentLoad =
+        RLESparseResourceAllocation.merge(resCalc, capacity, planLoads,
+            planModifications, RLEOperator.add, startTime, endTime);
+
+    // Convert load from RLESparseResourceAllocation to a Map representation
+    NavigableMap<Long, Resource> mapCurrentLoad = currentLoad.getCumulative();
 
-      // Get the load
-      Resource load = getLoadAtTime(t, planLoads, planModifications);
+    // Initialize auxiliary variables
+    double totalCost = 0.0;
+    Long tPrev = -1L;
+    Resource loadPrev = Resources.none();
+    double cost = 0.0;
+
+    // Iterate over time points. For each point 't', accumulate the total cost
+    // that corresponds to the interval [tPrev, t). The cost associated within
+    // this interval is fixed for each of the time steps, therefore the cost of
+    // a single step is multiplied by (t - tPrev) / step.
+    for (Entry<Long, Resource> e : mapCurrentLoad.entrySet()) {
+      Long t = e.getKey();
+      Resource load = e.getValue();
+      if (tPrev != -1L) {
+        tPrev = Math.max(tPrev, startTime);
+        cost = calcCostOfLoad(loadPrev, capacity, resCalc);
+        totalCost = totalCost + cost * (t - tPrev) / step;
+      }
 
-      // Increase the total cost
-      totalCost += calcCostOfLoad(load, capacity, resCalc);
+      tPrev = t;
+      loadPrev = load;
+    }
 
-      // Update the dominant resources
-      dominantResources = Resources.componentwiseMax(dominantResources, load);
+    // Add the cost associated with the last interval (the for loop does not
+    // calculate it).
+    if (loadPrev != null) {
 
+      // This takes care of the corner case of a single entry
+      tPrev = Math.max(tPrev, startTime);
+      cost = calcCostOfLoad(loadPrev, capacity, resCalc);
+      totalCost = totalCost + cost * (endTime - tPrev) / step;
     }
 
-    // Return the corresponding durationInterval
-    return new DurationInterval(startTime, endTime, totalCost,
-        dominantResources);
+    // Return the overall cost
+    return totalCost;
+  }
+
+  protected static int getDurationIntervalGangsCanFit(long startTime,
+      long endTime, RLESparseResourceAllocation planModifications,
+      Resource capacity, RLESparseResourceAllocation netRLERes,
+      ResourceCalculator resCalc, Resource requestedResources)
+      throws PlanningException {
+
+    // Initialize auxiliary variables
+    int gangsCanFit = Integer.MAX_VALUE;
+    int curGangsCanFit;
+
+    // Calculate the total amount of available resources between startTime
+    // and endTime, by subtracting planModifications from netRLERes
+    RLESparseResourceAllocation netAvailableResources =
+        RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes,
+            planModifications, RLEOperator.subtractTestNonNegative, startTime,
+            endTime);
+
+    // Convert result to a map
+    NavigableMap<Long, Resource> mapAvailableCapacity =
+        netAvailableResources.getCumulative();
+
+    // Iterate over the map representation.
+    // At each point, calculate how many times does 'requestedResources' fit.
+    // The result is the minimum over all time points.
+    for (Entry<Long, Resource> e : mapAvailableCapacity.entrySet()) {
+      Long t = e.getKey();
+      Resource curAvailable = e.getValue();
+      if (t >= endTime) {
+        break;
+      }
 
+      if (curAvailable == null) {
+        gangsCanFit = 0;
+      } else {
+        curGangsCanFit = (int) Math.floor(Resources.divide(resCalc, capacity,
+            curAvailable, requestedResources));
+        if (curGangsCanFit < gangsCanFit) {
+          gangsCanFit = curGangsCanFit;
+        }
+      }
+    }
+    return gangsCanFit;
   }
 
   protected double calcCostOfInterval(long startTime, long endTime,
-      Map<Long, Resource> planLoads,
+      RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, Resource capacity,
       ResourceCalculator resCalc, long step) {
 
@@ -242,7 +366,8 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
 
   }
 
-  protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
+  protected double calcCostOfTimeSlot(long t,
+      RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, Resource capacity,
       ResourceCalculator resCalc) {
 
@@ -254,17 +379,17 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
 
   }
 
-  protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
+  protected Resource getLoadAtTime(long t,
+      RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications) {
 
-    Resource planLoad = planLoads.get(t);
-    planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
+    Resource planLoad = planLoads.getCapacityAtTime(t);
 
     return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
 
   }
 
-  protected double calcCostOfLoad(Resource load, Resource capacity,
+  protected static double calcCostOfLoad(Resource load, Resource capacity,
       ResourceCalculator resCalc) {
 
     return resCalc.ratio(load, capacity);
@@ -289,42 +414,30 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
     private long startTime;
     private long endTime;
     private double cost;
-    private Resource maxLoad;
+    private final int gangsCanFit;
 
     // Constructor
     public DurationInterval(long startTime, long endTime, double cost,
-        Resource maxLoad) {
+        int gangsCanfit) {
       this.startTime = startTime;
       this.endTime = endTime;
       this.cost = cost;
-      this.maxLoad = maxLoad;
+      this.gangsCanFit = gangsCanfit;
     }
 
     // canAllocate() - boolean function, returns whether requestedResources
     // can be allocated during the durationInterval without
     // violating capacity constraints
-    public boolean canAllocate(Resource requestedResources, Resource capacity,
-        ResourceCalculator resCalc) {
-
-      Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
-      return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
-
+    public boolean canAllocate() {
+      return (gangsCanFit > 0);
     }
 
     // numCanFit() - returns the maximal number of requestedResources can be
     // allocated during the durationInterval without violating
     // capacity constraints
-    public int numCanFit(Resource requestedResources, Resource capacity,
-        ResourceCalculator resCalc) {
-
-      // Represents the largest resource demand that can be satisfied 
throughout
-      // the entire DurationInterval (i.e., during [startTime,endTime))
-      Resource availableResources = Resources.subtract(capacity, maxLoad);
-
-      // Maximal number of requestedResources that fit inside the interval
-      return (int) Math.floor(Resources.divide(resCalc, capacity,
-          availableResources, requestedResources));
 
+    public int numCanFit() {
+      return gangsCanFit;
     }
 
     public long getStartTime() {
@@ -343,14 +456,6 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
       this.endTime = value;
     }
 
-    public Resource getMaxLoad() {
-      return this.maxLoad;
-    }
-
-    public void setMaxLoad(Resource value) {
-      this.maxLoad = value;
-    }
-
     public double getTotalCost() {
       return this.cost;
     }
@@ -359,11 +464,17 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
       this.cost = value;
     }
 
+    @Override
     public String toString() {
+
       StringBuilder sb = new StringBuilder();
+
       sb.append(" start: " + startTime).append(" end: " + endTime)
-          .append(" cost: " + cost).append(" maxLoad: " + maxLoad);
+          .append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit);
+
       return sb.toString();
+
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
deleted file mode 100644
index 547616a..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
-
-/**
- * Interface for setting the earliest start time of a stage in 
IterativePlanner.
- */
-public interface StageEarliestStart {
-
-  /**
-   * Computes the earliest allowed starting time for a given stage.
-   *
-   * @param plan the Plan to which the reservation must be fitted
-   * @param reservation the job contract
-   * @param index the index of the stage in the job contract
-   * @param currentReservationStage the stage
-   * @param stageDeadline the deadline of the stage set by the two phase
-   *          planning algorithm
-   *
-   * @return the earliest allowed starting time for the stage.
-   */
-  long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
-          int index, ReservationRequest currentReservationStage,
-          long stageDeadline);
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
deleted file mode 100644
index 43d6584..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
-
-import java.util.ListIterator;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
-
-/**
- * Sets the earliest start time of a stage proportional to the job weight. The
- * interval [jobArrival, stageDeadline) is divided as follows. First, each 
stage
- * is guaranteed at least its requested duration. Then, the stage receives a
- * fraction of the remaining time. The fraction is calculated as the ratio
- * between the weight (total requested resources) of the stage and the total
- * weight of all proceeding stages.
- */
-
-public class StageEarliestStartByDemand implements StageEarliestStart {
-
-  private long step;
-
-  @Override
-  public long setEarliestStartTime(Plan plan,
-      ReservationDefinition reservation, int index, ReservationRequest current,
-      long stageDeadline) {
-
-    step = plan.getStep();
-
-    // If this is the first stage, don't bother with the computation.
-    if (index < 1) {
-      return reservation.getArrival();
-    }
-
-    // Get iterator
-    ListIterator<ReservationRequest> li =
-        reservation.getReservationRequests().getReservationResources()
-            .listIterator(index);
-    ReservationRequest rr;
-
-    // Calculate the total weight & total duration
-    double totalWeight = calcWeight(current);
-    long totalDuration = getRoundedDuration(current, plan);
-
-    while (li.hasPrevious()) {
-      rr = li.previous();
-      totalWeight += calcWeight(rr);
-      totalDuration += getRoundedDuration(rr, plan);
-    }
-
-    // Compute the weight of the current stage as compared to remaining ones
-    double ratio = calcWeight(current) / totalWeight;
-
-    // Estimate an early start time, such that:
-    // 1. Every stage is guaranteed to receive at least its duration
-    // 2. The remainder of the window is divided between stages
-    // proportionally to its workload (total memory consumption)
-    long window = stageDeadline - reservation.getArrival();
-    long windowRemainder = window - totalDuration;
-    long earlyStart =
-        (long) (stageDeadline - getRoundedDuration(current, plan)
-            - (windowRemainder * ratio));
-
-    // Realign if necessary (since we did some arithmetic)
-    earlyStart = stepRoundUp(earlyStart, step);
-
-    // Return
-    return earlyStart;
-
-  }
-
-  // Weight = total memory consumption of stage
-  protected double calcWeight(ReservationRequest stage) {
-    return (stage.getDuration() * stage.getCapability().getMemorySize())
-        * (stage.getNumContainers());
-  }
-
-  protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
-    return stepRoundUp(stage.getDuration(), step);
-  }
-
-  protected static long stepRoundDown(long t, long step) {
-    return (t / step) * step;
-  }
-
-  protected static long stepRoundUp(long t, long step) {
-    return ((t + step - 1) / step) * step;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
deleted file mode 100644
index 8347816..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
-
-import org.apache.hadoop.yarn.api.records.ReservationDefinition;
-import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
-
-/**
- * Sets the earliest start time of a stage as the job arrival time.
- */
-public class StageEarliestStartByJobArrival implements StageEarliestStart {
-
-  @Override
-  public long setEarliestStartTime(Plan plan,
-      ReservationDefinition reservation, int index, ReservationRequest current,
-      long stageDeadline) {
-
-    return reservation.getArrival();
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java
new file mode 100644
index 0000000..8f7f5f7
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * An auxiliary class used to compute the time interval in which the stage can
+ * be allocated resources by {@link IterativePlanner}.
+ */
+public interface StageExecutionInterval {
+  /**
+   * Computes the earliest allowed starting time for a given stage.
+   *
+   * @param plan the Plan to which the reservation must be fitted
+   * @param reservation the job contract
+   * @param currentReservationStage the stage
+   * @param allocateLeft is the job allocated from left to right
+   * @param allocations Existing resource assignments for the job
+   * @return the time interval in which the stage can get resources.
+   */
+  ReservationInterval computeExecutionInterval(Plan plan,
+      ReservationDefinition reservation,
+      ReservationRequest currentReservationStage, boolean allocateLeft,
+      RLESparseResourceAllocation allocations);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java
new file mode 100644
index 0000000..95f1d4b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.IterativePlanner.StageProvider;
+
+/**
+ * An implementation of {@link StageExecutionInterval}, which sets the 
execution
+ * interval of the stage. For ANY and ALL jobs, the interval is
+ * [jobArrival,jobDeadline]. For ORDER jobs, the the maximal possible time
+ * interval is divided as follows: First, each stage is guaranteed at least its
+ * requested duration. Then, the stage receives a fraction of the remaining
+ * time. The fraction is calculated as the ratio between the weight (total
+ * requested resources) of the stage and the total weight of all remaining
+ * stages.
+ */
+
+public class StageExecutionIntervalByDemand implements StageExecutionInterval {
+
+  private long step;
+
+  @Override
+  public ReservationInterval computeExecutionInterval(Plan plan,
+      ReservationDefinition reservation,
+      ReservationRequest currentReservationStage, boolean allocateLeft,
+      RLESparseResourceAllocation allocations) {
+
+    // Use StageExecutionIntervalUnconstrained to get the maximal interval
+    ReservationInterval maxInterval =
+        (new StageExecutionIntervalUnconstrained()).computeExecutionInterval(
+            plan, reservation, currentReservationStage, allocateLeft,
+            allocations);
+
+    ReservationRequestInterpreter jobType =
+        reservation.getReservationRequests().getInterpreter();
+
+    // For unconstrained jobs, such as ALL & ANY, we can use the unconstrained
+    // version
+    if ((jobType != ReservationRequestInterpreter.R_ORDER)
+        && (jobType != ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
+      return maxInterval;
+    }
+
+    // For ORDER and ORDER_NO_GAP, take a sub-interval of maxInterval
+    step = plan.getStep();
+
+    double totalWeight = 0.0;
+    long totalDuration = 0;
+
+    // Iterate over the stages that haven't been allocated.
+    // For allocateLeft == True, we iterate in reverse order, starting from the
+    // last
+    // stage, until we reach the current stage.
+    // For allocateLeft == False, we do the opposite.
+    StageProvider stageProvider = new StageProvider(!allocateLeft, 
reservation);
+
+    while (stageProvider.hasNext()) {
+      ReservationRequest rr = stageProvider.next();
+      totalWeight += calcWeight(rr);
+      totalDuration += getRoundedDuration(rr, step);
+
+      // Stop once we reach current
+      if (rr == currentReservationStage) {
+        break;
+      }
+    }
+
+    // Compute the weight of the current stage as compared to remaining ones
+    double ratio = calcWeight(currentReservationStage) / totalWeight;
+
+    // Estimate an early start time, such that:
+    // 1. Every stage is guaranteed to receive at least its duration
+    // 2. The remainder of the window is divided between stages
+    // proportionally to its workload (total memory consumption)
+    long maxIntervalArrival = maxInterval.getStartTime();
+    long maxIntervalDeadline = maxInterval.getEndTime();
+    long window = maxIntervalDeadline - maxIntervalArrival;
+    long windowRemainder = window - totalDuration;
+
+    if (allocateLeft) {
+      long latestEnd =
+          (long) (maxIntervalArrival
+              + getRoundedDuration(currentReservationStage, step)
+              + (windowRemainder * ratio));
+
+      // Realign if necessary (since we did some arithmetic)
+      latestEnd = stepRoundDown(latestEnd, step);
+
+      // Return new interval
+      return new ReservationInterval(maxIntervalArrival, latestEnd);
+    } else {
+      long earlyStart =
+          (long) (maxIntervalDeadline
+              - getRoundedDuration(currentReservationStage, step)
+              - (windowRemainder * ratio));
+
+      // Realign if necessary (since we did some arithmetic)
+      earlyStart = stepRoundUp(earlyStart, step);
+
+      // Return new interval
+      return new ReservationInterval(earlyStart, maxIntervalDeadline);
+    }
+  }
+
+  // Weight = total memory consumption of stage
+  protected double calcWeight(ReservationRequest stage) {
+    return (stage.getDuration() * stage.getCapability().getMemorySize())
+        * (stage.getNumContainers());
+  }
+
+  protected long getRoundedDuration(ReservationRequest stage, Long s) {
+    return stepRoundUp(stage.getDuration(), s);
+  }
+
+  protected static long stepRoundDown(long t, long s) {
+    return (t / s) * s;
+  }
+
+  protected static long stepRoundUp(long t, long s) {
+    return ((t + s - 1) / s) * s;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java
new file mode 100644
index 0000000..cccd9d8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+
+/**
+ * An implementation of {@link StageExecutionInterval} which gives each stage
+ * the maximal possible time interval, given the job constraints. Specifically,
+ * for ANY and ALL jobs, the interval would be [jobArrival, jobDeadline). For
+ * ORDER jobs, the stage cannot start before its predecessors (if allocateLeft
+ * == true) or cannot end before its successors (if allocateLeft == false)
+ */
+public class StageExecutionIntervalUnconstrained implements
+    StageExecutionInterval {
+
+  @Override
+  public ReservationInterval computeExecutionInterval(Plan plan,
+      ReservationDefinition reservation,
+      ReservationRequest currentReservationStage, boolean allocateLeft,
+      RLESparseResourceAllocation allocations) {
+
+    Long stageArrival = reservation.getArrival();
+    Long stageDeadline = reservation.getDeadline();
+
+    ReservationRequestInterpreter jobType =
+        reservation.getReservationRequests().getInterpreter();
+
+    // Left to right
+    if (allocateLeft) {
+      // If ORDER job, change the stage arrival time
+      if ((jobType == ReservationRequestInterpreter.R_ORDER)
+          || (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
+        Long allocationEndTime = allocations.getLatestNonNullTime();
+        if (allocationEndTime != -1) {
+          stageArrival = allocationEndTime;
+        }
+      }
+      // Right to left
+    } else {
+      // If ORDER job, change the stage deadline
+      if ((jobType == ReservationRequestInterpreter.R_ORDER)
+          || (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
+        Long allocationStartTime = allocations.getEarliestStartTime();
+        if (allocationStartTime != -1) {
+          stageDeadline = allocationStartTime;
+        }
+      }
+    }
+    return new ReservationInterval(stageArrival, stageDeadline);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to