YARN-5009. NMLeveldbStateStoreService database can grow substantially leading 
to longer recovery times. Contributed by Jason Lowe

(cherry picked from commit 4a8508501bc753858693dacdafba61d604702f71)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ba79d77
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ba79d77
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ba79d77

Branch: refs/heads/branch-2.8
Commit: 5ba79d77fbb21ed0e870bc0eff9123c9335137d2
Parents: e62162d
Author: Jian He <jia...@apache.org>
Authored: Thu Apr 28 21:54:11 2016 -0700
Committer: Jian He <jia...@apache.org>
Committed: Thu Apr 28 21:54:53 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  7 +++
 .../src/main/resources/yarn-default.xml         |  8 ++++
 .../recovery/NMLeveldbStateStoreService.java    | 45 +++++++++++++++++++-
 .../TestNMLeveldbStateStoreService.java         | 25 +++++++++++
 4 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6305b90..c509b66 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1338,6 +1338,13 @@ public class YarnConfiguration extends Configuration {
 
   public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
 
+  /** The time in seconds between full compactions of the NM state database.
+   *  Setting the interval to zero disables the full compaction cycles.
+   */
+  public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS =
+      NM_RECOVERY_PREFIX + "compaction-interval-secs";
+  public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600;
+
   public static final String NM_RECOVERY_SUPERVISED =
       NM_RECOVERY_PREFIX + "supervised";
   public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 12afee0..e251a12 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1633,6 +1633,14 @@
   </property>
 
   <property>
+    <description>The time in seconds between full compactions of the NM state
+    database. Setting the interval to zero disables the full compaction
+    cycles.</description>
+    <name>yarn.nodemanager.recovery.compaction-interval-secs</name>
+    <value>3600</value>
+  </property>
+
+  <property>
     <description>Whether the nodemanager is running under supervision. A
       nodemanager that supports recovery and is running under supervision
       will not try to cleanup containers as it exits with the assumption

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 6f64cc7..1a8c6ff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -128,6 +131,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
 
   private DB db;
   private boolean isNewlyCreated;
+  private Timer compactionTimer;
 
   public NMLeveldbStateStoreService() {
     super(NMLeveldbStateStoreService.class.getName());
@@ -139,6 +143,10 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
 
   @Override
   protected void closeStorage() throws IOException {
+    if (compactionTimer != null) {
+      compactionTimer.cancel();
+      compactionTimer = null;
+    }
     if (db != null) {
       db.close();
     }
@@ -964,6 +972,12 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   @Override
   protected void initStorage(Configuration conf)
       throws IOException {
+    db = openDatabase(conf);  // assign first: the compaction task reads db
+    checkVersion();  // check the stored NM state version
+    startCompactionTimer(conf);  // creates no timer when interval is <= 0
+  }
+
+  protected DB openDatabase(Configuration conf) throws IOException {
     Path storeRoot = createStorageDir(conf);
     Options options = new Options();
     options.createIfMissing(false);
@@ -988,7 +1002,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
         throw e;
       }
     }
-    checkVersion();
+    return db;
   }
 
   private Path createStorageDir(Configuration conf) throws IOException {
@@ -1004,6 +1018,33 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
     return root;
   }
 
+  /**
+   * Schedules the periodic full-compaction task.
+   *
+   * The interval is read in seconds from the configuration and converted to
+   * milliseconds. A non-positive interval leaves the timer uncreated, which
+   * disables the compaction cycles.
+   *
+   * @param conf configuration supplying the compaction interval
+   */
+  private void startCompactionTimer(Configuration conf) {
+    long intervalSecs = conf.getLong(
+        YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS);
+    long periodMsec = intervalSecs * 1000;
+    if (periodMsec <= 0) {
+      return;
+    }
+    String timerName = this.getClass().getSimpleName() + " compaction timer";
+    // Second argument marks the timer thread as a daemon thread.
+    compactionTimer = new Timer(timerName, true);
+    // The first run happens one full period after startup.
+    compactionTimer.schedule(new CompactionTimerTask(),
+        periodMsec, periodMsec);
+  }
+
+
+  /**
+   * Timer task that runs a full compaction over the whole key range of the
+   * state database and logs how long a successful cycle took.
+   */
+  private class CompactionTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      long start = Time.monotonicNow();
+      LOG.info("Starting full compaction cycle");
+      try {
+        // null begin/end keys request compaction of the entire key range.
+        db.compactRange(null, null);
+        // Report completion only when the compaction actually succeeded.
+        long duration = Time.monotonicNow() - start;
+        LOG.info("Full compaction cycle completed in " + duration + " msec");
+      } catch (RuntimeException e) {
+        // Catch every unchecked exception (DBException included): an
+        // exception escaping run() terminates the Timer thread and would
+        // silently stop all future compaction cycles.
+        LOG.error("Error compacting database", e);
+      }
+    }
+  }
 
   private static class LeveldbLogger implements Logger {
     private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
@@ -1061,7 +1102,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
    *    throw exception and indicate user to use a separate upgrade tool to
    *    upgrade NM state or remove incompatible old state.
    */
-  private void checkVersion() throws IOException {
+  protected void checkVersion() throws IOException {
     Version loadedVersion = loadVersion();
     LOG.info("Loaded NM state version info " + loadedVersion);
     if (loadedVersion.equals(getCurrentVersion())) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ba79d77/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 47468d6..e44e5e5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -23,6 +23,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 
 import java.io.File;
 import java.io.IOException;
@@ -75,6 +79,7 @@ import 
org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.iq80.leveldb.DB;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -885,6 +890,26 @@ public class TestNMLeveldbStateStoreService {
     assertTrue(state.getLogDeleterMap().isEmpty());
   }
 
+  /**
+   * Verifies that, with a one second compaction interval configured, the
+   * state store requests a full-range compaction of the underlying database.
+   */
+  @Test
+  public void testCompactionCycle() throws IOException {
+    final DB mockdb = mock(DB.class);
+    conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
+    // Bypass the real leveldb store so only the timer wiring is exercised.
+    NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
+      @Override
+      protected void checkVersion() {}
+
+      @Override
+      protected DB openDatabase(Configuration conf) {
+        return mockdb;
+      }
+    };
+    try {
+      store.init(conf);
+      store.start();
+      verify(mockdb, timeout(10000)).compactRange(
+          (byte[]) isNull(), (byte[]) isNull());
+    } finally {
+      // Always close so the compaction timer is cancelled even when the
+      // verification fails or times out.
+      store.close();
+    }
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to