YARN-5329. Placement Agent enhancements required to support recurring 
reservations in ReservationSystem. (Carlo Curino via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6e614e3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6e614e3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6e614e3

Branch: refs/heads/HDFS-10467
Commit: e6e614e380ed1d746973b50f666a9c40d272073e
Parents: fe84cdc
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Oct 4 19:28:27 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Wed Oct 4 19:28:27 2017 -0700

----------------------------------------------------------------------
 .../reservation/InMemoryPlan.java               |   6 +
 .../PeriodicRLESparseResourceAllocation.java    |   3 +-
 .../RLESparseResourceAllocation.java            |   6 +-
 .../reservation/planning/IterativePlanner.java  |  27 ++-
 .../reservation/planning/PlanningAlgorithm.java |  79 ++++---
 .../reservation/planning/StageAllocator.java    |   3 +-
 .../planning/StageAllocatorGreedy.java          |   4 +-
 .../planning/StageAllocatorGreedyRLE.java       |   7 +-
 .../planning/StageAllocatorLowCostAligned.java  |   6 +-
 .../reservation/BaseSharingPolicyTest.java      |   8 +-
 .../reservation/TestCapacityOverTimePolicy.java |  29 +--
 .../reservation/TestInMemoryPlan.java           |  69 +++---
 .../planning/TestAlignedPlanner.java            |  40 +++-
 .../planning/TestGreedyReservationAgent.java    |  69 +++---
 .../planning/TestReservationAgents.java         | 213 +++++++++++++++++++
 15 files changed, 442 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index 9eb1820..7187510 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -723,6 +723,12 @@ public class InMemoryPlan implements Plan {
               + periodicRle.getTimePeriod() + ")");
         }
 
+        if (period < (end - start)) {
+          throw new PlanningException(
+              "Invalid input: (end - start) = (" + end + " - " + start + ") = "
+                  + (end - start) + " > period = " + period);
+        }
+
         // find the minimum resources available among all the instances that fit
         // in the LCM
         long numInstInLCM = periodicRle.getTimePeriod() / period;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
index 7bc44f5..d326944 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java
@@ -221,7 +221,8 @@ public class PeriodicRLESparseResourceAllocation
       NavigableMap<Long, Resource> cumulativeMap = this.getCumulative();
       Long previous = cumulativeMap.floorKey(relativeStart);
       previous = (previous != null) ? previous : 0;
