YARN-3739. Add reservation system recovery to RM recovery process. Contributed 
by  Subru Krishnan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2798723a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2798723a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2798723a

Branch: refs/heads/HADOOP-11890
Commit: 2798723a5443d04455b9d79c48d61f435ab52267
Parents: 381610d
Author: Anubhav Dhoot <adh...@apache.org>
Authored: Thu Oct 22 06:36:58 2015 -0700
Committer: Anubhav Dhoot <adh...@apache.org>
Committed: Thu Oct 22 06:51:00 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../server/resourcemanager/ClientRMService.java |   2 +-
 .../server/resourcemanager/ResourceManager.java |   4 +
 .../reservation/AbstractReservationSystem.java  |  91 ++++-
 .../AbstractSchedulerPlanFollower.java          |  76 ++--
 .../CapacitySchedulerPlanFollower.java          |  16 -
 .../reservation/FairSchedulerPlanFollower.java  |  15 -
 .../reservation/InMemoryPlan.java               |  35 +-
 .../resourcemanager/reservation/PlanEdit.java   |   6 +-
 .../reservation/PlanFollower.java               |   4 +-
 .../reservation/ReservationSystem.java          |   7 +-
 .../reservation/planning/PlanningAlgorithm.java |   2 +-
 .../TestReservationSystemWithRMHA.java          | 360 +++++++++++++++++--
 .../reservation/ReservationSystemTestUtil.java  |   1 -
 .../reservation/TestCapacityOverTimePolicy.java |  29 +-
 .../reservation/TestInMemoryPlan.java           |  16 +-
 .../reservation/TestNoOverCommitPolicy.java     |  19 +-
 .../TestSchedulerPlanFollowerBase.java          |  17 +-
 .../planning/TestAlignedPlanner.java            |   7 +-
 .../planning/TestGreedyReservationAgent.java    |  17 +-
 .../planning/TestSimpleCapacityReplanner.java   |  14 +-
 21 files changed, 556 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ae26386..79df1ce 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -238,6 +238,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4262. Allow whitelisted users to run privileged docker containers.
     (Sidharta Seethana via vvasudev)
 
+    YARN-3739. Add reservation system recovery to RM recovery process.
+    (Subru Krishnan via adhoot)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 4a02580..812267d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -1363,7 +1363,7 @@ public class ClientRMService extends AbstractService 
implements
           .format(
               "Reservation {0} is within threshold so attempting to create 
synchronously.",
               reservationId));
-      reservationSystem.synchronizePlan(planName);
+      reservationSystem.synchronizePlan(planName, true);
       LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
           reservationId));
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index b38f188..88fb1cf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1186,6 +1186,10 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     // recover AMRMTokenSecretManager
     rmContext.getAMRMTokenSecretManager().recover(state);
 
+    // recover reservations
+    if (reservationSystem != null) {
+      reservationSystem.recover(state);
+    }
     // recover applications
     rmAppManager.recover(state);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index cf57dbe..56423e2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -18,16 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -38,8 +28,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import 
org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -52,6 +45,17 @@ import 
org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * This is the implementation of {@link ReservationSystem} based on the
  * {@link ResourceScheduler}
@@ -94,6 +98,8 @@ public abstract class AbstractReservationSystem extends 
AbstractService
 
   private PlanFollower planFollower;
 
