YARN-5009. NMLeveldbStateStoreService database can grow substantially leading 
to longer recovery times. Contributed by Jason Lowe
(cherry picked from commit 4a8508501bc753858693dacdafba61d604702f71)

Conflicts:

        
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
        
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e12c9e33
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e12c9e33
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e12c9e33

Branch: refs/heads/branch-2.7
Commit: e12c9e3348aeb8c929661c43cd70f01b59c39e11
Parents: dbb7458
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Apr 29 15:07:07 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Apr 29 15:07:07 2016 +0000

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  7 +++
 .../src/main/resources/yarn-default.xml         |  8 ++++
 .../recovery/NMLeveldbStateStoreService.java    | 45 +++++++++++++++++++-
 .../TestNMLeveldbStateStoreService.java         | 25 +++++++++++
 4 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12c9e33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0cfca20..8922e06 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1134,6 +1134,13 @@ public class YarnConfiguration extends Configuration {
 
   public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
 
+  /** The time in seconds between full compactions of the NM state database.
+   *  Setting the interval to zero disables the full compaction cycles.
+   */
+  public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS =
+      NM_RECOVERY_PREFIX + "compaction-interval-secs";
+  public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600;
+
   ////////////////////////////////
   // Web Proxy Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12c9e33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 5664242..12e42f8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1241,6 +1241,14 @@
   </property>
 
   <property>
+    <description>The time in seconds between full compactions of the NM state
+    database. Setting the interval to zero disables the full compaction
+    cycles.</description>
+    <name>yarn.nodemanager.recovery.compaction-interval-secs</name>
+    <value>3600</value>
+  </property>
+
+  <property>
     <description>
     The delay time ms to unregister container metrics after completion.
     </description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12c9e33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 0c9901c..36b7f81 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -123,6 +126,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
 
   private DB db;
   private boolean isNewlyCreated;
+  private Timer compactionTimer;
 
   public NMLeveldbStateStoreService() {
     super(NMLeveldbStateStoreService.class.getName());
@@ -134,6 +138,10 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
 
   @Override
   protected void closeStorage() throws IOException {
+    if (compactionTimer != null) {
+      compactionTimer.cancel();
+      compactionTimer = null;
+    }
     if (db != null) {
       db.close();
     }
@@ -942,6 +950,12 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   @Override
   protected void initStorage(Configuration conf)
       throws IOException {
+    db = openDatabase(conf);
+    checkVersion();
+    startCompactionTimer(conf);
+  }
+
+  protected DB openDatabase(Configuration conf) throws IOException {
     Path storeRoot = createStorageDir(conf);
     Options options = new Options();
     options.createIfMissing(false);
@@ -966,7 +980,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
         throw e;
       }
     }
-    checkVersion();
+    return db;
   }
 
   private Path createStorageDir(Configuration conf) throws IOException {
@@ -982,6 +996,33 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
     return root;
   }
 
+  private void startCompactionTimer(Configuration conf) {
+    long intervalMsec = conf.getLong(
+        YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000;
+    if (intervalMsec > 0) {
+      compactionTimer = new Timer(
+          this.getClass().getSimpleName() + " compaction timer", true);
+      compactionTimer.schedule(new CompactionTimerTask(),
+          intervalMsec, intervalMsec);
+    }
+  }
+
+
+  private class CompactionTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      long start = Time.monotonicNow();
+      LOG.info("Starting full compaction cycle");
+      try {
+        db.compactRange(null, null);
+      } catch (DBException e) {
+        LOG.error("Error compacting database", e);
+      }
+      long duration = Time.monotonicNow() - start;
+      LOG.info("Full compaction cycle completed in " + duration + " msec");
+    }
+  }
 
   private static class LeveldbLogger implements Logger {
     private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
@@ -1039,7 +1080,7 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
    *    throw exception and indicate user to use a separate upgrade tool to
    *    upgrade NM state or remove incompatible old state.
    */
-  private void checkVersion() throws IOException {
+  protected void checkVersion() throws IOException {
     Version loadedVersion = loadVersion();
     LOG.info("Loaded NM state version info " + loadedVersion);
     if (loadedVersion.equals(getCurrentVersion())) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12c9e33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 41ec2d5..f2f43a9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -23,6 +23,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 
 import java.io.File;
 import java.io.IOException;
@@ -75,6 +79,7 @@ import 
org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.iq80.leveldb.DB;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -874,6 +879,26 @@ public class TestNMLeveldbStateStoreService {
     assertTrue(state.getLogDeleterMap().isEmpty());
   }
 
+  @Test
+  public void testCompactionCycle() throws IOException {
+    final DB mockdb = mock(DB.class);
+    conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
+    NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
+      @Override
+      protected void checkVersion() {}
+
+      @Override
+      protected DB openDatabase(Configuration conf) {
+        return mockdb;
+      }
+    };
+    store.init(conf);
+    store.start();
+    verify(mockdb, timeout(10000)).compactRange(
+        (byte[]) isNull(), (byte[]) isNull());
+    store.close();
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to