YARN-3738. Add support for recovery of reserved apps running under dynamic queues (subru via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab8eb877 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab8eb877 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab8eb877 Branch: refs/heads/HDFS-8966 Commit: ab8eb8770c8b8bff41dacb1a399d75906abb1ac4 Parents: 446212a Author: Arun Suresh <asur...@apache.org> Authored: Sat Oct 24 22:53:10 2015 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Sat Oct 24 22:53:10 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/CapacityScheduler.java | 27 ++-- .../scheduler/fair/FairScheduler.java | 11 +- .../TestWorkPreservingRMRestart.java | 156 ++++++++++++++++++- .../reservation/ReservationSystemTestUtil.java | 12 ++ 5 files changed, 196 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0641091..22e4294 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -537,6 +537,9 @@ Release 2.8.0 - UNRELEASED YARN-4296. DistributedShell Log.info is not friendly. (Xiaowei Wang via stevel) + YARN-3738. Add support for recovery of reserved apps running under dynamic + queues (subru via asuresh) + OPTIMIZATIONS YARN-3339. 
TestDockerContainerExecutor should pull a single image and not http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6e356b5..1075ee0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1320,10 +1320,9 @@ public class CapacityScheduler extends case APP_ADDED: { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; - String queueName = - resolveReservationQueueName(appAddedEvent.getQueue(), - appAddedEvent.getApplicationId(), - appAddedEvent.getReservationID()); + String queueName = resolveReservationQueueName(appAddedEvent.getQueue(), + appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(), + appAddedEvent.getIsAppRecovering()); if (queueName != null) { if (!appAddedEvent.getIsAppRecovering()) { addApplication(appAddedEvent.getApplicationId(), queueName, @@ -1664,8 +1663,13 @@ public class CapacityScheduler extends } } + private String getDefaultReservationQueueName(String planQueueName) { + return planQueueName + 
ReservationConstants.DEFAULT_QUEUE_SUFFIX; + } + private synchronized String resolveReservationQueueName(String queueName, - ApplicationId applicationId, ReservationId reservationID) { + ApplicationId applicationId, ReservationId reservationID, + boolean isRecovering) { CSQueue queue = getQueue(queueName); // Check if the queue is a plan queue if ((queue == null) || !(queue instanceof PlanQueue)) { @@ -1675,10 +1679,15 @@ public class CapacityScheduler extends String resQName = reservationID.toString(); queue = getQueue(resQName); if (queue == null) { + // reservation has terminated during failover + if (isRecovering + && conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) { + // move to the default child queue of the plan + return getDefaultReservationQueueName(queueName); + } String message = - "Application " - + applicationId - + " submitted to a reservation which is not yet currently active: " + "Application " + applicationId + + " submitted to a reservation which is not currently active: " + resQName; this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, @@ -1699,7 +1708,7 @@ public class CapacityScheduler extends queueName = resQName; } else { // use the default child queue of the plan for unreserved apps - queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; + queueName = getDefaultReservationQueueName(queueName); } return queueName; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 6355a1e..f26e506 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1244,7 +1244,8 @@ public class FairScheduler extends String queueName = resolveReservationQueueName(appAddedEvent.getQueue(), appAddedEvent.getApplicationId(), - appAddedEvent.getReservationID()); + appAddedEvent.getReservationID(), + appAddedEvent.getIsAppRecovering()); if (queueName != null) { addApplication(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), @@ -1317,7 +1318,8 @@ public class FairScheduler extends } private synchronized String resolveReservationQueueName(String queueName, - ApplicationId applicationId, ReservationId reservationID) { + ApplicationId applicationId, ReservationId reservationID, + boolean isRecovering) { FSQueue queue = queueMgr.getQueue(queueName); if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) { return queueName; @@ -1328,6 +1330,11 @@ public class FairScheduler extends String resQName = queueName + "." 
+ reservationID.toString(); queue = queueMgr.getQueue(resQName); if (queue == null) { + // reservation has terminated during failover + if (isRecovering && allocConf.getMoveOnExpiry(queueName)) { + // move to the default child queue of the plan + return getDefaultQueueForPlanQueue(queueName); + } String message = "Application " + applicationId http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 479ee93..2ef427e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.File; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; @@ -32,7 +33,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 
import org.apache.hadoop.security.Credentials; @@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -66,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -77,6 +82,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.util.ControlledClock; @@ -94,8 +101,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.mortbay.log.Log; - import com.google.common.base.Supplier; @@ -132,6 +137,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase if (rm2 != null) { rm2.stop(); } + conf = null; } // Test common scheduler state including SchedulerAttempt, SchedulerNode, @@ -257,6 +263,152 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); } + private Configuration getSchedulerDynamicConfiguration() throws IOException { + conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); + conf.setTimeDuration( + YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 1L, + TimeUnit.SECONDS); + if (getSchedulerType() == SchedulerType.CAPACITY) { + CapacitySchedulerConfiguration schedulerConf = + new CapacitySchedulerConfiguration(conf); + ReservationSystemTestUtil.setupDynamicQueueConfiguration(schedulerConf); + return schedulerConf; + } else { + String allocFile = new File(FairSchedulerTestBase.TEST_DIR, + TestWorkPreservingRMRestart.class.getSimpleName() + ".xml") + .getAbsolutePath(); + ReservationSystemTestUtil.setupFSAllocationFile(allocFile); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile); + return conf; + } + } + + // Test work preserving recovery of apps running under reservation. + // This involves: + // 1. Setting up a dynamic reservable queue, + // 2. Submitting an app to it, + // 3. Failing over RM, + // 4. 
Validating that the app is recovered post failover, + // 5. Check if all running containers are recovered, + // 6. Verify the scheduler state like attempt info, + // 7. Verify the queue/user metrics for the dynamic reservable queue. + @Test(timeout = 30000) + public void testDynamicQueueRecovery() throws Exception { + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + + // 1. Set up dynamic reservable queue. + Configuration schedulerConf = getSchedulerDynamicConfiguration(); + int containerMemory = 1024; + Resource containerResource = Resource.newInstance(containerMemory, 1); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(schedulerConf); + rm1 = new MockRM(schedulerConf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + // 2. Run plan follower to update the added node & then submit app to + // dynamic queue. + rm1.getRMContext().getReservationSystem() + .synchronizePlan(ReservationSystemTestUtil.reservationQ, true); + RMApp app1 = rm1.submitApp(200, "dynamicQApp", + UserGroupInformation.getCurrentUser().getShortUserName(), null, + ReservationSystemTestUtil.getReservationQueueName()); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // clear queue metrics + rm1.clearQueueMetrics(app1); + + // 3. Fail over (restart) RM. + rm2 = new MockRM(schedulerConf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // 4. Validate app is recovered post failover. 
+ RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt(); + NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 1, ContainerState.RUNNING); + NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + NMContainerStatus completedContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + + nm1.registerNode( + Arrays.asList(amContainer, runningContainer, completedContainer), null); + + // Wait for RM to settle down on recovering containers. + waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); + Set<ContainerId> launchedContainers = + ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId())) + .getLaunchedContainers(); + assertTrue(launchedContainers.contains(amContainer.getContainerId())); + assertTrue(launchedContainers.contains(runningContainer.getContainerId())); + + // 5. Check RMContainers are re-created and the container state is + // correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForContainerToComplete(loadedAttempt1, completedContainer); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + // ********* check scheduler node state.******* + // 2 running containers.
+ Resource usedResources = Resources.multiply(containerResource, 2); + Resource nmResource = + Resource.newInstance(nm1.getMemory(), nm1.getvCores()); + + assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue( + schedulerNode1.isValidContainer(runningContainer.getContainerId())); + assertFalse( + schedulerNode1.isValidContainer(completedContainer.getContainerId())); + // 2 launched containers, 1 completed container + assertEquals(2, schedulerNode1.getNumContainers()); + + assertEquals(Resources.subtract(nmResource, usedResources), + schedulerNode1.getAvailableResource()); + assertEquals(usedResources, schedulerNode1.getUsedResource()); + Resource availableResources = Resources.subtract(nmResource, usedResources); + + // 6. Verify the scheduler state like attempt info. + Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> sa = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication<SchedulerApplicationAttempt> schedulerApp = + sa.get(recoveredApp1.getApplicationId()); + + // 7. Verify the queue/user metrics for the dynamic reservable queue. 
+ if (getSchedulerType() == SchedulerType.CAPACITY) { + checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2); + } else { + checkFSQueue(rm2, schedulerApp, usedResources, availableResources); + } + + // *********** check scheduler attempt state.******** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertTrue(schedulerAttempt.getLiveContainers() + .contains(scheduler.getRMContainer(amContainer.getContainerId()))); + assertTrue(schedulerAttempt.getLiveContainers() + .contains(scheduler.getRMContainer(runningContainer.getContainerId()))); + assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); + + // *********** check appSchedulingInfo state *********** + assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); + } + private void checkCSQueue(MockRM rm, SchedulerApplication<SchedulerApplicationAttempt> app, Resource clusterResource, Resource queueResource, Resource usedResource, http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab8eb877/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 05933f5..0aedc6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -304,6 +304,18 @@ public class ReservationSystemTestUtil { conf.setCapacity(A2, 70); } + public static void setupDynamicQueueConfiguration( + CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { reservationQ }); + final String dedicated = CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + reservationQ; + conf.setCapacity(dedicated, 100); + // Set as reservation queue + conf.setReservable(dedicated, true); + } + public static String getFullReservationQueueName() { return CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT + reservationQ;