+  private boolean isRecoveryEnabled = false;
+
   /**
    * Construct the service.
    * 
@@ -149,6 +155,49 @@ public abstract class AbstractReservationSystem extends 
AbstractService
       Plan plan = initializePlan(planQueueName);
       plans.put(planQueueName, plan);
     }
+    isRecoveryEnabled = conf.getBoolean(
+        YarnConfiguration.RECOVERY_ENABLED,
+        YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+  }
+
+  private void loadPlan(String planName,
+      Map<ReservationId, ReservationAllocationStateProto> reservations)
+          throws PlanningException {
+    Plan plan = plans.get(planName);
+    Resource minAllocation = getMinAllocation();
+    ResourceCalculator rescCalculator = getResourceCalculator();
+    for (Entry<ReservationId, ReservationAllocationStateProto> 
currentReservation : reservations
+        .entrySet()) {
+      plan.addReservation(ReservationSystemUtil.toInMemoryAllocation(planName,
+          currentReservation.getKey(), currentReservation.getValue(),
+          minAllocation, rescCalculator), true);
+      resQMap.put(currentReservation.getKey(), planName);
+    }
+    LOG.info("Recovered reservations for Plan: {}", planName);
+  }
+
+  @Override
+  public void recover(RMState state) throws Exception {
+    LOG.info("Recovering Reservation system");
+    writeLock.lock();
+    try {
+      Map<String, Map<ReservationId, ReservationAllocationStateProto>> 
reservationSystemState =
+          state.getReservationState();
+      if (planFollower != null) {
+        for (String plan : plans.keySet()) {
+          // recover reservations if any from state store
+          if (reservationSystemState.containsKey(plan)) {
+            loadPlan(plan, reservationSystemState.get(plan));
+          }
+          synchronizePlan(plan, false);
+        }
+        startPlanFollower(conf.getLong(
+            YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+            
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS));
+      }
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private void initializeNewPlans(Configuration conf) {
@@ -162,7 +211,7 @@ public abstract class AbstractReservationSystem extends 
AbstractService
           Plan plan = initializePlan(planQueueName);
           plans.put(planQueueName, plan);
         } else {
-          LOG.warn("Plan based on reservation queue {0} already exists.",
+          LOG.warn("Plan based on reservation queue {} already exists.",
               planQueueName);
         }
       }
@@ -236,18 +285,26 @@ public abstract class AbstractReservationSystem extends 
AbstractService
   }
 
   @Override
-  public void synchronizePlan(String planName) {
+  public void synchronizePlan(String planName, boolean shouldReplan) {
     writeLock.lock();
     try {
       Plan plan = plans.get(planName);
       if (plan != null) {
-        planFollower.synchronizePlan(plan);
+        planFollower.synchronizePlan(plan, shouldReplan);
       }
     } finally {
       writeLock.unlock();
     }
   }
 
+  private void startPlanFollower(long initialDelay) {
+    if (planFollower != null) {
+      scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+      scheduledExecutorService.scheduleWithFixedDelay(planFollower,
+          initialDelay, planStepSize, TimeUnit.MILLISECONDS);
+    }
+  }
+
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     Configuration configuration = new Configuration(conf);
@@ -262,10 +319,8 @@ public abstract class AbstractReservationSystem extends 
AbstractService
 
   @Override
   public void serviceStart() throws Exception {
-    if (planFollower != null) {
-      scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-      scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
-          planStepSize, TimeUnit.MILLISECONDS);
+    if (!isRecoveryEnabled) {
+      startPlanFollower(planStepSize);
     }
     super.serviceStart();
   }
@@ -350,7 +405,7 @@ public abstract class AbstractReservationSystem extends 
AbstractService
             minAllocation, maxAllocation, planQueueName,
             getReplanner(planQueuePath), getReservationSchedulerConfiguration()
             .getMoveOnExpiry(planQueuePath), rmContext);
-    LOG.info("Intialized plan {0} based on reservable queue {1}",
+    LOG.info("Intialized plan {} based on reservable queue {}",
         plan.toString(), planQueueName);
     return plan;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
index ea7f27d..eaf2902 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
@@ -27,8 +27,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@ import java.util.Set;
 
 public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
   private static final Logger LOG = LoggerFactory
-      .getLogger(CapacitySchedulerPlanFollower.class);
+      .getLogger(AbstractSchedulerPlanFollower.class);
 
   protected Collection<Plan> plans = new ArrayList<Plan>();
   protected YarnScheduler scheduler;
@@ -59,7 +59,7 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
   @Override
   public synchronized void run() {
     for (Plan plan : plans) {
-      synchronizePlan(plan);
+      synchronizePlan(plan, true);
     }
   }
 
@@ -70,7 +70,7 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
   }
 
   @Override
-  public synchronized void synchronizePlan(Plan plan) {
+  public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
      String planQueueName = plan.getQueueName();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Running plan follower edit policy for plan: " + 
planQueueName);
@@ -88,14 +88,12 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
     Resource clusterResources = scheduler.getClusterResource();
     Resource planResources = getPlanResources(plan, planQueue,
         clusterResources);
-
     Set<ReservationAllocation> currentReservations =
         plan.getReservationsAtTime(now);
     Set<String> curReservationNames = new HashSet<String>();
     Resource reservedResources = Resource.newInstance(0, 0);
     int numRes = getReservedResources(now, currentReservations,
         curReservationNames, reservedResources);
-
     // create the default reservation queue if it doesnt exist
     String defReservationId = getReservationIdFromQueueName(planQueueName) +
         ReservationConstants.DEFAULT_QUEUE_SUFFIX;
@@ -104,14 +102,18 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
     createDefaultReservationQueue(planQueueName, planQueue,
         defReservationId);
     curReservationNames.add(defReservationId);
-
     // if the resources dedicated to this plan has shrunk invoke replanner
-    if (arePlanResourcesLessThanReservations(clusterResources, planResources,
-        reservedResources)) {
-      try {
-        plan.getReplanner().plan(plan, null);
-      } catch (PlanningException e) {
-        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+    boolean shouldResize = false;
+    if (arePlanResourcesLessThanReservations(plan.getResourceCalculator(),
+        clusterResources, planResources, reservedResources)) {
+      if (shouldReplan) {
+        try {
+          plan.getReplanner().plan(plan, null);
+        } catch (PlanningException e) {
+          LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+        }
+      } else {
+        shouldResize = true;
       }
     }
     // identify the reservations that have expired and new reservations that
@@ -133,7 +135,6 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
     // garbage collect expired reservations
     cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
         defReservationQueue);
-
     // Add new reservations and update existing ones
     float totalAssignedCapacity = 0f;
     if (currentReservations != null) {
@@ -146,9 +147,8 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
             planQueueName, e);
       }
       // sort allocations from the one giving up the most resources, to the
-      // one asking for the most
-      // avoid order-of-operation errors that temporarily violate 100%
-      // capacity bound
+      // one asking for the most avoid order-of-operation errors that
+      // temporarily violate 100% capacity bound
       List<ReservationAllocation> sortedAllocations =
           sortByDelta(
               new ArrayList<ReservationAllocation>(currentReservations), now,
@@ -162,10 +162,15 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
         float targetCapacity = 0f;
         if (planResources.getMemory() > 0
             && planResources.getVirtualCores() > 0) {
+          if (shouldResize) {
+            capToAssign =
+                calculateReservationToPlanProportion(
+                    plan.getResourceCalculator(), planResources,
+                    reservedResources, capToAssign);
+          }
           targetCapacity =
-              calculateReservationToPlanRatio(clusterResources,
-                  planResources,
-                  capToAssign);
+              calculateReservationToPlanRatio(plan.getResourceCalculator(),
+                  clusterResources, planResources, capToAssign);
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug(
@@ -211,7 +216,6 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
     }
     LOG.info("Finished iteration of plan follower edit policy for plan: "
         + planQueueName);
-
     // Extension: update plan with app states,
     // useful to support smart replanning
   }
@@ -324,18 +328,34 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
   protected abstract Queue getPlanQueue(String planQueueName);
 
   /**
+   * Resizes reservations based on currently available resources
+   */
+  private Resource calculateReservationToPlanProportion(
+      ResourceCalculator rescCalculator, Resource availablePlanResources,
+      Resource totalReservationResources, Resource reservationResources) {
+    return Resources.multiply(availablePlanResources, Resources.ratio(
+        rescCalculator, reservationResources, totalReservationResources));
+  }
+
+  /**
    * Calculates ratio of reservationResources to planResources
    */