-      for (long i = 0; i <= (end - start) / timePeriod; i++) {
+      //make sure to go one past end, to catch end times extending past period
+      for (long i = 0; i <= 1 + (end - start) / timePeriod; i++) {
         for (Map.Entry<Long, Resource> e : cumulativeMap.entrySet()) {
           long curKey = e.getKey() + (i * timePeriod);
           if (curKey >= previous && (start + curKey - relativeStart) <= end) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
index 3062f3d..8280bc9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java
@@ -423,7 +423,8 @@ public class RLESparseResourceAllocation {
       Resource outRes) {
 
     if (out.isEmpty() || (out.lastEntry() != null && outRes == null)
-        || !Resources.equals(out.lastEntry().getValue(), outRes)) {
+        || (out.lastEntry().getValue() != null
+            && !Resources.equals(out.lastEntry().getValue(), outRes))) {
       out.put(time, outRes);
     }
 
@@ -460,7 +461,8 @@ public class RLESparseResourceAllocation {
       if (!Resources.fitsIn(b, a)) {
         throw new PlanningException(
             "RLESparseResourceAllocation: merge failed as the "
-                + "resulting RLESparseResourceAllocation would be negative");
+                + "resulting RLESparseResourceAllocation would "
+                + "be negative, when testing: (" + eB + ") > (" + eA + ")");
       } else {
         return Resources.subtract(a, b);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
index 83f272e..9b18e90 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java
@@ -98,6 +98,12 @@ public class IterativePlanner extends PlanningAlgorithm {
     // Current stage
     ReservationRequest currentReservationStage;
 
+    // initialize periodicity
+    long period = 0;
+    if(reservation.getRecurrenceExpression() != null){
+      period = Long.parseLong(reservation.getRecurrenceExpression());
+    }
+
     // Iterate the stages in reverse order
     while (stageProvider.hasNext()) {
 
@@ -117,7 +123,7 @@ public class IterativePlanner extends PlanningAlgorithm {
       // Compute stage allocation
       Map<ReservationInterval, Resource> curAlloc =
           computeStageAllocation(plan, currentReservationStage, stageArrival,
-              stageDeadline, user, reservationId);
+              stageDeadline, period, user, reservationId);
 
       // If we did not find an allocation, return NULL
       // (unless it's an ANY job, then we simply continue).
@@ -216,11 +222,10 @@ public class IterativePlanner extends PlanningAlgorithm {
     planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
     ReservationAllocation oldRes = plan.getReservationById(reservationId);
     if (oldRes != null) {
-      planLoads =
-          RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
-              plan.getTotalCapacity(), planLoads,
-              oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival,
-              jobDeadline);
+      planLoads = RLESparseResourceAllocation.merge(
+          plan.getResourceCalculator(), plan.getTotalCapacity(), planLoads,
+          oldRes.getResourcesOverTime(jobArrival, jobDeadline),
+          RLEOperator.subtract, jobArrival, jobDeadline);
     }
   }
 
@@ -309,13 +314,13 @@ public class IterativePlanner extends PlanningAlgorithm {
   }
 
   // Call algStageAllocator
-  protected Map<ReservationInterval, Resource> computeStageAllocation(
-      Plan plan, ReservationRequest rr, long stageArrivalTime,
-      long stageDeadline, String user, ReservationId oldId)
-      throws PlanningException {
+  protected Map<ReservationInterval, Resource> computeStageAllocation(Plan 
plan,
+      ReservationRequest rr, long stageArrivalTime, long stageDeadline,
+      long period, String user, ReservationId oldId) throws PlanningException {
 
     return algStageAllocator.computeStageAllocation(plan, planLoads,
-        planModifications, rr, stageArrivalTime, stageDeadline, user, oldId);
+        planModifications, rr, stageArrivalTime, stageDeadline, period, user,
+        oldId);
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index bbbf0d6..d4b4b9e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -26,6 +26,7 @@ import 
org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.PeriodicRLESparseResourceAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
@@ -63,21 +64,33 @@ public abstract class PlanningAlgorithm implements 
ReservationAgent {
 
     // Compute the job allocation
     RLESparseResourceAllocation allocation =
-        computeJobAllocation(plan, reservationId, adjustedContract, user);
+            computeJobAllocation(plan, reservationId, adjustedContract, user);
+
+    long period = Long.parseLong(contract.getRecurrenceExpression());
+
+    // Make allocation periodic if request is periodic
+    if (contract.getRecurrenceExpression() != null) {
+      if (period > 0) {
+        allocation =
+            new PeriodicRLESparseResourceAllocation(allocation, period);
+      }
+    }
 
     // If no job allocation was found, fail
     if (allocation == null) {
       throw new PlanningException(
-          "The planning algorithm could not find a valid allocation"
-              + " for your request");
+              "The planning algorithm could not find a valid allocation"
+                      + " for your request");
     }
 
     // Translate the allocation to a map (with zero paddings)
     long step = plan.getStep();
+
     long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
     long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
+
     Map<ReservationInterval, Resource> mapAllocations =
-        allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
+        allocationsToPaddedMap(allocation, jobArrival, jobDeadline, period);
 
     // Create the reservation
     ReservationAllocation capReservation =
@@ -85,8 +98,7 @@ public abstract class PlanningAlgorithm implements 
ReservationAgent {
             adjustedContract, // Contract
             user, // User name
             plan.getQueueName(), // Queue name
-            findEarliestTime(mapAllocations), // Earliest start time
-            findLatestTime(mapAllocations), // Latest end time
+            adjustedContract.getArrival(), adjustedContract.getDeadline(),
             mapAllocations, // Allocations
             plan.getResourceCalculator(), // Resource calculator
             plan.getMinimumAllocation()); // Minimum allocation
@@ -100,33 +112,46 @@ public abstract class PlanningAlgorithm implements 
ReservationAgent {
 
   }
 
-  private Map<ReservationInterval, Resource>
-      allocationsToPaddedMap(RLESparseResourceAllocation allocation,
-          long jobArrival, long jobDeadline) {
-
-    // Allocate
-    Map<ReservationInterval, Resource> mapAllocations =
-        allocation.toIntervalMap();
+  private Map<ReservationInterval, Resource> allocationsToPaddedMap(
+      RLESparseResourceAllocation allocation, long jobArrival, long 
jobDeadline,
+      long period) {
 
     // Zero allocation
     Resource zeroResource = Resource.newInstance(0, 0);
 
-    // Pad at the beginning
-    long earliestStart = findEarliestTime(mapAllocations);
-    if (jobArrival < earliestStart) {
-      mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
-          zeroResource);
-    }
+    if (period > 0) {
+      if ((jobDeadline - jobArrival) >= period) {
+        allocation.addInterval(new ReservationInterval(0L, period),
+            zeroResource);
+      }
+      jobArrival = jobArrival % period;
+      jobDeadline = jobDeadline % period;
+
+      if (jobArrival <= jobDeadline) {
+        allocation.addInterval(new ReservationInterval(0, jobArrival),
+            zeroResource);
+        allocation.addInterval(new ReservationInterval(jobDeadline, period),
+            zeroResource);
+      } else {
+        allocation.addInterval(new ReservationInterval(jobDeadline, 
jobArrival),
+            zeroResource);
+      }
+    } else {
+      // Pad at the beginning
+      long earliestStart = findEarliestTime(allocation.toIntervalMap());
+      if (jobArrival < earliestStart) {
+        allocation.addInterval(
+            new ReservationInterval(jobArrival, earliestStart), zeroResource);
+      }
 
-    // Pad at the beginning
-    long latestEnd = findLatestTime(mapAllocations);
-    if (latestEnd < jobDeadline) {
-      mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
-          zeroResource);
+      // Pad at the end
+      long latestEnd = findLatestTime(allocation.toIntervalMap());
+      if (latestEnd < jobDeadline) {
+        allocation.addInterval(new ReservationInterval(latestEnd, jobDeadline),
+            zeroResource);
+      }
     }
-
-    return mapAllocations;
-
+    return allocation.toIntervalMap();
   }
 
   public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
index 8934b0f..1b47a69 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java
@@ -45,6 +45,7 @@ public interface StageAllocator {
    *          the stage by the two phase planning algorithm
    * @param stageDeadline the deadline of the stage set by the two phase
    *          planning algorithm
+   * @param period the periodicity with which this stage appears
    * @param user name of the user
    * @param oldId identifier of the old reservation
    *
@@ -55,7 +56,7 @@ public interface StageAllocator {
   Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageArrival, long stageDeadline, String user,
+      long stageArrival, long stageDeadline, long period, String user,
       ReservationId oldId) throws PlanningException;
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
index d107487..267e673 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java
@@ -43,7 +43,7 @@ public class StageAllocatorGreedy implements StageAllocator {
   public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageEarliestStart, long stageDeadline, String user,
+      long stageEarliestStart, long stageDeadline, long period, String user,
       ReservationId oldId) throws PlanningException {
 
     Resource totalCapacity = plan.getTotalCapacity();
@@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator {
 
     RLESparseResourceAllocation netAvailable =
         plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
-            stageDeadline, 0);
+            stageDeadline, period);
 
     netAvailable =
         RLESparseResourceAllocation.merge(plan.getResourceCalculator(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
index ae7d91a..a11ea0d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java
@@ -54,7 +54,7 @@ public class StageAllocatorGreedyRLE implements 
StageAllocator {
   public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageEarliestStart, long stageDeadline, String user,
+      long stageEarliestStart, long stageDeadline, long period, String user,
       ReservationId oldId) throws PlanningException {
 
     // abort early if the interval is not satisfiable
@@ -83,8 +83,9 @@ public class StageAllocatorGreedyRLE implements 
StageAllocator {
     int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
 
     // get available resources from plan
-    RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
-        user, oldId, stageEarliestStart, stageDeadline, 0);
+    RLESparseResourceAllocation netRLERes =
+        plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
+            stageDeadline, period);
 
     // remove plan modifications
     netRLERes =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
index c014549..f67973f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java
@@ -70,15 +70,15 @@ public class StageAllocatorLowCostAligned implements 
StageAllocator {
   public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
       RLESparseResourceAllocation planLoads,
       RLESparseResourceAllocation planModifications, ReservationRequest rr,
-      long stageArrival, long stageDeadline, String user, ReservationId oldId)
-      throws PlanningException {
+      long stageArrival, long stageDeadline, long period, String user,
+      ReservationId oldId) throws PlanningException {
 
     // Initialize
     ResourceCalculator resCalc = plan.getResourceCalculator();
     Resource capacity = plan.getTotalCapacity();
 
     RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
-        user, oldId, stageArrival, stageDeadline, 0);
+        user, oldId, stageArrival, stageDeadline, period);
 
     long step = plan.getStep();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
index 294564a..b9ce54e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java
@@ -180,8 +180,12 @@ public abstract class BaseSharingPolicyTest {
       }
     }
 
-
-    rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
+    if(rStart > rEnd){
+      rle.addInterval(new ReservationInterval(rStart, period), alloc);
+      rle.addInterval(new ReservationInterval(0, rEnd), alloc);
+    } else {
+      rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
+    }
     return rle;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index d054d3a..4ad4c3c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collection;
 
 import net.jcip.annotations.NotThreadSafe;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
 import org.junit.Test;
@@ -39,63 +40,67 @@ public class TestCapacityOverTimePolicy extends 
BaseSharingPolicyTest {
   final static long ONEDAY = 86400 * 1000;
   final static long ONEHOUR = 3600 * 1000;
   final static long ONEMINUTE = 60 * 1000;
-  final static String TWODAYPERIOD = "7200000";
+  final static String TWOHOURPERIOD = "7200000";
   final static String ONEDAYPERIOD = "86400000";
 
   @Parameterized.Parameters(name = "Duration {0}, height {1}," +
-          " submission {2}, periodic {3})")
+          " numSubmission {2}, periodic {3})")
   public static Collection<Object[]> data() {
     return Arrays.asList(new Object[][] {
 
         // easy fit
         {ONEHOUR, 0.25, 1, null, null },
-        {ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
+        {ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
         {ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
 
         // instantaneous high, but fit integral and inst limits
         {ONEMINUTE, 0.74, 1, null, null },
-        {ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
+        {ONEMINUTE, 0.74, 1, TWOHOURPERIOD, null },
         {ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
 
         // barely fit
         {ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
-        {ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEHOUR, 0.76, 1, TWOHOURPERIOD, PlanningQuotaException.class },
         {ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
 
         // overcommit with single reservation
         {ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
-        {ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEHOUR, 1.1, 1, TWOHOURPERIOD, PlanningQuotaException.class },
         {ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
 
         // barely fit with multiple reservations (instantaneously, lowering to
         // 1min to fit integral)
         {ONEMINUTE, 0.25, 3, null, null },
-        {ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
+        {ONEMINUTE, 0.25, 3, TWOHOURPERIOD, null },
         {ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
 
         // overcommit with multiple reservations (instantaneously)
         {ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
-        {ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
+        {ONEMINUTE, 0.25, 4, TWOHOURPERIOD, PlanningQuotaException.class },
         {ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
 
         // (non-periodic) reservation longer than window
         {25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
-        {25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        // NOTE: we generally don't accept periodicity < duration but the test
+        // generator will "wrap" this correctly
+        {25 * ONEHOUR, 0.25, 1, TWOHOURPERIOD, PlanningQuotaException.class },
         {25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
 
         // (non-periodic) reservation longer than window
         {25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
-        {25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
+        // NOTE: we generally don't accept periodicity < duration but the test
+        // generator will "wrap" this correctly
+        {25 * ONEHOUR, 0.05, 5, TWOHOURPERIOD, PlanningQuotaException.class },
         {25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
 
         // overcommit integral
         {ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
-        {2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
+        {2 * ONEHOUR, 0.26, 1, TWOHOURPERIOD, PlanningQuotaException.class },
         {2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
 
         // overcommit integral
         {ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
-        {2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
+        {2 * ONEHOUR / 2, 0.51, 1, TWOHOURPERIOD,
             PlanningQuotaException.class },
         {2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index 03569d4..7f2d199 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -17,6 +17,7 @@
  
*******************************************************************************/
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -119,6 +120,18 @@ public class TestInMemoryPlan {
     checkAllocation(plan, alloc, start, 0);
   }
 
+  @Test(expected = PlanningException.class)
+  public void testOutOfRange() throws PlanningException {
+    maxPeriodicity = 100;
+    Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
+        resCalc, minAlloc, maxAlloc, planName, replanner, true, maxPeriodicity,
+        context, new UTCClock());
+
+    // we expect the plan to complain as the range 330-150 > 50
+    RLESparseResourceAllocation availableBefore =
+        plan.getAvailableResourceOverTime(user, null, 150, 330, 50);
+  }
+
   @Test
   public void testAddPeriodicReservation() throws PlanningException {
 
@@ -146,8 +159,14 @@ public class TestInMemoryPlan {
     checkAllocation(plan, alloc, start, period);
 
     RLESparseResourceAllocation available =
-        plan.getAvailableResourceOverTime(user, reservationID, 150, 330, 50);
-    System.out.println(available);
+        plan.getAvailableResourceOverTime(user, null, 130, 170, 50);
+
+    // the reservation has period 20 starting at 10, and the interaction with
+    // the period 50 request means that every 10 we expect a "90GB" point
+    assertEquals(92160, available.getCapacityAtTime(130).getMemorySize());
+    assertEquals(92160, available.getCapacityAtTime(140).getMemorySize());
+    assertEquals(92160, available.getCapacityAtTime(150).getMemorySize());
+
   }
 
   private void checkAllocation(Plan plan, int[] alloc, int start,
@@ -162,18 +181,18 @@ public class TestInMemoryPlan {
     for (int i = 0; i < alloc.length; i++) {
       // only one instance for non-periodic reservation
       if (periodicity <= 0) {
-        Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+        assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
             plan.getTotalCommittedResources(start + i));
-        Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+        assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
             userCons.getCapacityAtTime(start + i));
       } else {
         // periodic reservations should repeat
         long y = 0;
         Resource res = Resource.newInstance(1024 * (alloc[i]), (alloc[i]));
         while (y <= end * 2) {
-          Assert.assertEquals("At time: " + start + i + y, res,
+          assertEquals("At time: " + start + i + y, res,
               plan.getTotalCommittedResources(start + i + y));
-          Assert.assertEquals(" At time: " + (start + i + y), res,
+          assertEquals(" At time: " + (start + i + y), res,
               userCons.getCapacityAtTime(start + i + y));
           y = y + periodicity;
         }
@@ -253,9 +272,9 @@ public class TestInMemoryPlan {
     RLESparseResourceAllocation userCons =
         plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
     for (int i = 0; i < alloc.length; i++) {
-      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+      assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
           plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
+      assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
           userCons.getCapacityAtTime(start + i));
     }
 
@@ -275,9 +294,9 @@ public class TestInMemoryPlan {
         start + updatedAlloc.length);
 
     for (int i = 0; i < updatedAlloc.length; i++) {
-      Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
+      assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
           updatedAlloc[i] + i), plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
+      assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
           updatedAlloc[i] + i), userCons.getCapacityAtTime(start + i));
     }
   }
@@ -371,10 +390,10 @@ public class TestInMemoryPlan {
         plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
 
     for (int i = 0; i < alloc.length; i++) {
-      Assert.assertEquals(
+      assertEquals(
           Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
           plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(
+      assertEquals(
           Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
           userCons.getCapacityAtTime(start + i));
     }
@@ -389,9 +408,9 @@ public class TestInMemoryPlan {
     userCons =
         plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
     for (int i = 0; i < alloc.length; i++) {
-      Assert.assertEquals(Resource.newInstance(0, 0),
+      assertEquals(Resource.newInstance(0, 0),
           plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(Resource.newInstance(0, 0),
+      assertEquals(Resource.newInstance(0, 0),
           userCons.getCapacityAtTime(start + i));
     }
   }
@@ -492,11 +511,11 @@ public class TestInMemoryPlan {
         plan.getConsumptionForUserOverTime(user, start, start + alloc2.length);
 
     for (int i = 0; i < alloc2.length; i++) {
-      Assert.assertEquals(
+      assertEquals(
           Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
               alloc1[i] + alloc2[i] + i),
           plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(
+      assertEquals(
           Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
               alloc1[i] + alloc2[i] + i),
           userCons.getCapacityAtTime(start + i));
@@ -530,9 +549,9 @@ public class TestInMemoryPlan {
 
     Assert.assertNull(plan.getReservationById(reservationID1));
     for (int i = 0; i < alloc1.length; i++) {
-      Assert.assertEquals(Resource.newInstance(0, 0),
+      assertEquals(Resource.newInstance(0, 0),
           plan.getTotalCommittedResources(start + i));
-      Assert.assertEquals(Resource.newInstance(0, 0),
+      assertEquals(Resource.newInstance(0, 0),
           userCons.getCapacityAtTime(start + i));
     }
   }
@@ -721,16 +740,16 @@ public class TestInMemoryPlan {
   private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
     ReservationId reservationID = rAllocation.getReservationId();
     Assert.assertNotNull(plan.getReservationById(reservationID));
-    Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
+    assertEquals(rAllocation, plan.getReservationById(reservationID));
     Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
     if (rAllocation.getPeriodicity() <= 0) {
-      Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
+      assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
     }
-    Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
-    Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
-    Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
-    Assert.assertEquals(resCalc, plan.getResourceCalculator());
-    Assert.assertEquals(planName, plan.getQueueName());
+    assertEquals(totalCapacity, plan.getTotalCapacity());
+    assertEquals(minAlloc, plan.getMinimumAllocation());
+    assertEquals(maxAlloc, plan.getMaximumAllocation());
+    assertEquals(resCalc, plan.getResourceCalculator());
+    assertEquals(planName, plan.getQueueName());
     Assert.assertTrue(plan.getMoveOnExpiry());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
index 25ec9c9..9fa8559 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
@@ -26,11 +26,14 @@ import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
@@ -54,11 +57,27 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+/**
+ * This class tests the {@code AlignedPlannerWithGreedy} agent.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+@SuppressWarnings("VisibilityModifier")
 public class TestAlignedPlanner {
 
+  @Parameterized.Parameter(value = 0)
+  public String recurrenceExpression;
+
+  final static String NONPERIODIC = "0";
+  final static String THREEHOURPERIOD = "10800000";
+  final static String ONEDAYPERIOD = "86400000";
+
   private static final Logger LOG = LoggerFactory
       .getLogger(TestAlignedPlanner.class);
 
@@ -72,6 +91,16 @@ public class TestAlignedPlanner {
   private Resource clusterCapacity;
   private long step;
 
+
+  @Parameterized.Parameters(name = "Testing: periodicity {0})")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {NONPERIODIC},
+            {THREEHOURPERIOD},
+            {ONEDAYPERIOD}
+    });
+  }
+
   @Test
   public void testSingleReservationAccept() throws PlanningException {
 
@@ -107,6 +136,9 @@ public class TestAlignedPlanner {
     assertTrue(alloc1.toString(),
         check(alloc1, 10 * step, 20 * step, 10, 2048, 2));
 
+    System.out.println("--------AFTER AGENT----------");
+    System.out.println(plan.toString());
+
   }
 
   @Test
@@ -1139,9 +1171,11 @@ public class TestAlignedPlanner {
       long deadline, ReservationRequest[] reservationRequests,
       ReservationRequestInterpreter rType, String username) {
 
-    return ReservationDefinition.newInstance(arrival, deadline,
-        ReservationRequests.newInstance(Arrays.asList(reservationRequests),
-            rType), username);
+
+    return ReservationDefinition.newInstance(arrival,
+        deadline, ReservationRequests
+            .newInstance(Arrays.asList(reservationRequests), rType),
+        username, recurrenceExpression, Priority.UNDEFINED);
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
index 51b971b..7c7b0a5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -59,13 +59,20 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @RunWith(Parameterized.class)
+@SuppressWarnings("VisibilityModifier")
 public class TestGreedyReservationAgent {
 
+
+  @Parameterized.Parameter(value = 0)
+  public boolean allocateLeft;
+
+  @Parameterized.Parameter(value = 1)
+  public String recurrenceExpression;
+
   private static final Logger LOG = LoggerFactory
       .getLogger(TestGreedyReservationAgent.class);
 
@@ -76,16 +83,18 @@ public class TestGreedyReservationAgent {
   Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
   Random rand = new Random();
   long step;
-  boolean allocateLeft;
-
-  public TestGreedyReservationAgent(Boolean b){
-    this.allocateLeft = b;
-  }
 
-  @Parameters
+  @Parameterized.Parameters(name = "Testing: allocateLeft {0}," +
+          " recurrenceExpression {1})")
   public static Collection<Object[]> data() {
       return Arrays.asList(new Object[][] {
-               {true}, {false}});
+              {true, "0"},
+              {false, "0"},
+              {true, "7200000"},
+              {false, "7200000"},
+              {true, "86400000"},
+              {false, "86400000"}
+      });
   }
 
   @Before
@@ -134,6 +143,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(5 * step);
     rr.setDeadline(20 * step);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequest r = ReservationRequest.newInstance(
         Resource.newInstance(2048, 2), 10, 5, 10 * step);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
@@ -193,6 +203,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(5 * step);
     rr.setDeadline(100 * step);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequest r =
         ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20,
             10 * step);
@@ -283,8 +294,9 @@ public class TestGreedyReservationAgent {
     int[] f = { 100, 100 };
 
     ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            30 * step, 30 * step + f.length * step, f.length * step);
+        ReservationSystemTestUtil.createSimpleReservationDefinition(30 * step,
+            30 * step + f.length * step, f.length * step, 1,
+            recurrenceExpression);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
@@ -296,6 +308,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(0 * step);
     rr.setDeadline(70 * step);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
     ReservationRequest r = ReservationRequest.newInstance(
@@ -346,10 +359,11 @@ public class TestGreedyReservationAgent {
     prepareBasicPlan();
     // create a completely utilized segment at time 30
     int[] f = { 100, 100 };
-    ReservationDefinition rDef =
-        ReservationSystemTestUtil.createSimpleReservationDefinition(
-            30, 30 * step + f.length * step, f.length * step);
-    assertTrue(plan.toString(),
+    ReservationDefinition rDef = ReservationSystemTestUtil
+        .createSimpleReservationDefinition(30, 30 * step + f.length * step,
+            f.length * step, 1, recurrenceExpression);
+    assertTrue(
+        plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 30 * step, 30 * step + f.length * step,
@@ -406,6 +420,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(0 * step);
     rr.setDeadline(60 * step);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
     ReservationRequest r = ReservationRequest.newInstance(
@@ -453,6 +468,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(100 * step);
     rr.setDeadline(120 * step);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
     ReservationRequest r = ReservationRequest.newInstance(
@@ -494,6 +510,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(100 * step);
     rr.setDeadline(120 * step);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
     ReservationRequest r = ReservationRequest.newInstance(
@@ -542,6 +559,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(100L);
     rr.setDeadline(120L);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
 
@@ -587,6 +605,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(100 * step);
     rr.setDeadline(120 * step);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
     ReservationRequest r = ReservationRequest.newInstance(
@@ -635,6 +654,7 @@ public class TestGreedyReservationAgent {
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
     rr.setArrival(100L);
     rr.setDeadline(120L);
+    rr.setRecurrenceExpression(recurrenceExpression);
     ReservationRequests reqs = new ReservationRequestsPBImpl();
     reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
     ReservationRequest r = ReservationRequest.newInstance(
@@ -759,25 +779,4 @@ public class TestGreedyReservationAgent {
         + " in " + (end - start) + "ms");
   }
 
-  public static void main(String[] arg) {
-
-    boolean left = false;
-    // run a stress test with by default 1000 random jobs
-    int numJobs = 1000;
-    if (arg.length > 0) {
-      numJobs = Integer.parseInt(arg[0]);
-    }
-    if (arg.length > 1) {
-      left = Boolean.parseBoolean(arg[1]);
-    }
-
-    try {
-      TestGreedyReservationAgent test = new TestGreedyReservationAgent(left);
-      test.setup();
-      test.testStress(numJobs);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6e614e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestReservationAgents.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestReservationAgents.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestReservationAgents.java
new file mode 100644
index 0000000..386fa68
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestReservationAgents.java
@@ -0,0 +1,213 @@
+/*****************************************************************************
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *****************************************************************************/
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
+
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * General purpose ReservationAgent tester.
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings("VisibilityModifier")
+public class TestReservationAgents {
+
+  @Parameterized.Parameter(value = 0)
+  public Class agentClass;
+
+  @Parameterized.Parameter(value = 1)
+  public boolean allocateLeft;
+
+  @Parameterized.Parameter(value = 2)
+  public String recurrenceExpression;
+
+  @Parameterized.Parameter(value = 3)
+  public int numOfNodes;
+
+  private long step;
+  private Random rand = new Random(2);
+  private ReservationAgent agent;
+  private Plan plan;
+  private ResourceCalculator resCalc = new DefaultResourceCalculator();
+  private Resource minAlloc = Resource.newInstance(1024, 1);
+  private Resource maxAlloc = Resource.newInstance(32 * 1023, 32);
+
+  private long timeHorizon = 2 * 24 * 3600 * 1000; // 2 days
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReservationAgents.class);
+
+  @Parameterized.Parameters(name = "Testing: agent {0}, allocateLeft: {1}," +
+          " recurrenceExpression: {2}, numNodes: {3})")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[][] {{GreedyReservationAgent.class, true, "0", 100 },
+            {GreedyReservationAgent.class, false, "0", 100 },
+            {GreedyReservationAgent.class, true, "7200000", 100 },
+            {GreedyReservationAgent.class, false, "7200000", 100 },
+            {GreedyReservationAgent.class, true, "86400000", 100 },
+            {GreedyReservationAgent.class, false, "86400000", 100 },
+            {AlignedPlannerWithGreedy.class, true, "0", 100 },
+            {AlignedPlannerWithGreedy.class, false, "0", 100 },
+            {AlignedPlannerWithGreedy.class, true, "7200000", 100 },
+            {AlignedPlannerWithGreedy.class, false, "7200000", 100 },
+            {AlignedPlannerWithGreedy.class, true, "86400000", 100 },
+            {AlignedPlannerWithGreedy.class, false, "86400000", 100 } });
+  }
+
+  @Before
+  public void setup() throws Exception {
+
+    long seed = rand.nextLong();
+    rand.setSeed(seed);
+    LOG.info("Running with seed: " + seed);
+
+    // setting completely loose quotas
+    long timeWindow = 1000000L;
+    Resource clusterCapacity =
+        Resource.newInstance(numOfNodes * 1024, numOfNodes);
+    step = 1000L;
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
+
+    float instConstraint = 100;
+    float avgConstraint = 100;
+
+    ReservationSchedulerConfiguration conf = ReservationSystemTestUtil
+        .createConf(reservationQ, timeWindow, instConstraint, avgConstraint);
+    CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+    policy.init(reservationQ, conf);
+
+    // set the agent's early-vs-late allocation preference in the conf
+    conf.setBoolean(GreedyReservationAgent.FAVOR_EARLY_ALLOCATION,
+        allocateLeft);
+    agent = (ReservationAgent) agentClass.newInstance();
+    agent.init(conf);
+
+    QueueMetrics queueMetrics = mock(QueueMetrics.class);
+    RMContext context = ReservationSystemTestUtil.createMockRMContext();
+
+    plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+        resCalc, minAlloc, maxAlloc, "dedicated", null, true, context);
+  }
+
+  @Test
+  public void test() throws Exception {
+
+    long period = Long.parseLong(recurrenceExpression);
+    for (int i = 0; i < 1000; i++) {
+      ReservationDefinition rr = createRandomRequest(i);
+      if (rr != null) {
+        ReservationId reservationID =
+            ReservationSystemTestUtil.getNewReservationId();
+        try {
+          agent.createReservation(reservationID, "u1", plan, rr);
+        } catch (PlanningException p) {
+          // happens
+        }
+      }
+    }
+
+  }
+
+  private ReservationDefinition createRandomRequest(int i)
+      throws PlanningException {
+    long arrival = (long) Math.floor(rand.nextDouble() * timeHorizon);
+    long period = Long.parseLong(recurrenceExpression);
+
+    // min between period and rand around 30min
+    long duration =
+        (long) Math.round(Math.min(rand.nextDouble() * 3600 * 1000, period));
+
+    // min between period and rand around 5x duration
+    long deadline = (long) Math
+        .ceil(arrival + Math.min(duration * rand.nextDouble() * 10, period));
+
+    assert((deadline - arrival) <= period);
+
+    RLESparseResourceAllocation available = plan
+        .getAvailableResourceOverTime("u1", null, arrival, deadline, period);
+    NavigableMap<Long, Resource> availableMap = available.getCumulative();
+
+    // look at available space, and for each segment, request half of it
+    // (a segment is skipped only with probability 0.001)
+    List<ReservationRequest> reservationRequests = new ArrayList<>();
+    for (Map.Entry<Long, Resource> e : availableMap.entrySet()) {
+      if (e.getValue() != null && rand.nextDouble() > 0.001) {
+        int numContainers = (int) Math.ceil(Resources.divide(resCalc,
+            plan.getTotalCapacity(), e.getValue(), minAlloc) / 2);
+        long tempDur =
+            Math.min(duration, availableMap.higherKey(e.getKey()) - e.getKey());
+        reservationRequests.add(ReservationRequest.newInstance(minAlloc,
+            numContainers, 1, tempDur));
+      }
+    }
+
+    if (reservationRequests.size() < 1) {
+      return null;
+    }
+
+    ReservationDefinition rr = new ReservationDefinitionPBImpl();
+    rr.setArrival(arrival);
+    rr.setDeadline(deadline);
+    rr.setRecurrenceExpression(recurrenceExpression);
+    ReservationRequests reqs = new ReservationRequestsPBImpl();
+    reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
+    reqs.setReservationResources(reservationRequests);
+    rr.setReservationRequests(reqs);
+    rr.setReservationName("res_" + i);
+
+    return rr;
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to