YARN-3738. Add support for recovery of reserved apps running under dynamic 
queues (subru via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab8eb877
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab8eb877
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab8eb877

Branch: refs/heads/HDFS-8966
Commit: ab8eb8770c8b8bff41dacb1a399d75906abb1ac4
Parents: 446212a
Author: Arun Suresh <asur...@apache.org>
Authored: Sat Oct 24 22:53:10 2015 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Sat Oct 24 22:53:10 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/CapacityScheduler.java   |  27 ++--
 .../scheduler/fair/FairScheduler.java           |  11 +-
 .../TestWorkPreservingRMRestart.java            | 156 ++++++++++++++++++-
 .../reservation/ReservationSystemTestUtil.java  |  12 ++
 5 files changed, 196 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0641091..22e4294 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -537,6 +537,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4296. DistributedShell Log.info is not friendly.
     (Xiaowei Wang via stevel)
 
+    YARN-3738. Add support for recovery of reserved apps running under dynamic
+    queues (subru via asuresh)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6e356b5..1075ee0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1320,10 +1320,9 @@ public class CapacityScheduler extends
     case APP_ADDED:
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      String queueName =
-          resolveReservationQueueName(appAddedEvent.getQueue(),
-              appAddedEvent.getApplicationId(),
-              appAddedEvent.getReservationID());
+      String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
+          appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
+          appAddedEvent.getIsAppRecovering());
       if (queueName != null) {
         if (!appAddedEvent.getIsAppRecovering()) {
           addApplication(appAddedEvent.getApplicationId(), queueName,
@@ -1664,8 +1663,13 @@ public class CapacityScheduler extends
     }
   }
 
+  private String getDefaultReservationQueueName(String planQueueName) {
+    return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
+  }
+
   private synchronized String resolveReservationQueueName(String queueName,
-      ApplicationId applicationId, ReservationId reservationID) {
+      ApplicationId applicationId, ReservationId reservationID,
+      boolean isRecovering) {
     CSQueue queue = getQueue(queueName);
     // Check if the queue is a plan queue
     if ((queue == null) || !(queue instanceof PlanQueue)) {
@@ -1675,10 +1679,15 @@ public class CapacityScheduler extends
       String resQName = reservationID.toString();
       queue = getQueue(resQName);
       if (queue == null) {
+        // reservation has terminated during failover
+        if (isRecovering
+            && conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) {
+          // move to the default child queue of the plan
+          return getDefaultReservationQueueName(queueName);
+        }
         String message =
-            "Application "
-                + applicationId
-                + " submitted to a reservation which is not yet currently 
active: "
+            "Application " + applicationId
+                + " submitted to a reservation which is not currently active: "
                 + resQName;
         this.rmContext.getDispatcher().getEventHandler()
             .handle(new RMAppEvent(applicationId,
@@ -1699,7 +1708,7 @@ public class CapacityScheduler extends
       queueName = resQName;
     } else {
       // use the default child queue of the plan for unreserved apps
-      queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
+      queueName = getDefaultReservationQueueName(queueName);
     }
     return queueName;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 6355a1e..f26e506 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1244,7 +1244,8 @@ public class FairScheduler extends
       String queueName =
           resolveReservationQueueName(appAddedEvent.getQueue(),
               appAddedEvent.getApplicationId(),
-              appAddedEvent.getReservationID());
+              appAddedEvent.getReservationID(),
+              appAddedEvent.getIsAppRecovering());
       if (queueName != null) {
         addApplication(appAddedEvent.getApplicationId(),
             queueName, appAddedEvent.getUser(),
@@ -1317,7 +1318,8 @@ public class FairScheduler extends
   }
 
   private synchronized String resolveReservationQueueName(String queueName,
-      ApplicationId applicationId, ReservationId reservationID) {
+      ApplicationId applicationId, ReservationId reservationID,
+      boolean isRecovering) {
     FSQueue queue = queueMgr.getQueue(queueName);
     if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
       return queueName;
@@ -1328,6 +1330,11 @@ public class FairScheduler extends
       String resQName = queueName + "." + reservationID.toString();
       queue = queueMgr.getQueue(resQName);
       if (queue == null) {
+        // reservation has terminated during failover
+        if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
+          // move to the default child queue of the plan
+          return getDefaultQueueForPlanQueue(queueName);
+        }
         String message =
             "Application "
                 + applicationId

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 479ee93..2ef427e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -32,7 +33,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
@@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -66,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -77,6 +82,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.util.ControlledClock;
@@ -94,8 +101,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mortbay.log.Log;
-
 import com.google.common.base.Supplier;
 
 
@@ -132,6 +137,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     if (rm2 != null) {
       rm2.stop();
     }
+    conf = null;
   }
 
   // Test common scheduler state including SchedulerAttempt, SchedulerNode,
@@ -257,6 +263,152 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
   }
 
+  private Configuration getSchedulerDynamicConfiguration() throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+    conf.setTimeDuration(
+        YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 1L,
+        TimeUnit.SECONDS);
+    if (getSchedulerType() == SchedulerType.CAPACITY) {
+      CapacitySchedulerConfiguration schedulerConf =
+          new CapacitySchedulerConfiguration(conf);
+      ReservationSystemTestUtil.setupDynamicQueueConfiguration(schedulerConf);
+      return schedulerConf;
+    } else {
+      String allocFile = new File(FairSchedulerTestBase.TEST_DIR,
+          TestWorkPreservingRMRestart.class.getSimpleName() + ".xml")
+              .getAbsolutePath();
+      ReservationSystemTestUtil.setupFSAllocationFile(allocFile);
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+          ResourceScheduler.class);
+      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
+      return conf;
+    }
+  }
+
+  // Test work preserving recovery of apps running under reservation.
+  // This involves:
+  // 1. Setting up a dynamic reservable queue,
+  // 2. Submitting an app to it,
+  // 3. Failing over RM,
+  // 4. Validating that the app is recovered post failover,
+  // 5. Check if all running containers are recovered,
+  // 6. Verify the scheduler state like attempt info,
+  // 7. Verify the queue/user metrics for the dynamic reservable queue.
+  @Test(timeout = 30000)
+  public void testDynamicQueueRecovery() throws Exception {
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+    conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        DominantResourceCalculator.class.getName());
+
+    // 1. Set up dynamic reservable queue.
+    Configuration schedulerConf = getSchedulerDynamicConfiguration();
+    int containerMemory = 1024;
+    Resource containerResource = Resource.newInstance(containerMemory, 1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(schedulerConf);
+    rm1 = new MockRM(schedulerConf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    // 2. Run plan follower to update the added node & then submit app to
+    // dynamic queue.
+    rm1.getRMContext().getReservationSystem()
+        .synchronizePlan(ReservationSystemTestUtil.reservationQ, true);
+    RMApp app1 = rm1.submitApp(200, "dynamicQApp",
+        UserGroupInformation.getCurrentUser().getShortUserName(), null,
+        ReservationSystemTestUtil.getReservationQueueName());
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // clear queue metrics
+    rm1.clearQueueMetrics(app1);
+
+    // 3. Fail over (restart) RM.
+    rm2 = new MockRM(schedulerConf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // 4. Validate app is recovered post failover.
+    RMApp recoveredApp1 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
+    NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
+        am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
+        am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+            ContainerState.COMPLETE);
+
+    nm1.registerNode(
+        Arrays.asList(amContainer, runningContainer, completedContainer), null);
+
+    // Wait for RM to settle down on recovering containers.
+    waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+    Set<ContainerId> launchedContainers =
+        ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
+            .getLaunchedContainers();
+    assertTrue(launchedContainers.contains(amContainer.getContainerId()));
+    assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
+
+    // 5. Check RMContainers are re-recreated and the container state is
+    // correct.
+    rm2.waitForState(nm1, amContainer.getContainerId(),
+        RMContainerState.RUNNING);
+    rm2.waitForState(nm1, runningContainer.getContainerId(),
+        RMContainerState.RUNNING);
+    rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
+
+    // ********* check scheduler node state.*******
+    // 2 running containers.
+    Resource usedResources = Resources.multiply(containerResource, 2);
+    Resource nmResource =
+        Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+
+    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
+    assertTrue(
+        schedulerNode1.isValidContainer(runningContainer.getContainerId()));
+    assertFalse(
+        schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+    // 2 launched containers, 1 completed container
+    assertEquals(2, schedulerNode1.getNumContainers());
+
+    assertEquals(Resources.subtract(nmResource, usedResources),
+        schedulerNode1.getAvailableResource());
+    assertEquals(usedResources, schedulerNode1.getUsedResource());
+    Resource availableResources = Resources.subtract(nmResource, usedResources);
+
+    // 6. Verify the scheduler state like attempt info.
+    Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> sa =
+        ((AbstractYarnScheduler) rm2.getResourceScheduler())
+            .getSchedulerApplications();
+    SchedulerApplication<SchedulerApplicationAttempt> schedulerApp =
+        sa.get(recoveredApp1.getApplicationId());
+
+    // 7. Verify the queue/user metrics for the dynamic reservable queue.
+    if (getSchedulerType() == SchedulerType.CAPACITY) {
+      checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
+    } else {
+      checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
+    }
+
+    // *********** check scheduler attempt state.********
+    SchedulerApplicationAttempt schedulerAttempt =
+        schedulerApp.getCurrentAppAttempt();
+    assertTrue(schedulerAttempt.getLiveContainers()
+        .contains(scheduler.getRMContainer(amContainer.getContainerId())));
+    assertTrue(schedulerAttempt.getLiveContainers()
+        .contains(scheduler.getRMContainer(runningContainer.getContainerId())));
+    assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
+
+    // *********** check appSchedulingInfo state ***********
+    assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
+  }
+
   private void checkCSQueue(MockRM rm,
       SchedulerApplication<SchedulerApplicationAttempt> app,
       Resource clusterResource, Resource queueResource, Resource usedResource,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 05933f5..0aedc6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -304,6 +304,18 @@ public class ReservationSystemTestUtil {
     conf.setCapacity(A2, 70);
   }
 
+  public static void setupDynamicQueueConfiguration(
+      CapacitySchedulerConfiguration conf) {
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { reservationQ });
+    final String dedicated = CapacitySchedulerConfiguration.ROOT
+        + CapacitySchedulerConfiguration.DOT + reservationQ;
+    conf.setCapacity(dedicated, 100);
+    // Set as reservation queue
+    conf.setReservable(dedicated, true);
+  }
+
   public static String getFullReservationQueueName() {
     return CapacitySchedulerConfiguration.ROOT
         + CapacitySchedulerConfiguration.DOT + reservationQ;
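
For orientation, below is a minimal sketch (not part of the patch) of how a dynamic reservable queue is wired up on the Capacity Scheduler path before a failover test, assuming the same constants the new test code above uses; the class name DynamicQueueRecoverySetupSketch is hypothetical.

// Hypothetical helper; mirrors the Capacity Scheduler branch of
// getSchedulerDynamicConfiguration() in TestWorkPreservingRMRestart above.
// Requires the resourcemanager test classes on the classpath for
// ReservationSystemTestUtil.
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class DynamicQueueRecoverySetupSketch {

  public static CapacitySchedulerConfiguration build() {
    YarnConfiguration conf = new YarnConfiguration();
    // Enable the reservation system and run the plan follower every second so
    // the dynamic (reservation) queues are created promptly.
    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
    conf.setTimeDuration(
        YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 1L,
        TimeUnit.SECONDS);

    CapacitySchedulerConfiguration schedulerConf =
        new CapacitySchedulerConfiguration(conf);
    // Defines a single reservable (plan) queue under root, as the new
    // setupDynamicQueueConfiguration() helper does.
    ReservationSystemTestUtil.setupDynamicQueueConfiguration(schedulerConf);
    return schedulerConf;
  }
}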
