Repository: hadoop Updated Branches: refs/heads/trunk a3d9934f9 -> 485c96e3c
YARN-2001. Added a time threshold for RM to wait before starting container allocations after restart/failover. Contributed by Jian He. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/485c96e3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/485c96e3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/485c96e3 Branch: refs/heads/trunk Commit: 485c96e3cb9b0b05d6e490b4773506da83ebc61d Parents: a3d9934 Author: Vinod Kumar Vavilapalli <[email protected]> Authored: Thu Sep 18 11:03:12 2014 -0700 Committer: Vinod Kumar Vavilapalli <[email protected]> Committed: Thu Sep 18 11:03:12 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 5 ++ .../src/main/resources/yarn-default.xml | 10 ++++ .../yarn/server/resourcemanager/RMContext.java | 4 +- .../server/resourcemanager/RMContextImpl.java | 43 ++++++++++++++- .../server/resourcemanager/ResourceManager.java | 12 +++++ .../scheduler/capacity/CapacityScheduler.java | 4 ++ .../scheduler/fair/FairScheduler.java | 5 ++ .../scheduler/fifo/FifoScheduler.java | 6 +++ .../TestWorkPreservingRMRestart.java | 56 ++++++++++++++++++++ 10 files changed, 145 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 643fb14..c179c7f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -235,6 +235,9 @@ Release 2.6.0 - UNRELEASED YARN-2557. Add a parameter "attempt_Failures_Validity_Interval" into DistributedShell (xgong) + YARN-2001. Added a time threshold for RM to wait before starting container + allocations after restart/failover. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 44a6fc3..acc4a05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -353,6 +353,11 @@ public class YarnConfiguration extends Configuration { public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED = false; + public static final String RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS = + RM_PREFIX + "work-preserving-recovery.scheduling-wait-ms"; + public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS = + 10000; + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 3a7e94a..a2c3fd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -298,6 +298,16 @@ </property> <property> + <description>Set the amount of time RM waits before allocating new + containers on work-preserving-recovery. Such wait period gives RM a chance + to settle down resyncing with NMs in the cluster on recovery, before assigning + new containers to applications. + </description> + <name>yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms</name> + <value>10000</value> + </property> + + <property> <description>The class to use as the persistent store. If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 46ef432..60f88f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -108,4 +108,6 @@ public interface RMContext { boolean isWorkPreservingRecoveryEnabled(); long getEpoch(); -} \ No newline at end of file + + boolean isSchedulerReadyForAllocatingContainers(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 8a9b51e..36eec04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.LocalConfigurationProvider; @@ -44,6 +47,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import com.google.common.annotations.VisibleForTesting; @@ -85,6 +90,13 @@ public class RMContextImpl implements RMContext { private SystemMetricsPublisher systemMetricsPublisher; private ConfigurationProvider configurationProvider; private long epoch; + private Clock systemClock = new SystemClock(); + private long schedulerRecoveryStartTime = 0; + private long schedulerRecoveryWaitTime = 0; + private boolean printLog = true; + private boolean isSchedulerReady = false; + + private static final Log LOG = LogFactory.getLog(RMContextImpl.class); /** * Default constructor. To be used in conjunction with setter methods for @@ -379,7 +391,34 @@ public class RMContextImpl implements RMContext { return this.epoch; } - void setEpoch(long epoch) { + void setEpoch(long epoch) { this.epoch = epoch; } -} \ No newline at end of file + + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { + this.schedulerRecoveryStartTime = systemClock.getTime(); + this.schedulerRecoveryWaitTime = waitTime; + } + + public boolean isSchedulerReadyForAllocatingContainers() { + if (isSchedulerReady) { + return isSchedulerReady; + } + isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime) + > schedulerRecoveryWaitTime; + if (!isSchedulerReady && printLog) { + LOG.info("Skip allocating containers. Scheduler is waiting for recovery."); + printLog = false; + } + if (isSchedulerReady) { + LOG.info("Scheduler recovery is done. Start allocating new containers."); + } + return isSchedulerReady; + } + + @Private + @VisibleForTesting + public void setSystemClock(Clock clock) { + this.systemClock = clock; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 0def615..79af7a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1131,6 +1131,8 @@ public class ResourceManager extends CompositeService implements Recoverable { // recover applications rmAppManager.recover(state); + + setSchedulerRecoveryStartAndWaitTime(state, conf); } public static void main(String argv[]) { @@ -1178,6 +1180,16 @@ public class ResourceManager extends CompositeService implements Recoverable { rmContext.setDispatcher(rmDispatcher); } + private void setSchedulerRecoveryStartAndWaitTime(RMState state, + Configuration conf) { + if (!state.getApplicationState().isEmpty()) { + long waitTime = + conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); + rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime); + } + } + /** * Retrieve RM bind address from configuration * http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6b810d7..bdfc819 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -902,6 +902,10 @@ public class CapacityScheduler extends } private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } // Assign new containers... // 1. Check for reserved applications http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 9c40d48..296d884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1015,6 +1015,11 @@ public class FairScheduler extends } private synchronized void attemptScheduling(FSSchedulerNode node) { + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + // Assign new containers... // 1. Check for reserved applications // 2. Schedule if there are no reservations http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index d72e796..ea21c2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -702,6 +702,12 @@ public class FifoScheduler extends completedContainer, RMContainerEventType.FINISHED); } + + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, node.getAvailableResource(),minimumAllocation)) { LOG.debug("Node heartbeat " + rmNode.getNodeID() + http://git-wip-us.apache.org/repos/asf/hadoop/blob/485c96e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 02983c2..f1b5f14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -37,10 +37,12 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -62,6 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -479,6 +483,7 @@ public class TestWorkPreservingRMRestart { @Test(timeout = 20000) public void testAMfailedBetweenRMRestart() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); + conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); memStore.init(conf); rm1 = new MockRM(conf, memStore); rm1.start(); @@ -762,4 +767,55 @@ public class TestWorkPreservingRMRestart { Thread.sleep(200); } } + + @Test (timeout = 20000) + public void testNewContainersNotAllocatedDuringSchedulerRecovery() + throws Exception { + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Restart RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(); + ControlledClock clock = new ControlledClock(new SystemClock()); + long startTime = System.currentTimeMillis(); + ((RMContextImpl)rm2.getRMContext()).setSystemClock(clock); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(true); + rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // AM request for new containers + am1.allocate("127.0.0.1", 1000, 1, new ArrayList<ContainerId>()); + + List<Container> containers = new ArrayList<Container>(); + clock.setTime(startTime + 2000); + nm1.nodeHeartbeat(true); + + // sleep some time as allocation happens asynchronously. + Thread.sleep(3000); + containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()).getAllocatedContainers()); + // container is not allocated during scheduling recovery. + Assert.assertTrue(containers.isEmpty()); + + clock.setTime(startTime + 8000); + nm1.nodeHeartbeat(true); + // Container is created after recovery is done. + while (containers.isEmpty()) { + containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()).getAllocatedContainers()); + Thread.sleep(500); + } + } }
