This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 465a85375b1e [SPARK-47568][SS][3.5] Fix race condition between maintenance thread and load/commit for snapshot files
465a85375b1e is described below

commit 465a85375b1e3b81b96eb365d4a68943478265c9
Author: Bhuwan Sahni <bhuwan.sa...@databricks.com>
AuthorDate: Fri Apr 5 10:51:24 2024 +0900

    [SPARK-47568][SS][3.5] Fix race condition between maintenance thread and load/commit for snapshot files
    
    Backports https://github.com/apache/spark/pull/45724 to 3.5
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a race condition between the maintenance thread and the task thread when changelog checkpointing is enabled, and ensures all snapshots are valid.
    
    1. The maintenance thread currently relies on the class variable `lastSnapshot` to find the latest checkpoint and uploads it to DFS. This checkpoint can be modified at commit time by the task thread if a new snapshot is created.
    2. The task thread was not resetting `lastSnapshot` at load time, which can result in newer snapshots (if an old version is loaded) being considered valid and uploaded to DFS. This results in `VersionIdMismatch` errors. A minimal sketch of the locking pattern is shown after this list.
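    
    As a hypothetical illustration only (simplified stand-in names and a plain `ReentrantLock`, not the actual `RocksDB.scala` code), the locking pattern looks like this: the task thread's `load()`/`commit()` and the maintenance thread's upload all guard the shared snapshot reference with the same lock, so a snapshot can never be replaced or discarded while it is being uploaded.
    
    ```scala
    import java.util.concurrent.locks.ReentrantLock
    
    // Hypothetical stand-ins for the real state store internals.
    case class Snapshot(version: Long)
    
    class SnapshotCoordinator {
      private val lock = new ReentrantLock()
      private var latestSnapshot: Option[Snapshot] = None
      private var lastSnapshotVersion = 0L
    
      // Task thread: publish a new snapshot while holding the lock.
      def commit(newVersion: Long): Unit = {
        lock.lock()
        try {
          latestSnapshot = Some(Snapshot(newVersion))
          lastSnapshotVersion = newVersion
        } finally lock.unlock()
      }
    
      // Task thread: when reloading an old version, discard any snapshot
      // taken from a newer version so it is never uploaded.
      def load(version: Long): Unit = {
        lock.lock()
        try {
          if (lastSnapshotVersion > version) {
            lastSnapshotVersion = 0L
            latestSnapshot = None
          }
        } finally lock.unlock()
      }
    
      // Maintenance thread: upload under the same lock, so the snapshot
      // cannot be replaced or discarded mid-upload.
      def doMaintenance(upload: Snapshot => Unit): Unit = {
        lock.lock()
        try latestSnapshot.foreach(upload)
        finally lock.unlock()
      }
    }
    ```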
    
    ### Why are the changes needed?
    
    These are logical bugs that can cause `VersionIdMismatch` errors, forcing the user to discard the snapshot and restart the query.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45881 from sahnib/rocks-db-fix-3.5.
    
    Authored-by: Bhuwan Sahni <bhuwan.sa...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 65 ++++++++++++++--------
 .../streaming/state/RocksDBFileManager.scala       |  3 +-
 .../execution/streaming/state/RocksDBSuite.scala   | 37 ++++++++++++
 3 files changed, 81 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 0c9738a6b081..301d978c9038 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
@@ -152,19 +153,23 @@ class RocksDB(
         loadedVersion = latestSnapshotVersion
 
         // reset last snapshot version
-        lastSnapshotVersion = 0L
+        if (lastSnapshotVersion > latestSnapshotVersion) {
+          // discard any newer snapshots
+          lastSnapshotVersion = 0L
+          latestSnapshot = None
+        }
         openDB()
 
         numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
-          // we don't track the total number of rows - discard the number being track
-          -1L
-        } else if (metadata.numKeys < 0) {
-          // we track the total number of rows, but the snapshot doesn't have tracking number
-          // need to count keys now
-          countKeys()
-        } else {
-          metadata.numKeys
-        }
+            // we don't track the total number of rows - discard the number being track
+            -1L
+          } else if (metadata.numKeys < 0) {
+            // we track the total number of rows, but the snapshot doesn't have tracking number
+            // need to count keys now
+            countKeys()
+          } else {
+            metadata.numKeys
+          }
         if (loadedVersion != version) replayChangelog(version)
         // After changelog replay the numKeysOnWritingVersion will be updated to
         // the correct number of keys in the loaded version.
@@ -359,16 +364,14 @@ class RocksDB(
           // background operations.
           val cp = Checkpoint.create(db)
           cp.createCheckpoint(checkpointDir.toString)
-          synchronized {
-            // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
-            // inside the uploadSnapshot() called below.
-            // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
-            // during state store maintenance.
-            latestSnapshot.foreach(_.close())
-            latestSnapshot = Some(
-              RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-            lastSnapshotVersion = newVersion
-          }
+          // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
+          // inside the uploadSnapshot() called below.
+          // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
+          // during state store maintenance.
+          latestSnapshot.foreach(_.close())
+          latestSnapshot = Some(
+            RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
+          lastSnapshotVersion = newVersion
         }
       }
 
@@ -454,7 +457,20 @@ class RocksDB(
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      uploadSnapshot()
+      // There is race to update latestSnapshot between load(), commit()
+      // and uploadSnapshot().
+      // The load method will reset latestSnapshot to discard any snapshots taken
+      // from newer versions (when a old version is reloaded).
+      // commit() method deletes the existing snapshot while creating a new snapshot.
+      // In order to ensure that the snapshot being uploaded would not be modified
+      // concurrently, we need to synchronize the snapshot access between task thread
+      // and maintenance thread.
+      acquire()
+      try {
+        uploadSnapshot()
+      } finally {
+        release()
+      }
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
@@ -549,8 +565,11 @@ class RocksDB(
 
   private def acquire(): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
-    val waitStartTime = System.currentTimeMillis
-    def timeWaitedMs = System.currentTimeMillis - waitStartTime
+    val waitStartTime = System.nanoTime()
+    def timeWaitedMs = {
+      val elapsedNanos = System.nanoTime() - waitStartTime
+      TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
+    }
     def isAcquiredByDifferentThread = acquiredThreadInfo != null &&
       acquiredThreadInfo.threadRef.get.isDefined &&
      newAcquiredThreadInfo.threadRef.get.get.getId != acquiredThreadInfo.threadRef.get.get.getId
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 3089de7127e7..c527a6a03ae9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -532,6 +532,7 @@ class RocksDBFileManager(
     // Delete unnecessary local immutable files
     localImmutableFiles
       .foreach { existingFile =>
+        val existingFileSize = existingFile.length()
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
         val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
@@ -544,7 +545,7 @@ class RocksDBFileManager(
         if (!isSameFile) {
           existingFile.delete()
           localFilesToDfsFiles.remove(existingFile.getName)
-          logInfo(s"Deleted local file $existingFile with size 
${existingFile.length()} mapped" +
+          logInfo(s"Deleted local file $existingFile with size 
$existingFileSize mapped" +
             s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
         } else {
           logInfo(s"reusing $prevDfsFile present at $existingFile for 
$requiredFile")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 16bfe2359f43..89b4925db707 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -304,6 +304,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.load(version, readOnly = true)
         assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
       }
+
+      // recommit 60 to ensure that acquireLock is released for maintenance
+      for (version <- 60 to 60) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
       // Check that snapshots and changelogs get purged correctly.
       db.doMaintenance()
       assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
@@ -1281,6 +1289,35 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("time travel 4 -" +
+    " validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+
+      // load previous version, and recreate the snapshot
+      db.load(1)
+      db.put("3", "3")
+
+      // do maintenance - upload any latest snapshots so far
+      // would fail to acquire lock and no snapshots would be uploaded
+      db.doMaintenance()
+      db.commit()
+      // upload newly created snapshot 2.zip
+      db.doMaintenance()
+    }
+
+    // reload version 2 - should succeed
+    withDB(remoteDir, version = 2, conf = conf) { db =>
+    }
+  }
+
   test("validate Rocks DB SST files do not have a VersionIdMismatch" +
     " when metadata file is not overwritten - scenario 1") {
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +


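A side note on the `acquire()` change in `RocksDB.scala` above: the lock-wait measurement switches from `System.currentTimeMillis` to the monotonic `System.nanoTime`. The standalone sketch below (illustrative only, not the actual Spark code) isolates that pattern:

```scala
import java.util.concurrent.TimeUnit

object LockWaitTiming {
  def main(args: Array[String]): Unit = {
    // System.currentTimeMillis() follows the wall clock, which can jump
    // (e.g. NTP adjustments); System.nanoTime() is monotonic, so it is
    // the right source for elapsed-time measurements like lock waits.
    val waitStartTime = System.nanoTime()

    Thread.sleep(5) // stand-in for waiting on the state store lock

    val elapsedNanos = System.nanoTime() - waitStartTime
    val timeWaitedMs = TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
    println(s"waited $timeWaitedMs ms")
  }
}
```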