This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 465a85375b1e [SPARK-47568][SS][3.5] Fix race condition between maintenance thread and load/commit for snapshot files 465a85375b1e is described below commit 465a85375b1e3b81b96eb365d4a68943478265c9 Author: Bhuwan Sahni <bhuwan.sa...@databricks.com> AuthorDate: Fri Apr 5 10:51:24 2024 +0900 [SPARK-47568][SS][3.5] Fix race condition between maintenance thread and load/commit for snapshot files Backports https://github.com/apache/spark/pull/45724 to 3.5 ### What changes were proposed in this pull request? This PR fixes a race condition between the maintenance thread and the task thread when changelog checkpointing is enabled, and ensures that all snapshots are valid. 1. The maintenance thread currently relies on the class variable `latestSnapshot` to find the latest checkpoint and upload it to DFS. This checkpoint can be modified at commit time by the task thread if a new snapshot is created. 2. The task thread was not resetting `latestSnapshot` at load time, which can result in newer snapshots (when an old version is loaded) being considered valid and uploaded to DFS. This results in `VersionIdMismatch` errors. ### Why are the changes needed? These are logical bugs that can cause `VersionIdMismatch` errors, forcing the user to discard the snapshot and restart the query. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test cases. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45881 from sahnib/rocks-db-fix-3.5. 
Authored-by: Bhuwan Sahni <bhuwan.sa...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/execution/streaming/state/RocksDB.scala | 65 ++++++++++++++-------- .../streaming/state/RocksDBFileManager.scala | 3 +- .../execution/streaming/state/RocksDBSuite.scala | 37 ++++++++++++ 3 files changed, 81 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 0c9738a6b081..301d978c9038 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io.File import java.util.Locale +import java.util.concurrent.TimeUnit import javax.annotation.concurrent.GuardedBy import scala.collection.{mutable, Map} @@ -152,19 +153,23 @@ class RocksDB( loadedVersion = latestSnapshotVersion // reset last snapshot version - lastSnapshotVersion = 0L + if (lastSnapshotVersion > latestSnapshotVersion) { + // discard any newer snapshots + lastSnapshotVersion = 0L + latestSnapshot = None + } openDB() numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { - // we don't track the total number of rows - discard the number being track - -1L - } else if (metadata.numKeys < 0) { - // we track the total number of rows, but the snapshot doesn't have tracking number - // need to count keys now - countKeys() - } else { - metadata.numKeys - } + // we don't track the total number of rows - discard the number being track + -1L + } else if (metadata.numKeys < 0) { + // we track the total number of rows, but the snapshot doesn't have tracking number + // need to count keys now + countKeys() + } else { + metadata.numKeys + } if (loadedVersion != version) replayChangelog(version) // After changelog replay the 
numKeysOnWritingVersion will be updated to // the correct number of keys in the loaded version. @@ -359,16 +364,14 @@ class RocksDB( // background operations. val cp = Checkpoint.create(db) cp.createCheckpoint(checkpointDir.toString) - synchronized { - // if changelog checkpointing is disabled, the snapshot is uploaded synchronously - // inside the uploadSnapshot() called below. - // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously - // during state store maintenance. - latestSnapshot.foreach(_.close()) - latestSnapshot = Some( - RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion)) - lastSnapshotVersion = newVersion - } + // if changelog checkpointing is disabled, the snapshot is uploaded synchronously + // inside the uploadSnapshot() called below. + // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously + // during state store maintenance. + latestSnapshot.foreach(_.close()) + latestSnapshot = Some( + RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion)) + lastSnapshotVersion = newVersion } } @@ -454,7 +457,20 @@ class RocksDB( def doMaintenance(): Unit = { if (enableChangelogCheckpointing) { - uploadSnapshot() + // There is race to update latestSnapshot between load(), commit() + // and uploadSnapshot(). + // The load method will reset latestSnapshot to discard any snapshots taken + // from newer versions (when a old version is reloaded). + // commit() method deletes the existing snapshot while creating a new snapshot. + // In order to ensure that the snapshot being uploaded would not be modified + // concurrently, we need to synchronize the snapshot access between task thread + // and maintenance thread. 
+ acquire() + try { + uploadSnapshot() + } finally { + release() + } } val cleanupTime = timeTakenMs { fileManager.deleteOldVersions(conf.minVersionsToRetain) @@ -549,8 +565,11 @@ class RocksDB( private def acquire(): Unit = acquireLock.synchronized { val newAcquiredThreadInfo = AcquiredThreadInfo() - val waitStartTime = System.currentTimeMillis - def timeWaitedMs = System.currentTimeMillis - waitStartTime + val waitStartTime = System.nanoTime() + def timeWaitedMs = { + val elapsedNanos = System.nanoTime() - waitStartTime + TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS) + } def isAcquiredByDifferentThread = acquiredThreadInfo != null && acquiredThreadInfo.threadRef.get.isDefined && newAcquiredThreadInfo.threadRef.get.get.getId != acquiredThreadInfo.threadRef.get.get.getId diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 3089de7127e7..c527a6a03ae9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -532,6 +532,7 @@ class RocksDBFileManager( // Delete unnecessary local immutable files localImmutableFiles .foreach { existingFile => + val existingFileSize = existingFile.length() val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName) val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName) val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) { @@ -544,7 +545,7 @@ class RocksDBFileManager( if (!isSameFile) { existingFile.delete() localFilesToDfsFiles.remove(existingFile.getName) - logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" + + logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" + s" to previous dfsFile 
${prevDfsFile.getOrElse("null")}") } else { logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 16bfe2359f43..89b4925db707 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -304,6 +304,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.load(version, readOnly = true) assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString))) } + + // recommit 60 to ensure that acquireLock is released for maintenance + for (version <- 60 to 60) { + db.load(version - 1) + db.put(version.toString, version.toString) + db.remove((version - 1).toString) + db.commit() + } // Check that snapshots and changelogs get purged correctly. 
db.doMaintenance() assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60)) @@ -1281,6 +1289,35 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + testWithChangelogCheckpointingEnabled("time travel 4 -" + + " validate successful RocksDB load") { + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf) { db => + for (version <- 0 to 1) { + db.load(version) + db.put(version.toString, version.toString) + db.commit() + } + + // load previous version, and recreate the snapshot + db.load(1) + db.put("3", "3") + + // do maintenance - upload any latest snapshots so far + // would fail to acquire lock and no snapshots would be uploaded + db.doMaintenance() + db.commit() + // upload newly created snapshot 2.zip + db.doMaintenance() + } + + // reload version 2 - should succeed + withDB(remoteDir, version = 2, conf = conf) { db => + } + } + test("validate Rocks DB SST files do not have a VersionIdMismatch" + " when metadata file is not overwritten - scenario 1") { val fmClass = "org.apache.spark.sql.execution.streaming.state." + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org