-  protected abstract float calculateReservationToPlanRatio(
-      Resource clusterResources, Resource planResources,
-      Resource reservationResources);
+  private float calculateReservationToPlanRatio(
+      ResourceCalculator rescCalculator, Resource clusterResources,
+      Resource planResources, Resource reservationResources) {
+    return Resources.divide(rescCalculator, clusterResources,
+        reservationResources, planResources);
+  }
 
   /**
    * Check if plan resources are less than expected reservation resources
    */
-  protected abstract boolean arePlanResourcesLessThanReservations(
-      Resource clusterResources, Resource planResources,
-      Resource reservedResources);
+  private boolean arePlanResourcesLessThanReservations(
+      ResourceCalculator rescCalculator, Resource clusterResources,
+      Resource planResources, Resource reservedResources) {
+    return Resources.greaterThan(rescCalculator, clusterResources,
+        reservedResources, planResources);
+  }
 
   /**
    * Get a list of reservation queues for this planQueue
@@ -363,7 +383,7 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
       Plan plan, Queue queue, Resource clusterResources);
 
   /**
-   * Get reservation queue resources if it exists otherwise return null
+   * Get reservation queue resources if it exists otherwise return null.
    */
   protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
       ReservationId reservationId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
index 61772c9..551f075 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -81,22 +81,6 @@ public class CapacitySchedulerPlanFollower extends 
AbstractSchedulerPlanFollower
   }
 
   @Override
