YARN-5008. LeveldbRMStateStore database can grow substantially leading to long 
recovery times. Contributed by Jason Lowe

(cherry picked from commit dd80042c42aadaa347db93028724f69c9aca69c6)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e62162db
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e62162db
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e62162db

Branch: refs/heads/branch-2.8
Commit: e62162db46df34bb47ed368e51dbbdd3f44c0ad3
Parents: 0d4fbf0
Author: Jian He <jia...@apache.org>
Authored: Thu Apr 28 21:27:25 2016 -0700
Committer: Jian He <jia...@apache.org>
Committed: Thu Apr 28 21:36:23 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  7 ++++
 .../src/main/resources/yarn-default.xml         |  8 ++++
 .../recovery/LeveldbRMStateStore.java           | 42 ++++++++++++++++++++
 .../recovery/TestLeveldbRMStateStore.java       | 24 +++++++++++
 4 files changed, 81 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e62162db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8018d1c..6305b90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -576,6 +576,13 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
       + "leveldb-state-store.path";
 
+  /** The time in seconds between full compactions of the leveldb database.
+   *  Setting the interval to zero disables the full compaction cycles.
+   */
+  public static final String RM_LEVELDB_COMPACTION_INTERVAL_SECS = RM_PREFIX
+      + "leveldb-state-store.compaction-interval-secs";
+  public static final long DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS = 3600;
+
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e62162db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a588893..12afee0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -555,6 +555,14 @@
   </property>
 
   <property>
+    <description>The time in seconds between full compactions of the leveldb
+    database. Setting the interval to zero disables the full compaction
+    cycles.</description>
+    <name>yarn.resourcemanager.leveldb-state-store.compaction-interval-secs</name>
+    <value>3600</value>
+  </property>
+
+  <property>
     <description>Enable RM high-availability. When enabled,
       (1) The RM starts in the Standby mode by default, and transitions to
       the Active mode when prompted to.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e62162db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index 3f67282..913412a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -29,6 +29,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -97,6 +100,8 @@ public class LeveldbRMStateStore extends RMStateStore {
       .newInstance(1, 1);
 
   private DB db;
+  private Timer compactionTimer;
+  private long compactionIntervalMsec;
 
   private String getApplicationNodeKey(ApplicationId appId) {
     return RM_APP_ROOT + SEPARATOR + appId;
@@ -128,6 +133,9 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected void initInternal(Configuration conf) throws Exception {
+    compactionIntervalMsec = conf.getLong(
+        YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
   }
 
   private Path getStorageDir() throws IOException {
@@ -149,6 +157,11 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected void startInternal() throws Exception {
+    db = openDatabase();
+    startCompactionTimer();
+  }
+
+  protected DB openDatabase() throws Exception {
     Path storeRoot = createStorageDir();
     Options options = new Options();
     options.createIfMissing(false);
@@ -172,10 +185,24 @@ public class LeveldbRMStateStore extends RMStateStore {
         throw e;
       }
     }
+    return db;
+  }
+
+  private void startCompactionTimer() {
+    if (compactionIntervalMsec > 0) {
+      compactionTimer = new Timer(
+          this.getClass().getSimpleName() + " compaction timer", true);
+      compactionTimer.schedule(new CompactionTimerTask(),
+          compactionIntervalMsec, compactionIntervalMsec);
+    }
   }
 
   @Override
   protected void closeInternal() throws Exception {
+    if (compactionTimer != null) {
+      compactionTimer.cancel();
+      compactionTimer = null;
+    }
     if (db != null) {
       db.close();
       db = null;
@@ -793,6 +820,21 @@ public class LeveldbRMStateStore extends RMStateStore {
     return numEntries;
   }
 
+  private class CompactionTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      long start = Time.monotonicNow();
+      LOG.info("Starting full compaction cycle");
+      try {
+        db.compactRange(null, null);
+      } catch (DBException e) {
+        LOG.error("Error compacting database", e);
+      }
+      long duration = Time.monotonicNow() - start;
+      LOG.info("Full compaction cycle completed in " + duration + " msec");
+    }
+  }
+
   private static class LeveldbLogger implements Logger {
     private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e62162db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index 4666142..b86508a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -25,6 +30,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.iq80.leveldb.DB;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -108,6 +115,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
     testReservationStateStore(tester);
   }
 
+  @Test(timeout = 60000)
+  public void testCompactionCycle() throws Exception {
+    final DB mockdb = mock(DB.class);
+    conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
+    LeveldbRMStateStore store = new LeveldbRMStateStore() {
+      @Override
+      protected DB openDatabase() throws Exception {
+        return mockdb;
+      }
+    };
+    store.init(conf);
+    store.start();
+    verify(mockdb, timeout(10000)).compactRange(
+        (byte[]) isNull(), (byte[]) isNull());
+    store.close();
+  }
+
   class LeveldbStateStoreTester implements RMStateStoreHelper {
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to