YARN-8151. Yarn RM Epoch should wrap around. Contributed by Young Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6a80e47 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6a80e47 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6a80e47 Branch: refs/heads/HDDS-4 Commit: e6a80e476d4348a4373e6dd5792d70edff16516f Parents: 87c23ef Author: Inigo Goiri <inigo...@apache.org> Authored: Wed May 2 17:23:17 2018 -0700 Committer: Inigo Goiri <inigo...@apache.org> Committed: Wed May 2 17:23:17 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 4 ++++ .../src/main/resources/yarn-default.xml | 7 +++++++ .../recovery/FileSystemRMStateStore.java | 4 ++-- .../recovery/LeveldbRMStateStore.java | 2 +- .../recovery/MemoryRMStateStore.java | 2 +- .../resourcemanager/recovery/RMStateStore.java | 18 +++++++++++++++++- .../resourcemanager/recovery/ZKRMStateStore.java | 4 ++-- .../recovery/RMStateStoreTestBase.java | 14 ++++++++++++++ .../recovery/TestFSRMStateStore.java | 1 + .../recovery/TestLeveldbRMStateStore.java | 1 + .../recovery/TestZKRMStateStore.java | 1 + 11 files changed, 51 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8aa136d..5ba2e05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -188,6 +188,10 @@ public class YarnConfiguration extends Configuration { public static final String RM_EPOCH = RM_PREFIX + "epoch"; public static final long DEFAULT_RM_EPOCH = 0L; + /** The epoch range before wrap around. 0 disables wrap around*/ + public static final String RM_EPOCH_RANGE = RM_EPOCH + ".range"; + public static final long DEFAULT_RM_EPOCH_RANGE = 0; + /** The address of the applications manager interface in the RM.*/ public static final String RM_ADDRESS = RM_PREFIX + "address"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 85915c2..4eb509f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -677,6 +677,13 @@ </property> <property> + <description>The range of values above base epoch that the RM will use before + wrapping around</description> + <name>yarn.resourcemanager.epoch.range</name> + <value>0</value> + </property> + + <property> <description>The list of RM nodes in the cluster when HA is enabled. See description of yarn.resourcemanager.ha .enabled for full details on how this is used.</description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 19297bc..b797283 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -205,12 +205,12 @@ public class FileSystemRMStateStore extends RMStateStore { Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); updateFile(epochNodePath, storeData, false); } else { // initialize epoch file with 1 for the next time. - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); writeFileWithRetries(epochNodePath, storeData, false); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 36a8dfa..e7fb02f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -259,7 +259,7 @@ public class LeveldbRMStateStore extends RMStateStore { if (data != null) { currentEpoch = EpochProto.parseFrom(data).getEpoch(); } - EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto(); + EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto(); db.put(dbKeyBytes, proto.toByteArray()); } catch (DBException e) { throw new IOException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5041000..219e10a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -59,7 +59,7 @@ public class MemoryRMStateStore extends RMStateStore { @Override public synchronized long getAndIncrementEpoch() throws Exception { long currentEpoch = epoch; - epoch = epoch + 1; + epoch = nextEpoch(epoch); return currentEpoch; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index b4dd378..242b5d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -104,6 +104,7 @@ public abstract class RMStateStore extends AbstractService { protected static final String VERSION_NODE = "RMVersionNode"; protected static final String EPOCH_NODE = "EpochNode"; protected long baseEpoch; + private long epochRange; protected ResourceManager resourceManager; private final ReadLock readLock; private final WriteLock writeLock; @@ -732,6 +733,8 @@ public abstract class RMStateStore extends AbstractService { // read the base epoch value from conf baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH, YarnConfiguration.DEFAULT_RM_EPOCH); + epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE, + YarnConfiguration.DEFAULT_RM_EPOCH_RANGE); initInternal(conf); } @@ -818,7 +821,20 @@ public abstract class RMStateStore extends AbstractService { * Get the current epoch of RM and increment the value. */ public abstract long getAndIncrementEpoch() throws Exception; - + + /** + * Compute the next epoch value by incrementing by one. + * Wraps around if the epoch range is exceeded so that + * when federation is enabled epoch collisions can be avoided. + */ + protected long nextEpoch(long epoch){ + long epochVal = epoch - baseEpoch + 1; + if (epochRange > 0) { + epochVal %= epochRange; + } + return epochVal + baseEpoch; + } + /** * Blocking API * The derived class must recover state from the store and return a new http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 9073910..de1f1ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -491,13 +491,13 @@ public class ZKRMStateStore extends RMStateStore { Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl, fencingNodePath); } else { // initialize epoch node with 1 for the next time. - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); zkManager.safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 957d4ce..3454d72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -94,6 +94,8 @@ public class RMStateStoreTestBase { protected final long epoch = 10L; + private final long epochRange = 10L; + static class TestDispatcher implements Dispatcher, EventHandler<Event> { ApplicationAttemptId attemptId; @@ -141,6 +143,10 @@ public class RMStateStoreTestBase { boolean attemptExists(RMAppAttempt attempt) throws Exception; } + public long getEpochRange() { + return epochRange; + } + void waitNotify(TestDispatcher dispatcher) { long startTime = System.currentTimeMillis(); while(!dispatcher.notified) { @@ -576,6 +582,14 @@ public class RMStateStoreTestBase { long thirdTimeEpoch = store.getAndIncrementEpoch(); Assert.assertEquals(epoch + 2, thirdTimeEpoch); + + for (int i = 0; i < epochRange; ++i) { + store.getAndIncrementEpoch(); + } + long wrappedEpoch = store.getAndIncrementEpoch(); + // Epoch should have wrapped around and then incremented once for a total + // of + 3 + Assert.assertEquals(epoch + 3, wrappedEpoch); } public void testAppDeletion(RMStateStoreHelper stateStoreHelper) http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index fe4a701..14f5404 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -118,6 +118,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, 900L); conf.setLong(YarnConfiguration.RM_EPOCH, epoch); + conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange()); if (adminCheckEnable) { conf.setBoolean( YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index afd0c77..576ee7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -83,6 +83,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase { @Test(timeout = 60000) public void testEpoch() throws Exception { conf.setLong(YarnConfiguration.RM_EPOCH, epoch); + conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange()); LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); testEpoch(tester); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index d8718e0..4cba266 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -210,6 +210,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { curatorTestingServer.getConnectString()); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); conf.setLong(YarnConfiguration.RM_EPOCH, epoch); + conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange()); this.store = new TestZKRMStateStoreInternal(conf, workingZnode); return this.store; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org