baohe-zhang commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r420461909



##########
File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1167,6 +1168,58 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted 
because it failed to
     // load, so the event log needs to be replayed.
 
+    // TODO: Maybe need to do other check to see if there's enough memory to
+    // use inMemoryStore.
+    if (hybridKVStoreEnabled) {
+      logInfo("Using HybridKVStore as KVStore")
+      var retried = false
+      var store: HybridKVStore = null
+      while(store == null) {
+        val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+          attempt.lastIndex)
+        val isCompressed = reader.compressionCodec.isDefined
+        logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
+        val lease = dm.lease(reader.totalSize, isCompressed)
+        try {
+          val s = new HybridKVStore()
+          val levelDB = KVUtils.open(lease.tmpPath, metadata)
+          s.setLevelDB(levelDB)
+
+          s.startBackgroundThreadToWriteToDB(new 
HybridKVStore.SwitchingToLevelDBListener {
+            override def onSwitchingToLevelDBSuccess: Unit = {
+              levelDB.close()
+              val newStorePath = lease.commit(appId, attempt.info.attemptId)
+              s.setLevelDB(KVUtils.open(newStorePath, metadata))
+              logInfo(s"Completely switched to use leveldb for app" +
+              s" $appId / ${attempt.info.attemptId}")
+            }
+
+            override def onSwitchingToLevelDBFail(e: Exception): Unit = {
+              logWarning(s"Failed to switch to use LevelDb for app" +
+              s" $appId / ${attempt.info.attemptId}")
+              levelDB.close()
+              throw e
+            }
+          })
+
+          rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime())
+          s.stopBackgroundThreadAndSwitchToLevelDB()
+          store = s
+        } catch {
+          case _: IOException if !retried =>
+            // compaction may touch the file(s) which app rebuild wants to read
+            // compaction wouldn't run in short interval, so try again...
+            logWarning(s"Exception occurred while rebuilding app $appId - 
trying again...")
+            lease.rollback()

Review comment:
       I didn't quite understand this comment; could you elaborate? I 
think that in the current implementation, if any exception is thrown while 
migrating data to LevelDB, the hybrid KVStore will not switch to LevelDB, and 
its getStore() method will always return an in-memory KVStore.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to