-  protected float calculateReservationToPlanRatio(
-      Resource clusterResources, Resource planResources,
-      Resource reservationResources) {
-    return Resources.divide(cs.getResourceCalculator(),
-        clusterResources, reservationResources, planResources);
-  }
-
-  @Override
-  protected boolean arePlanResourcesLessThanReservations(
-      Resource clusterResources, Resource planResources,
-      Resource reservedResources) {
-    return Resources.greaterThan(cs.getResourceCalculator(),
-        clusterResources, reservedResources, planResources);
-  }
-
-  @Override
   protected List<? extends Queue> getChildReservationQueues(Queue queue) {
     PlanQueue planQueue = (PlanQueue)queue;
     List<CSQueue> childQueues = planQueue.getChildQueues();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
index 7ca03c5..2a57b22 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
@@ -30,7 +30,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,20 +59,6 @@ public class FairSchedulerPlanFollower extends 
AbstractSchedulerPlanFollower {
   }
 
   @Override
-  protected float calculateReservationToPlanRatio(Resource clusterResources,
-      Resource planResources, Resource capToAssign) {
-    return Resources.divide(fs.getResourceCalculator(),
-        clusterResources, capToAssign, planResources);
-  }
-
-  @Override
-  protected boolean arePlanResourcesLessThanReservations(Resource
-      clusterResources, Resource planResources, Resource reservedResources) {
-    return Resources.greaterThan(fs.getResourceCalculator(),
-        clusterResources, reservedResources, planResources);
-  }
-
-  @Override
   protected List<? extends Queue> getChildReservationQueues(Queue queue) {
     FSQueue planQueue = (FSQueue)queue;
     List<FSQueue> childQueues = planQueue.getChildQueues();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
index a887e24..7e2567b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@@ -54,7 +55,7 @@ public class InMemoryPlan implements Plan {
   private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryPlan.class);
 
   private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
-  private final RMContext rmContext;
+  private final RMStateStore rmStateStore;
 
   private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> 
currentReservations =
       new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
@@ -112,7 +113,7 @@ public class InMemoryPlan implements Plan {
     this.replanner = replanner;
     this.getMoveOnExpiry = getMoveOnExpiry;
     this.clock = clock;
-    this.rmContext = rmContext;
+    this.rmStateStore = rmContext.getStateStore();
   }
 
   @Override
@@ -174,8 +175,8 @@ public class InMemoryPlan implements Plan {
   }
 
   @Override
-  public boolean addReservation(ReservationAllocation reservation)
-      throws PlanningException {
+  public boolean addReservation(ReservationAllocation reservation,
+      boolean isRecovering) throws PlanningException {
     // Verify the allocation is memory based otherwise it is not supported
     InMemoryReservationAllocation inMemReservation =
         (InMemoryReservationAllocation) reservation;
@@ -198,9 +199,16 @@ public class InMemoryPlan implements Plan {
       }
       // Validate if we can accept this reservation, throws exception if
       // validation fails
-      policy.validate(this, inMemReservation);
-      // we record here the time in which the allocation has been accepted
-      reservation.setAcceptanceTimestamp(clock.getTime());
+      if (!isRecovering) {
+        policy.validate(this, inMemReservation);
+        // we record here the time in which the allocation has been accepted
+        reservation.setAcceptanceTimestamp(clock.getTime());
+        if (rmStateStore != null) {
+          rmStateStore.storeNewReservation(
+              ReservationSystemUtil.buildStateProto(inMemReservation),
+              getQueueName(), inMemReservation.getReservationId().toString());
+        }
+      }
       ReservationInterval searchInterval =
           new ReservationInterval(inMemReservation.getStartTime(),
               inMemReservation.getEndTime());
@@ -217,9 +225,6 @@ public class InMemoryPlan implements Plan {
       currentReservations.put(searchInterval, reservations);
       reservationTable.put(inMemReservation.getReservationId(),
           inMemReservation);
-      rmContext.getStateStore().storeNewReservation(
-          ReservationSystemUtil.buildStateProto(inMemReservation),
-          getQueueName(), inMemReservation.getReservationId().toString());
       incrementAllocation(inMemReservation);
       LOG.info("Sucessfully added reservation: {} to plan.",
           inMemReservation.getReservationId());
@@ -253,7 +258,7 @@ public class InMemoryPlan implements Plan {
         return result;
       }
       try {
-        result = addReservation(reservation);
+        result = addReservation(reservation, false);
       } catch (PlanningException e) {
         LOG.error("Unable to update reservation: {} from plan due to {}.",
             reservation.getReservationId(), e.getMessage());
@@ -264,7 +269,7 @@ public class InMemoryPlan implements Plan {
         return result;
       } else {
         // rollback delete
-        addReservation(currReservation);
+        addReservation(currReservation, false);
         LOG.info("Rollbacked update reservation: {} from plan.",
             reservation.getReservationId());
         return result;
@@ -282,6 +287,10 @@ public class InMemoryPlan implements Plan {
     Set<InMemoryReservationAllocation> reservations =
         currentReservations.get(searchInterval);
     if (reservations != null) {
+      if (rmStateStore != null) {
+        rmStateStore.removeReservation(getQueueName(),
+            reservation.getReservationId().toString());
+      }
       if (!reservations.remove(reservation)) {
         LOG.error("Unable to remove reservation: {} from plan.",
             reservation.getReservationId());
@@ -298,8 +307,6 @@ public class InMemoryPlan implements Plan {
       throw new IllegalArgumentException(errMsg);
     }
     reservationTable.remove(reservation.getReservationId());
-    rmContext.getStateStore().removeReservation(
-        getQueueName(), reservation.getReservationId().toString());
     decrementAllocation(reservation);
     LOG.info("Sucessfully deleted reservation: {} in plan.",
         reservation.getReservationId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
index 60e201b..504a250 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java
@@ -32,10 +32,12 @@ public interface PlanEdit extends PlanContext, PlanView {
    * 
    * @param reservation the {@link ReservationAllocation} to be added to the
    *          plan
+   * @param isRecovering flag to indicate if reservation is being added as part
+   *          of failover or not
    * @return true if addition is successful, false otherwise
    */
-  public boolean addReservation(ReservationAllocation reservation)
-      throws PlanningException;
+  public boolean addReservation(ReservationAllocation reservation,
+      boolean isRecovering) throws PlanningException;
 
   /**
    * Updates an existing {@link ReservationAllocation} in the plan. This is

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
index 6635314..a5c8ee1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java
@@ -71,8 +71,10 @@ public interface PlanFollower extends Runnable {
    * start time is imminent.
    * 
    * @param plan the Plan to synchronize
+   * @param shouldReplan replan on reduction of plan capacity if true or
+   *          proportionally scale down reservations if false
    */
-  public void synchronizePlan(Plan plan);
+  public void synchronizePlan(Plan plan, boolean shouldReplan);
 
   /**
    * Setter for the list of plans.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
index 7785885..56a08ef 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 
@@ -40,7 +41,7 @@ import java.util.Map;
  */
 @LimitedPrivate("yarn")
 @Unstable
-public interface ReservationSystem {
+public interface ReservationSystem extends Recoverable {
 
   /**
    * Set RMContext for {@link ReservationSystem}. This method should be called
@@ -82,8 +83,10 @@ public interface ReservationSystem {
    * the {@link ResourceScheduler}
    * 
    * @param planName the name of the {@link Plan} to be synchronized
+   * @param shouldReplan replan on reduction of plan capacity if true or
+   *          proportionally scale down reservations if false
    */
-  void synchronizePlan(String planName);
+  void synchronizePlan(String planName, boolean shouldReplan);
 
   /**
    * Return the time step (ms) at which the {@link PlanFollower} is invoked

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
index 9a0a0f0..8b72b9f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java
@@ -94,7 +94,7 @@ public abstract class PlanningAlgorithm implements 
ReservationAgent {
     if (oldReservation != null) {
       return plan.updateReservation(capReservation);
     } else {
-      return plan.addReservation(capReservation);
+      return plan.addReservation(capReservation, false);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java
index 6f74130..48a4d97 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java
@@ -18,15 +18,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import 
org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -38,7 +43,7 @@ import org.junit.Test;
 
 import java.util.Map;
 
-public class TestReservationSystemWithRMHA extends RMHATestBase{
+public class TestReservationSystemWithRMHA extends RMHATestBase {
 
   @Override
   public void setup() throws Exception {
@@ -56,7 +61,7 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
   public void testSubmitReservationAndCheckAfterFailover() throws Exception {
     startRMs();
 
-    addNodeCapacityToPlan();
+    addNodeCapacityToPlan(rm1, 102400, 100);
 
     ClientRMService clientService = rm1.getClientRMService();
 
@@ -72,8 +77,6 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
     ReservationId reservationID = response.getReservationId();
     Assert.assertNotNull(reservationID);
     LOG.info("Submit reservation response: " + reservationID);
-    ReservationDefinition reservationDefinition = request
-        .getReservationDefinition();
 
     // Do the failover
     explicitFailover();
@@ -87,12 +90,11 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
     Assert.assertNotNull(reservationStateMap.get(reservationID));
   }
 
-
   @Test
   public void testUpdateReservationAndCheckAfterFailover() throws Exception {
     startRMs();
 
-    addNodeCapacityToPlan();
+    addNodeCapacityToPlan(rm1, 102400, 100);
 
     ClientRMService clientService = rm1.getClientRMService();
 
@@ -108,17 +110,15 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
     ReservationId reservationID = response.getReservationId();
     Assert.assertNotNull(reservationID);
     LOG.info("Submit reservation response: " + reservationID);
-    ReservationDefinition reservationDefinition = request
-        .getReservationDefinition();
-
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
 
     // Change any field
 
     long newDeadline = reservationDefinition.getDeadline() + 100;
     reservationDefinition.setDeadline(newDeadline);
-    ReservationUpdateRequest updateRequest =
-        ReservationUpdateRequest.newInstance(
-          reservationDefinition, reservationID);
+    ReservationUpdateRequest updateRequest = ReservationUpdateRequest
+        .newInstance(reservationDefinition, reservationID);
     rm1.updateReservationState(updateRequest);
 
     // Do the failover
@@ -140,7 +140,7 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
   public void testDeleteReservationAndCheckAfterFailover() throws Exception {
     startRMs();
 
-    addNodeCapacityToPlan();
+    addNodeCapacityToPlan(rm1, 102400, 100);
 
     ClientRMService clientService = rm1.getClientRMService();
 
@@ -156,7 +156,6 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
     ReservationId reservationID = response.getReservationId();
     Assert.assertNotNull(reservationID);
 
-
     // Delete the reservation
     ReservationDeleteRequest deleteRequest =
         ReservationDeleteRequest.newInstance(reservationID);
@@ -168,32 +167,31 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
     rm2.registerNode("127.0.0.1:1", 102400, 100);
 
     RMState state = rm2.getRMContext().getStateStore().loadState();
-    Assert.assertNull(state.getReservationState().get(
-        ReservationSystemTestUtil.reservationQ));
+    Assert.assertNull(state.getReservationState()
+        .get(ReservationSystemTestUtil.reservationQ));
   }
 
-  private void addNodeCapacityToPlan() {
+  private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) {
     try {
-      rm1.registerNode("127.0.0.1:1", 102400, 100);
+      rm.registerNode("127.0.0.1:1", memory, vCores);
       int attempts = 10;
       do {
         DrainDispatcher dispatcher =
             (DrainDispatcher) rm1.getRMContext().getDispatcher();
         dispatcher.await();
-        rm1.getRMContext().getReservationSystem().synchronizePlan(
-            ReservationSystemTestUtil.reservationQ);
-        if (rm1.getRMContext().getReservationSystem().getPlan
-            (ReservationSystemTestUtil.reservationQ).getTotalCapacity()
+        rm.getRMContext().getReservationSystem()
+            .synchronizePlan(ReservationSystemTestUtil.reservationQ, false);
+        if (rm.getRMContext().getReservationSystem()
+            .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
             .getMemory() > 0) {
           break;
         }
         LOG.info("Waiting for node capacity to be added to plan");
         Thread.sleep(100);
-      }
-      while (attempts-- > 0);
+      } while (attempts-- > 0);
       if (attempts <= 0) {
-        Assert.fail("Exhausted attempts in checking if node capacity was " +
-            "added to the plan");
+        Assert.fail("Exhausted attempts in checking if node capacity was "
+            + "added to the plan");
       }
 
     } catch (Exception e) {
@@ -205,8 +203,316 @@ public class TestReservationSystemWithRMHA extends 
RMHATestBase{
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
-    long deadline = (long) (arrival + 1.05 * duration);
+    long deadline = (long) (arrival + duration + 1500);
     return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
         deadline, duration);
   }
+
+  private void validateReservation(Plan plan, ReservationId resId,
+      ReservationDefinition rDef) {
+    ReservationAllocation reservation = plan.getReservationById(resId);
+    Assert.assertNotNull(reservation);
+    Assert.assertEquals(rDef.getDeadline(),
+        reservation.getReservationDefinition().getDeadline());
+  }
+
+  @Test
+  public void testSubmitReservationFailoverAndDelete() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = 
createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // check if reservation exists after failover
+    Plan plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // delete the reservation
+    ReservationDeleteRequest deleteRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    ReservationDeleteResponse deleteResponse = null;
+    clientService = rm2.getClientRMService();
+    try {
+      deleteResponse = clientService.deleteReservation(deleteRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(deleteResponse);
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  @Test
+  public void testFailoverAndSubmitReservation() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // create a reservation
+    ClientRMService clientService = rm2.getClientRMService();
+    ReservationSubmissionRequest request = 
createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // check if reservation is submitted successfully
+    Plan plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+  }
+
+  @Test
+  public void testSubmitReservationFailoverAndUpdate() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = 
createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // check if reservation exists after failover
+    Plan plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // update the reservation
+    long newDeadline = reservationDefinition.getDeadline() + 100;
+    reservationDefinition.setDeadline(newDeadline);
+    ReservationUpdateRequest updateRequest = ReservationUpdateRequest
+        .newInstance(reservationDefinition, reservationID);
+    ReservationUpdateResponse updateResponse = null;
+    clientService = rm2.getClientRMService();
+    try {
+      updateResponse = clientService.updateReservation(updateRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(updateResponse);
+    validateReservation(plan, reservationID, reservationDefinition);
+  }
+
+  @Test
+  public void testSubmitUpdateReservationFailoverAndDelete() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create a reservation
+    ReservationSubmissionRequest request = 
createReservationSubmissionRequest();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId reservationID = response.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+
+    // check if reservation is submitted successfully
+    Plan plan = rm1.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // update the reservation
+    long newDeadline = reservationDefinition.getDeadline() + 100;
+    reservationDefinition.setDeadline(newDeadline);
+    ReservationUpdateRequest updateRequest = ReservationUpdateRequest
+        .newInstance(reservationDefinition, reservationID);
+    ReservationUpdateResponse updateResponse = null;
+    try {
+      updateResponse = clientService.updateReservation(updateRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(updateResponse);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 102400, 100);
+
+    // check if reservation exists after failover
+    plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, reservationID, reservationDefinition);
+
+    // delete the reservation
+    ReservationDeleteRequest deleteRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    ReservationDeleteResponse deleteResponse = null;
+    clientService = rm2.getClientRMService();
+    try {
+      deleteResponse = clientService.deleteReservation(deleteRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(deleteResponse);
+    Assert.assertNull(plan.getReservationById(reservationID));
+  }
+
+  @Test
+  public void testReservationResizeAfterFailover() throws Exception {
+    startRMs();
+
+    addNodeCapacityToPlan(rm1, 102400, 100);
+
+    ClientRMService clientService = rm1.getClientRMService();
+
+    // create 3 reservations
+    ReservationSubmissionRequest request = 
createReservationSubmissionRequest();
+    ReservationDefinition reservationDefinition =
+        request.getReservationDefinition();
+    ReservationSubmissionResponse response = null;
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId resID1 = response.getReservationId();
+    Assert.assertNotNull(resID1);
+    LOG.info("Submit reservation response: " + resID1);
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId resID2 = response.getReservationId();
+    Assert.assertNotNull(resID2);
+    LOG.info("Submit reservation response: " + resID2);
+    try {
+      response = clientService.submitReservation(request);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(response);
+    ReservationId resID3 = response.getReservationId();
+    Assert.assertNotNull(resID3);
+    LOG.info("Submit reservation response: " + resID3);
+
+    // allow the reservations to become active
+    waitForReservationActivation(rm1, resID1,
+        ReservationSystemTestUtil.reservationQ);
+
+    // validate reservations before failover
+    Plan plan = rm1.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, resID1, reservationDefinition);
+    validateReservation(plan, resID2, reservationDefinition);
+    validateReservation(plan, resID3, reservationDefinition);
+    ResourceScheduler scheduler = rm1.getResourceScheduler();
+    QueueInfo resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
+    Assert.assertEquals(0.05, resQ1.getCapacity(), 0.001f);
+    QueueInfo resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
+    Assert.assertEquals(0.05, resQ2.getCapacity(), 0.001f);
+    QueueInfo resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
+    Assert.assertEquals(0.05, resQ3.getCapacity(), 0.001f);
+
+    // Do the failover
+    explicitFailover();
+
+    addNodeCapacityToPlan(rm2, 5120, 5);
+
+    // check if reservations exists after failover
+    plan = rm2.getRMContext().getReservationSystem()
+        .getPlan(ReservationSystemTestUtil.reservationQ);
+    validateReservation(plan, resID1, reservationDefinition);
+    validateReservation(plan, resID3, reservationDefinition);
+
+    // verify if the reservations have been resized
+    scheduler = rm2.getResourceScheduler();
+    resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
+    Assert.assertEquals(1f / 3f, resQ1.getCapacity(), 0.001f);
+    resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
+    Assert.assertEquals(1f / 3f, resQ2.getCapacity(), 0.001f);
+    resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
+    Assert.assertEquals(1f / 3f, resQ3.getCapacity(), 0.001f);
+  }
+
+  private void waitForReservationActivation(MockRM rm,
+      ReservationId reservationId, String planName) {
+    try {
+      int attempts = 20;
+      do {
+        rm.getRMContext().getReservationSystem().synchronizePlan(planName,
+            false);
+        if (rm.getResourceScheduler()
+            .getQueueInfo(reservationId.toString(), false, false)
+            .getCapacity() > 0f) {
+          break;
+        }
+        LOG.info("Waiting for reservation to be active");
+        Thread.sleep(100);
+      } while (attempts-- > 0);
+      if (attempts <= 0) {
+        Assert
+            .fail("Exceeded attempts in waiting for reservation to be active");
+      }
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 7729172..05933f5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -215,7 +215,6 @@ public class ReservationSystemTestUtil {
     return context;
   }
 
-  @SuppressWarnings("unchecked")
   public CapacityScheduler mockCapacityScheduler(int numContainers)
       throws IOException {
     // stolen from TestCapacityScheduler

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
index 22ce6aa..4aed064 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java
@@ -75,10 +75,11 @@ public class TestCapacityOverTimePolicy {
     maxAlloc = Resource.newInstance(1024 * 8, 8);
 
     mAgent = mock(ReservationAgent.class);
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
     QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
-    String reservationQ = testUtil.getFullReservationQueueName();
-    Resource clusterResource = testUtil.calculateClusterResource(totCont);
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
+    Resource clusterResource =
+        ReservationSystemTestUtil.calculateClusterResource(totCont);
     ReservationSchedulerConfiguration conf =
         ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
             instConstraint, avgConstraint);
@@ -113,7 +114,7 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test
@@ -130,7 +131,7 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test
@@ -146,7 +147,7 @@ public class TestCapacityOverTimePolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 
@@ -163,7 +164,7 @@ public class TestCapacityOverTimePolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 
@@ -179,7 +180,7 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
     Assert.fail("should not have accepted this");
   }
 
@@ -195,20 +196,20 @@ public class TestCapacityOverTimePolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
     try {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
       Assert.fail();
     } catch (PlanningQuotaException p) {
       // expected
@@ -232,7 +233,7 @@ public class TestCapacityOverTimePolicy {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc)));
+            "dedicated", initTime, initTime + win, req, res, minAlloc), 
false));
   }
 
   @Test
@@ -251,13 +252,13 @@ public class TestCapacityOverTimePolicy {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", initTime, initTime + win, req, res, minAlloc)));
+            "dedicated", initTime, initTime + win, req, res, minAlloc), 
false));
 
     try {
       assertTrue(plan.toString(),
           plan.addReservation(new InMemoryReservationAllocation(
               ReservationSystemTestUtil.getNewReservationId(), null, "u1",
-              "dedicated", initTime, initTime + win, req, res, minAlloc)));
+              "dedicated", initTime, initTime + win, req, res, minAlloc), 
false));
 
       Assert.fail("should not have accepted this");
     } catch (PlanningQuotaException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
index c661271..2e262a0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
@@ -113,7 +113,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -147,7 +147,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -175,7 +175,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -189,7 +189,7 @@ public class TestInMemoryPlan {
 
     // Try to add it again
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
       Assert.fail("Add should fail as it already exists");
     } catch (IllegalArgumentException e) {
       Assert.assertTrue(e.getMessage().endsWith("already exists"));
@@ -221,7 +221,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -316,7 +316,7 @@ public class TestInMemoryPlan {
             start, start + alloc.length, allocs, resCalc, minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -388,7 +388,7 @@ public class TestInMemoryPlan {
             minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID1));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }
@@ -419,7 +419,7 @@ public class TestInMemoryPlan {
             minAlloc);
     Assert.assertNull(plan.getReservationById(reservationID2));
     try {
-      plan.addReservation(rAllocation);
+      plan.addReservation(rAllocation, false);
     } catch (PlanningException e) {
       Assert.fail(e.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
index a5f3fb4df..28dd62e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java
@@ -61,10 +61,11 @@ public class TestNoOverCommitPolicy {
     maxAlloc = Resource.newInstance(1024 * 8, 8);
 
     mAgent = mock(ReservationAgent.class);
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
     QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
-    Resource clusterResource = testUtil.calculateClusterResource(totCont);
+    Resource clusterResource =
+        ReservationSystemTestUtil.calculateClusterResource(totCont);
     ReservationSchedulerConfiguration conf = mock
         (ReservationSchedulerConfiguration.class);
     NoOverCommitPolicy policy = new NoOverCommitPolicy();
@@ -97,7 +98,7 @@ public class TestNoOverCommitPolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test
@@ -113,7 +114,7 @@ public class TestNoOverCommitPolicy {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", initTime, initTime + f.length,
             ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
   }
 
   @Test(expected = ResourceOverCommitException.class)
@@ -123,7 +124,7 @@ public class TestNoOverCommitPolicy {
     plan.addReservation(new InMemoryReservationAllocation(
         ReservationSystemTestUtil.getNewReservationId(), null, "u1",
         "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
-            .generateAllocation(initTime, step, f), res, minAlloc));
+            .generateAllocation(initTime, step, f), res, minAlloc), false);
   }
 
   @Test(expected = MismatchedUserException.class)
@@ -137,7 +138,7 @@ public class TestNoOverCommitPolicy {
 
     plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1",
         "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
-            .generateAllocation(initTime, step, f), res, minAlloc));
+            .generateAllocation(initTime, step, f), res, minAlloc), false);
 
     // trying to update a reservation with a mismatching user
     plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2",
@@ -158,7 +159,7 @@ public class TestNoOverCommitPolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 
@@ -175,7 +176,7 @@ public class TestNoOverCommitPolicy {
               ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
               "dedicated", initTime, initTime + f.length,
               ReservationSystemTestUtil.generateAllocation(initTime, step, f),
-              res, minAlloc)));
+              res, minAlloc), false));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
index 9689fce..b604799 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -38,12 +43,6 @@ import 
org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.Assert;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public abstract class TestSchedulerPlanFollowerBase {
   final static int GB = 1024;
   protected Clock mClock = null;
@@ -75,20 +74,20 @@ public abstract class TestSchedulerPlanFollowerBase {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
             "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+                .generateAllocation(0L, 1L, f1), res, minAlloc), false));
 
     ReservationId r2 = ReservationId.newInstance(ts, 2);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3",
             "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+                .generateAllocation(3L, 1L, f1), res, minAlloc), false));
 
     ReservationId r3 = ReservationId.newInstance(ts, 3);
     int[] f2 = { 0, 10, 20, 10, 0 };
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4",
             "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
-                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+                .generateAllocation(10L, 1L, f2), res, minAlloc), false));
 
     AbstractSchedulerPlanFollower planFollower = createPlanFollower();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
index 01b7976..ec305a2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java
@@ -711,9 +711,8 @@ public class TestAlignedPlanner {
 
     Resource clusterCapacity = Resource.newInstance(capacityMem, 
capacityCores);
 
-    // Set configuration
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
     float instConstraint = 100;
     float avgConstraint = 100;
 
@@ -792,7 +791,7 @@ public class TestAlignedPlanner {
             ReservationSystemTestUtil.getNewReservationId(), rDef,
             "user_fixed", "dedicated", start, start + f.length * step,
             ReservationSystemTestUtil.generateAllocation(start, step, f), res,
-            minAlloc)));
+            minAlloc), false));
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
index f51cc75..cb4eaeb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -76,8 +75,8 @@ public class TestGreedyReservationAgent {
     long timeWindow = 1000000L;
     Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
     step = 1000L;
-    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
 
     float instConstraint = 100;
     float avgConstraint = 100;
@@ -151,7 +150,7 @@ public class TestGreedyReservationAgent {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 30 * step, 30 * step + f.length * step,
             ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
 
     // create a chain of 4 RR, mixing gang and non-gang
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
@@ -208,7 +207,7 @@ public class TestGreedyReservationAgent {
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 30 * step, 30 * step + f.length * step,
             ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
-            res, minAlloc)));
+            res, minAlloc), false));
 
     // create a chain of 4 RR, mixing gang and non-gang
     ReservationDefinition rr = new ReservationDefinitionPBImpl();
@@ -529,7 +528,7 @@ public class TestGreedyReservationAgent {
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
             "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
-                .generateAllocation(0, step, f), res, minAlloc)));
+                .generateAllocation(0, step, f), res, minAlloc), false));
 
     int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
     Map<ReservationInterval, Resource> alloc =
@@ -537,7 +536,8 @@ public class TestGreedyReservationAgent {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(
             ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
-            "dedicated", 5000, 5000 + f2.length * step, alloc, res, 
minAlloc)));
+            "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc),
+        false));
 
     System.out.println("--------BEFORE AGENT----------");
     System.out.println(plan.toString());
@@ -563,7 +563,8 @@ public class TestGreedyReservationAgent {
     step = 1000L;
     ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
     CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
-    String reservationQ = testUtil.getFullReservationQueueName();
+    String reservationQ =
+        ReservationSystemTestUtil.getFullReservationQueueName();
     float instConstraint = 100;
     float avgConstraint = 100;
     ReservationSchedulerConfiguration conf =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
index b6e6667..eb0b0e2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java
@@ -93,44 +93,44 @@ public class TestSimpleCapacityReplanner {
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(1L);
     ReservationId r2 = ReservationId.newInstance(ts, 2);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u4",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(2L);
     ReservationId r3 = ReservationId.newInstance(ts, 3);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u5",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(3L);
     ReservationId r4 = ReservationId.newInstance(ts, 4);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r4, rDef, "u6",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(4L);
     ReservationId r5 = ReservationId.newInstance(ts, 5);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r5, rDef, "u7",
             "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
-            minAlloc)));
+            minAlloc), false));
 
     int[] f6 = { 50, 50, 50, 50, 50 };
     ReservationId r6 = ReservationId.newInstance(ts, 6);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r6, rDef, "u3",
             "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
-            minAlloc)));
+            minAlloc), false));
     when(clock.getTime()).thenReturn(6L);
     ReservationId r7 = ReservationId.newInstance(ts, 7);
     assertTrue(plan.toString(),
         plan.addReservation(new InMemoryReservationAllocation(r7, rDef, "u4",
             "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
-            minAlloc)));
+            minAlloc), false));
 
     // remove some of the resources (requires replanning)
     plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));

Reply via email to