This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b844e52b35b [SPARK-47568][SS] Fix race condition between maintenance 
thread and load/commit for snapshot files
0b844e52b35b is described below

commit 0b844e52b35b0491717ba9f0ae8fe2e0cf45e88d
Author: Bhuwan Sahni <bhuwan.sa...@databricks.com>
AuthorDate: Fri Mar 29 13:23:15 2024 +0900

    [SPARK-47568][SS] Fix race condition between maintenance thread and 
load/commit for snapshot files
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a race condition between the maintenance thread and task 
thread when change-log checkpointing is enabled, and ensure all snapshots are 
valid.
    
    1. The maintenance thread currently relies on class variable lastSnapshot 
to find the latest checkpoint and uploads it to DFS. This checkpoint can be 
modified at commit time by Task thread if a new snapshot is created.
    2. The task thread was not resetting the lastSnapshot at load time, which 
can result in newer snapshots (if a old version is loaded) being considered 
valid and uploaded to DFS. This results in VersionIdMismatch errors.
    
    ### Why are the changes needed?
    
    These are logical bugs which can cause `VersionIdMismatch` errors causing 
user to discard the snapshot and restart the query.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45724 from sahnib/rocks-db-fix.
    
    Authored-by: Bhuwan Sahni <bhuwan.sa...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 66 ++++++++++++++--------
 .../streaming/state/RocksDBFileManager.scala       |  3 +-
 .../execution/streaming/state/RocksDBSuite.scala   | 37 ++++++++++++
 3 files changed, 82 insertions(+), 24 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 1817104a5c22..fcefc1666f3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
@@ -49,6 +50,7 @@ case object RollbackStore extends 
RocksDBOpType("rollback_store")
 case object CloseStore extends RocksDBOpType("close_store")
 case object ReportStoreMetrics extends RocksDBOpType("report_store_metrics")
 case object StoreTaskCompletionListener extends 
RocksDBOpType("store_task_completion_listener")
+case object StoreMaintenance extends RocksDBOpType("store_maintenance")
 
 /**
  * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
@@ -184,19 +186,23 @@ class RocksDB(
         loadedVersion = latestSnapshotVersion
 
         // reset last snapshot version
-        lastSnapshotVersion = 0L
+        if (lastSnapshotVersion > latestSnapshotVersion) {
+          // discard any newer snapshots
+          lastSnapshotVersion = 0L
+          latestSnapshot = None
+        }
         openDB()
 
         numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
-          // we don't track the total number of rows - discard the number 
being track
-          -1L
-        } else if (metadata.numKeys < 0) {
-          // we track the total number of rows, but the snapshot doesn't have 
tracking number
-          // need to count keys now
-          countKeys()
-        } else {
-          metadata.numKeys
-        }
+            // we don't track the total number of rows - discard the number 
being track
+            -1L
+          } else if (metadata.numKeys < 0) {
+            // we track the total number of rows, but the snapshot doesn't 
have tracking number
+            // need to count keys now
+            countKeys()
+          } else {
+            metadata.numKeys
+          }
         if (loadedVersion != version) replayChangelog(version)
         // After changelog replay the numKeysOnWritingVersion will be updated 
to
         // the correct number of keys in the loaded version.
@@ -571,16 +577,14 @@ class RocksDB(
           // background operations.
           val cp = Checkpoint.create(db)
           cp.createCheckpoint(checkpointDir.toString)
-          synchronized {
-            // if changelog checkpointing is disabled, the snapshot is 
uploaded synchronously
-            // inside the uploadSnapshot() called below.
-            // If changelog checkpointing is enabled, snapshot will be 
uploaded asynchronously
-            // during state store maintenance.
-            latestSnapshot.foreach(_.close())
-            latestSnapshot = Some(
-              RocksDBSnapshot(checkpointDir, newVersion, 
numKeysOnWritingVersion))
-            lastSnapshotVersion = newVersion
-          }
+          // if changelog checkpointing is disabled, the snapshot is uploaded 
synchronously
+          // inside the uploadSnapshot() called below.
+          // If changelog checkpointing is enabled, snapshot will be uploaded 
asynchronously
+          // during state store maintenance.
+          latestSnapshot.foreach(_.close())
+          latestSnapshot = Some(
+            RocksDBSnapshot(checkpointDir, newVersion, 
numKeysOnWritingVersion))
+          lastSnapshotVersion = newVersion
         }
       }
 
@@ -668,7 +672,20 @@ class RocksDB(
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      uploadSnapshot()
+      // There is race to update latestSnapshot between load(), commit()
+      // and uploadSnapshot().
+      // The load method will reset latestSnapshot to discard any snapshots 
taken
+      // from newer versions (when a old version is reloaded).
+      // commit() method deletes the existing snapshot while creating a new 
snapshot.
+      // In order to ensure that the snapshot being uploaded would not be 
modified
+      // concurrently, we need to synchronize the snapshot access between task 
thread
+      // and maintenance thread.
+      acquire(StoreMaintenance)
+      try {
+        uploadSnapshot()
+      } finally {
+        release(StoreMaintenance)
+      }
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
@@ -788,8 +805,11 @@ class RocksDB(
    */
   private def acquire(opType: RocksDBOpType): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
-    val waitStartTime = System.currentTimeMillis
-    def timeWaitedMs = System.currentTimeMillis - waitStartTime
+    val waitStartTime = System.nanoTime()
+    def timeWaitedMs = {
+      val elapsedNanos = System.nanoTime() - waitStartTime
+      TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
+    }
     def isAcquiredByDifferentThread = acquiredThreadInfo != null &&
       acquiredThreadInfo.threadRef.get.isDefined &&
       newAcquiredThreadInfo.threadRef.get.get.getId != 
acquiredThreadInfo.threadRef.get.get.getId
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 18c769bed350..bd1daa48f809 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -554,6 +554,7 @@ class RocksDBFileManager(
     // Delete unnecessary local immutable files
     localImmutableFiles
       .foreach { existingFile =>
+        val existingFileSize = existingFile.length()
         val requiredFile = 
requiredFileNameToFileDetails.get(existingFile.getName)
         val prevDfsFile = 
localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
@@ -566,7 +567,7 @@ class RocksDBFileManager(
         if (!isSameFile) {
           existingFile.delete()
           localFilesToDfsFiles.remove(existingFile.getName)
-          logInfo(s"Deleted local file $existingFile with size 
${existingFile.length()} mapped" +
+          logInfo(s"Deleted local file $existingFile with size 
$existingFileSize mapped" +
             s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
         } else {
           logInfo(s"reusing $prevDfsFile present at $existingFile for 
$requiredFile")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index f3393ac10b29..d98b02c2a897 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -383,6 +383,14 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.load(version, readOnly = true)
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
+
+      // recommit 60 to ensure that acquireLock is released for maintenance
+      for (version <- 60 to 60) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
       // Check that snapshots and changelogs get purged correctly.
       db.doMaintenance()
       assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
@@ -1919,6 +1927,35 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("time travel 4 -" +
+    " validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+
+      // load previous version, and recreate the snapshot
+      db.load(1)
+      db.put("3", "3")
+
+      // do maintenance - upload any latest snapshots so far
+      // would fail to acquire lock and no snapshots would be uploaded
+      db.doMaintenance()
+      db.commit()
+      // upload newly created snapshot 2.zip
+      db.doMaintenance()
+    }
+
+    // reload version 2 - should succeed
+    withDB(remoteDir, version = 2, conf = conf) { db =>
+    }
+  }
+
   test("validate Rocks DB SST files do not have a VersionIdMismatch" +
     " when metadata file is not overwritten - scenario 1") {
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to