YARN-5009. NMLeveldbStateStoreService database can grow substantially, leading to longer recovery times. Contributed by Jason Lowe.
(cherry picked from commit 4a8508501bc753858693dacdafba61d604702f71) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ba79d77 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ba79d77 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ba79d77 Branch: refs/heads/branch-2.8 Commit: 5ba79d77fbb21ed0e870bc0eff9123c9335137d2 Parents: e62162d Author: Jian He <jia...@apache.org> Authored: Thu Apr 28 21:54:11 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Thu Apr 28 21:54:53 2016 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 7 +++ .../src/main/resources/yarn-default.xml | 8 ++++ .../recovery/NMLeveldbStateStoreService.java | 45 +++++++++++++++++++- .../TestNMLeveldbStateStoreService.java | 25 +++++++++++ 4 files changed, 83 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6305b90..c509b66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1338,6 +1338,13 @@ public class YarnConfiguration extends Configuration { public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir"; + /** The time in seconds between full compactions of the NM state database. 
+ * Setting the interval to zero disables the full compaction cycles. + */ + public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS = + NM_RECOVERY_PREFIX + "compaction-interval-secs"; + public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600; + public static final String NM_RECOVERY_SUPERVISED = NM_RECOVERY_PREFIX + "supervised"; public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 12afee0..e251a12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1633,6 +1633,14 @@ </property> <property> + <description>The time in seconds between full compactions of the NM state + database. Setting the interval to zero disables the full compaction + cycles.</description> + <name>yarn.nodemanager.recovery.compaction-interval-secs</name> + <value>3600</value> + </property> + + <property> <description>Whether the nodemanager is running under supervision. 
A nodemanager that supports recovery and is running under supervision will not try to cleanup containers as it exits with the assumption http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 6f64cc7..1a8c6ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -28,6 +28,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -128,6 +131,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService { private DB db; private boolean isNewlyCreated; + private Timer compactionTimer; public NMLeveldbStateStoreService() { super(NMLeveldbStateStoreService.class.getName()); @@ -139,6 +143,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { @Override protected void closeStorage() throws IOException { + if (compactionTimer != null) { + compactionTimer.cancel(); + compactionTimer = null; + } if (db != null) { db.close(); } @@ -964,6 +972,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { @Override protected void initStorage(Configuration conf) throws IOException { + db = openDatabase(conf); + checkVersion(); + startCompactionTimer(conf); + } + + protected DB openDatabase(Configuration conf) throws IOException { Path storeRoot = createStorageDir(conf); Options options = new Options(); options.createIfMissing(false); @@ -988,7 +1002,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { throw e; } } - checkVersion(); + return db; } private Path createStorageDir(Configuration conf) throws IOException { @@ -1004,6 +1018,33 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { return root; } + private void startCompactionTimer(Configuration conf) { + long intervalMsec = conf.getLong( + YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, + YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000; + if (intervalMsec > 0) { + compactionTimer = new Timer( + this.getClass().getSimpleName() + " compaction timer", true); + compactionTimer.schedule(new CompactionTimerTask(), + intervalMsec, intervalMsec); + } + } + + + private class CompactionTimerTask extends TimerTask { + @Override + public void run() { + long start = Time.monotonicNow(); + LOG.info("Starting full compaction cycle"); + try { + db.compactRange(null, null); + } catch (DBException e) { + LOG.error("Error compacting database", e); + } + long duration = Time.monotonicNow() - start; + 
LOG.info("Full compaction cycle completed in " + duration + " msec"); + } + } private static class LeveldbLogger implements Logger { private static final Log LOG = LogFactory.getLog(LeveldbLogger.class); @@ -1061,7 +1102,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { * throw exception and indicate user to use a separate upgrade tool to * upgrade NM state or remove incompatible old state. */ - private void checkVersion() throws IOException { + protected void checkVersion() throws IOException { Version loadedVersion = loadVersion(); LOG.info("Loaded NM state version info " + loadedVersion); if (loadedVersion.equals(getCurrentVersion())) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 47468d6..e44e5e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -23,6 +23,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static 
org.mockito.Mockito.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import java.io.File; import java.io.IOException; @@ -75,6 +79,7 @@ import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.iq80.leveldb.DB; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -885,6 +890,26 @@ public class TestNMLeveldbStateStoreService { assertTrue(state.getLogDeleterMap().isEmpty()); } + @Test + public void testCompactionCycle() throws IOException { + final DB mockdb = mock(DB.class); + conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1); + NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() { + @Override + protected void checkVersion() {} + + @Override + protected DB openDatabase(Configuration conf) { + return mockdb; + } + }; + store.init(conf); + store.start(); + verify(mockdb, timeout(10000)).compactRange( + (byte[]) isNull(), (byte[]) isNull()); + store.close(); + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org