baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r436796185



##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+      attempt.lastIndex)
+    val isCompressed = reader.compressionCodec.isDefined
+
+    val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed)
+    if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) {
+      throw new IllegalStateException("Not enough in-memory storage to create 
hybrid store.")
+    }
+    currentInMemoryStoreUsage.addAndGet(memoryUsage)
+    logInfo(s"Attempt creating hybrid store to parse $appId / 
${attempt.info.attemptId}. " +
+      s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage 
quota.")
+
+    logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+    val lease = dm.lease(reader.totalSize, isCompressed)
+    val isLeaseRolledBack = new 
java.util.concurrent.atomic.AtomicBoolean(false)
+    var store: HybridStore = null
+    try {
+      store = new HybridStore()
+      val levelDB = KVUtils.open(lease.tmpPath, metadata)
+      store.setLevelDB(levelDB)
+      store.setCachedQuantileKlass(classOf[CachedQuantile])

Review comment:
       OK, will do. I am moving it to the org.apache.spark.deploy.history 
package.

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+      attempt.lastIndex)
+    val isCompressed = reader.compressionCodec.isDefined
+
+    val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed)
+    if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) {
+      throw new IllegalStateException("Not enough in-memory storage to create 
hybrid store.")
+    }
+    currentInMemoryStoreUsage.addAndGet(memoryUsage)
+    logInfo(s"Attempt creating hybrid store to parse $appId / 
${attempt.info.attemptId}. " +
+      s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage 
quota.")
+
+    logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+    val lease = dm.lease(reader.totalSize, isCompressed)
+    val isLeaseRolledBack = new 
java.util.concurrent.atomic.AtomicBoolean(false)
+    var store: HybridStore = null
+    try {
+      store = new HybridStore()
+      val levelDB = KVUtils.open(lease.tmpPath, metadata)
+      store.setLevelDB(levelDB)
+      store.setCachedQuantileKlass(classOf[CachedQuantile])
+      rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+      // Start the background thread to dump data to levelDB when writing to
+      // InMemoryStore is completed.
+      store.switchingToLevelDB(new HybridStore.SwitchingToLevelDBListener {
+        override def onSwitchingToLevelDBSuccess: Unit = {
+          levelDB.close()
+          val newStorePath = lease.commit(appId, attempt.info.attemptId)
+          store.setLevelDB(KVUtils.open(newStorePath, metadata))
+          currentInMemoryStoreUsage.addAndGet(-memoryUsage)
+          logInfo(s"Completely switched to leveldb for app" +
+            s" $appId / ${attempt.info.attemptId}. " +
+            s"Released ${Utils.bytesToString(memoryUsage)} in-memory storage 
quota.")
+        }
+
+        override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+          logWarning(s"Failed to switch to leveldb for app" +
+          s" $appId / ${attempt.info.attemptId}", e)
+          levelDB.close()
+          if (!isLeaseRolledBack.getAndSet(true)) {
+            lease.rollback()
+          }
+        }
+      })
+
+      store
+    } catch {
+      case e: Exception =>
+        store.close()
+        currentInMemoryStoreUsage.addAndGet(-memoryUsage)

Review comment:
       decrementAndGet() only decrements by one, and there is no 
decrementAndGet(long delta) overload, so addAndGet(-memoryUsage) is used instead.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,18 @@ private[spark] object History {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = 
ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store and having a 
background thread " +
+      "that dumps data to a disk store after the writing to in-memory store is 
completed. " +
+      "Use it with caution, as in-memory store requires higher memory usage.")

Review comment:
       Addressed: removed the cautionary sentence.

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();

Review comment:
       addressed.

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws 
Exception {
+    KVStore store = getStore();
+    long count = store.count(type, index, indexedValue);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join();
+      }
+      if (levelDB != null) {
+        levelDB.close();
+      }
+      if (inMemoryStore != null) {
+        inMemoryStore.close();
+      }
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+    }
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws 
Exception {
+    KVStore store = getStore();
+    long count = store.count(type, index, indexedValue);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join();
+      }
+      if (levelDB != null) {
+        levelDB.close();
+      }
+      if (inMemoryStore != null) {
+        inMemoryStore.close();
+      }
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+    }
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    boolean removed = store.removeAllByIndexValues(klass, index, indexValues);
+    return removed;
+  }
+
+  public void setLevelDB(LevelDB levelDB) {
+    this.levelDB = levelDB;
+  }
+
+  public void setCachedQuantileKlass(Class<?> klass) {
+    this.cachedQuantileKlass = klass;
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and be started to dump data in 
inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  public void switchingToLevelDB(SwitchingToLevelDBListener listener) throws 
Exception {
+    // A background thread that dumps data to levelDB
+    backgroundThread = new Thread(new Runnable() {
+      public void run() {
+        Exception exceptionCaught = null;
+
+        try {
+          for (Class<?> klass : klassMap.keySet()) {
+            KVStoreIterator<?> it = 
inMemoryStore.view(klass).closeableIterator();
+            while (it.hasNext()) {
+              levelDB.write(it.next());
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();

Review comment:
       removed

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws 
Exception {
+    KVStore store = getStore();
+    long count = store.count(type, index, indexedValue);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join();
+      }
+      if (levelDB != null) {
+        levelDB.close();
+      }
+      if (inMemoryStore != null) {
+        inMemoryStore.close();
+      }
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+    }
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called 
after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    boolean removed = store.removeAllByIndexValues(klass, index, indexValues);
+    return removed;
+  }
+
+  public void setLevelDB(LevelDB levelDB) {
+    this.levelDB = levelDB;
+  }
+
+  public void setCachedQuantileKlass(Class<?> klass) {
+    this.cachedQuantileKlass = klass;
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and be started to dump data in 
inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  public void switchingToLevelDB(SwitchingToLevelDBListener listener) throws 
Exception {
+    // A background thread that dumps data to levelDB
+    backgroundThread = new Thread(new Runnable() {
+      public void run() {
+        Exception exceptionCaught = null;
+
+        try {
+          for (Class<?> klass : klassMap.keySet()) {
+            KVStoreIterator<?> it = 
inMemoryStore.view(klass).closeableIterator();
+            while (it.hasNext()) {
+              levelDB.write(it.next());
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          exceptionCaught = e;
+        }
+
+        if (exceptionCaught == null) {
+          // Switch to levelDB and close inMemoryStore
+          shouldUseInMemoryStore.set(false);
+
+          // Dump CachedQuantile objects to levelDB
+          try {
+            while(cachedQuantileQueue.size() > 0) {
+              levelDB.write(cachedQuantileQueue.poll());
+            }
+          } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       I changed the logic for writing CachedQuantile objects to levelDB. 

##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,18 @@ private[spark] object History {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = 
ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store and having a 
background thread " +
+      "that dumps data to a disk store after the writing to in-memory store is 
completed. " +
+      "Use it with caution, as in-memory store requires higher memory usage.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(true)
+
+  val MAX_IN_MEMORY_STORE_USAGE = 
ConfigBuilder("spark.history.store.hybridStore.maxMemoryUsage")
+    .version("3.1.0")

Review comment:
       doc added.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,18 @@ private[spark] object History {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = 
ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store and having a 
background thread " +
+      "that dumps data to a disk store after the writing to in-memory store is 
completed. " +
+      "Use it with caution, as in-memory store requires higher memory usage.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(true)

Review comment:
       done.

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -128,6 +129,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
   private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
+  private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)
+  private val maxInMemoryStoreUsage = 
conf.get(History.MAX_IN_MEMORY_STORE_USAGE)
+  private val currentInMemoryStoreUsage = new 
java.util.concurrent.atomic.AtomicLong(0L)

Review comment:
       A new class, HistoryServerMemoryManager, has been created.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * A implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background 
thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       I think the main purpose is to handle CachedQuantile specially. 
CachedQuantile is not on this list, so it won't be dumped to levelDB by the 
background thread. Right now my strategy is to write CachedQuantile to both 
inMemoryStore and levelDB during the transition (i.e. while the background 
thread is alive); this strategy seems to work in my manual tests. 
   
   CachedQuantile is quite special: it is only added to the kvstore after 
rebuildAppStore() finishes. That means that if we don't handle it specially, it will 
be added to inMemoryStore while the background thread is dumping data (including 
CachedQuantile objects) to levelDB, which causes conflicts. 

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * A implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background 
thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       OK, I will use a reject-list and add CachedQuantile to it. I also 
think that with a reject-list, we need a HashSet to record the classes that 
have been added to the kvstore.
   
   Yes, I agree it's optional to retain these CachedQuantile objects.

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+    conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+    logInfo("Initialized memory manager: " +
+      s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+      s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+      appId: String,
+      attemptId: Option[String],
+      eventLogSize: Long,
+      isCompressed: Boolean): Unit = {
+    val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed)
+    if (memoryUsage + currentUsage.get > maxUsage) {
+      throw new RuntimeException("Not enough memory to create hybrid store " +
+        s"for app $appId / $attemptId.")
+    }
+    active.synchronized {
+      active(appId -> attemptId) = memoryUsage
+    }
+    currentUsage.addAndGet(memoryUsage)
+    logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+      s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+    val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+    memoryUsage match {
+      case Some(m) =>
+        currentUsage.addAndGet(-m)
+        logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+          s"app $appId / $attemptId")

Review comment:
       fixed. 

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * A implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background 
thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       If we don't write CachedQuantile to levelDB during the transition 
process, we might be able to get rid of the reject-list or allow-list entirely, so 
there would be no need to handle any classes specially. I will test that idea and push the code 
changes later. 

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * A implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background 
thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       agree

##########
File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and 
uses
+ * a background thread to dump data to LevelDB once the writing to 
InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had 
been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new 
ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we 
need
+  // to handle it specially to avoid conflicts. We will use a queue store 
CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these 
objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new 
ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than 
CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to 
levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws 
Exception {
+    KVStore store = getStore();

Review comment:
       addressed

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1167,6 +1172,17 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted 
because it failed to
     // load, so the event log needs to be replayed.
 
+    // If hybrid store is enabled, try it first.

Review comment:
       Retry logic is added.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * A implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // Objects of these classes will be dumped to levelDB in the background 
thread
+  private var klassList: Seq[Class[_]] = List(

Review comment:
       The explicit logic for handling particular classes has been removed. 

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * A implementation of KVStore that accelerates event logs loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore will
+ * write data to InMemoryStore at first and use a background thread to dump
+ * data to LevelDB once the writing to InMemoryStore is completed.
+ */
+
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private[history] val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that had been writen to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    val store = getStore()
+    store.write(value)

Review comment:
       We can update it to getStore().write(value).

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1197,6 +1213,78 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    var retried = false
+    var hybridStore: HybridStore = null
+    while (hybridStore == null) {
+      val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+        attempt.lastIndex)
+      val isCompressed = reader.compressionCodec.isDefined
+
+      // Throws an exception if the memory space is not enough
+      memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize, 
isCompressed)
+
+      logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+      val lease = dm.lease(reader.totalSize, isCompressed)
+      val isLeaseRolledBack = new 
java.util.concurrent.atomic.AtomicBoolean(false)
+      var store: HybridStore = null
+      try {
+        store = new HybridStore()
+        val levelDB = KVUtils.open(lease.tmpPath, metadata)
+        store.setLevelDB(levelDB)
+        rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+        // Start the background thread to dump data to levelDB when writing to
+        // InMemoryStore is completed.
+        store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
+          override def onSwitchToLevelDBSuccess: Unit = {
+            levelDB.close()
+            val newStorePath = lease.commit(appId, attempt.info.attemptId)
+            store.setLevelDB(KVUtils.open(newStorePath, metadata))
+            memoryManager.release(appId, attempt.info.attemptId)
+            logInfo(s"Completely switched to LevelDB for app $appId / 
${attempt.info.attemptId}.")
+          }
+
+          override def onSwitchToLevelDBFail(e: Exception): Unit = {
+            levelDB.close()
+            if (!isLeaseRolledBack.getAndSet(true)) {
+              lease.rollback()
+            }

Review comment:
       I feel like this AtomicBoolean variable is somewhat unnecessary. If the 
background thread starts successfully, createHybridStore() will finish 
and return, so no exceptions will be caught in the foreground thread. If 
exceptions occur before backgroundThread.start(), the background thread 
won't be started, and hence the lease can only be rolled back on the foreground 
thread. What do you think? @HyukjinKwon @redsanket 

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1197,6 +1213,78 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    var retried = false
+    var hybridStore: HybridStore = null
+    while (hybridStore == null) {
+      val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+        attempt.lastIndex)
+      val isCompressed = reader.compressionCodec.isDefined
+
+      // Throws an exception if the memory space is not enough
+      memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize, 
isCompressed)
+
+      logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+      val lease = dm.lease(reader.totalSize, isCompressed)
+      val isLeaseRolledBack = new 
java.util.concurrent.atomic.AtomicBoolean(false)
+      var store: HybridStore = null
+      try {
+        store = new HybridStore()
+        val levelDB = KVUtils.open(lease.tmpPath, metadata)
+        store.setLevelDB(levelDB)
+        rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+
+        // Start the background thread to dump data to levelDB when writing to
+        // InMemoryStore is completed.
+        store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
+          override def onSwitchToLevelDBSuccess: Unit = {
+            levelDB.close()
+            val newStorePath = lease.commit(appId, attempt.info.attemptId)
+            store.setLevelDB(KVUtils.open(newStorePath, metadata))
+            memoryManager.release(appId, attempt.info.attemptId)
+            logInfo(s"Completely switched to LevelDB for app $appId / 
${attempt.info.attemptId}.")
+          }
+
+          override def onSwitchToLevelDBFail(e: Exception): Unit = {
+            levelDB.close()
+            if (!isLeaseRolledBack.getAndSet(true)) {
+              lease.rollback()
+            }

Review comment:
       I feel like this AtomicBoolean variable is somewhat unnecessary. If the 
background thread starts successfully, createHybridStore() will finish 
and return, so no exceptions will be caught in the foreground thread. If 
exceptions occur before backgroundThread.start(), the background thread 
won't be started, and hence the lease can only be rolled back on the foreground 
thread. What do you think? @HeartSaVioR @redsanket 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to