This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 86bac8911a31 [SPARK-46711][SS] Fix RocksDB state provider race condition during rollback

86bac8911a31 is described below

commit 86bac8911a31a98eb6ad1f5365554f4f095c0376
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Jan 15 11:09:34 2024 +0900

[SPARK-46711][SS] Fix RocksDB state provider race condition during rollback

### What changes were proposed in this pull request?
Fix a race condition in the RocksDB state provider during rollback by making `rollback()` call `acquire()` before it modifies any state.

### Why are the changes needed?
The `rollback()` method in `RocksDB` is not properly synchronized, so a race condition can occur during rollback while other tasks are trying to commit. The symptom of the race condition is the following exception:
```
Caused by: java.io.FileNotFoundException: No such file or directory: ...state/0/54/10369.changelog
	at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:4069)
	at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3907)
	at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3801)
	at com.databricks.common.filesystem.LokiS3FS.getFileStatusNoCache(LokiS3FS.scala:91)
	at com.databricks.common.filesystem.LokiS3FS.getFileStatus(LokiS3FS.scala:86)
	at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1525)
```

The race can occur with the following sequence of events:
1. Task A is cancelled after releasing its lock on the RocksDB instance.
2. Task B starts and loads version 10368.
3. Task A performs a RocksDB rollback, resetting the loaded version to -1.
4. Task B reads data from RocksDB and commits.

Because step 3 runs without holding the lock, task A's rollback can reset state, including the open changelog writer, that task B is still using.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44722 from anishshri-db/task/SPARK-46711.
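The four-step race in the commit message can be replayed deterministically on a single thread. The sketch below is a hypothetical toy model: `ToyStore`, its `load`/`rollback`/`commit` methods, and the in-memory set standing in for changelog files are illustrative assumptions, not Spark's `RocksDB` API. It only assumes, as the diff suggests, that a rollback aborts the open changelog and resets the loaded version to -1 without taking any lock. Step 1 has no counterpart here because the toy has no lock at all.

```scala
import java.io.FileNotFoundException
import scala.collection.mutable

// Hypothetical toy model, not Spark's RocksDB class: one store instance shared
// by two tasks, with no locking in rollback(), as before the fix.
final class ToyStore {
  private val changelogFiles = mutable.Set.empty[Long] // changelog "files" that exist
  var loadedVersion: Long = -1L
  private var writingVersion: Long = -1L               // target of the open changelog

  def load(version: Long): Unit = {
    loadedVersion = version
    writingVersion = version + 1
    changelogFiles += writingVersion                   // open changelog for version + 1
  }

  def rollback(): Unit = {
    loadedVersion = -1L
    changelogFiles -= writingVersion                   // aborting deletes the changelog
  }

  def commit(): Long = {
    if (!changelogFiles.contains(writingVersion))
      throw new FileNotFoundException(
        s"No such file or directory: ${writingVersion}.changelog")
    loadedVersion = writingVersion
    loadedVersion
  }
}

// Steps 2-4 of the sequence, replayed in order on one thread.
def replayRace(): Either[String, Long] = {
  val store = new ToyStore
  store.load(10368L)        // 2. task B loads 10368 and opens the changelog for 10369
  store.rollback()          // 3. cancelled task A rolls back to -1, aborting that changelog
  try Right(store.commit()) // 4. task B commits
  catch { case e: FileNotFoundException => Left(e.getMessage) }
}

println(replayRace()) // Left(No such file or directory: 10369.changelog)
```

Because task A's rollback lands inside task B's load/commit window, B's commit looks for a changelog that no longer exists, which loosely mirrors the `FileNotFoundException` for `10369.changelog` quoted in the commit message.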
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 101a9e6b9199..0284d4c9d303 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -452,6 +452,7 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
+    acquire()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     loadedVersion = -1L
     changelogWriter.foreach(_.abort())
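The fix adds a single `acquire()` call at the top of `rollback()`. Below is a minimal sketch of why that closes the window, assuming a hypothetical `GuardedToyStore` whose task-owned lock (built here on `wait`/`notifyAll`) is taken in `load()` and released in `commit()` and `rollback()`. Spark's real `acquire()`/`release()` differ in detail, and none of these names are meant as its API.

```scala
import java.io.FileNotFoundException
import scala.collection.mutable

// Hypothetical toy model, not Spark's RocksDB class. load() takes a task-owned
// lock; commit() and rollback() release it. As in the fix, rollback() now
// acquires the lock first, so it waits for any other task that holds it.
final class GuardedToyStore {
  private val lock = new Object
  private var owner: Option[String] = None          // guarded by `lock`
  private val changelogFiles = mutable.Set.empty[Long]
  @volatile var loadedVersion: Long = -1L
  private var writingVersion: Long = -1L            // -1 means no open changelog

  // Block until no other task holds the lock (re-entrant for the same task).
  private def acquire(task: String): Unit = lock.synchronized {
    while (owner.exists(_ != task)) lock.wait()
    owner = Some(task)
  }

  private def release(): Unit = lock.synchronized {
    owner = None
    lock.notifyAll()
  }

  def load(task: String, version: Long): Unit = {
    acquire(task)
    loadedVersion = version
    writingVersion = version + 1
    changelogFiles += writingVersion                // open changelog for version + 1
  }

  def commit(): Long =
    try {
      if (!changelogFiles.contains(writingVersion))
        throw new FileNotFoundException(s"${writingVersion}.changelog")
      loadedVersion = writingVersion
      writingVersion = -1L                          // changelog is now closed
      loadedVersion
    } finally release()

  def rollback(task: String): Unit = {
    acquire(task)                                   // the added line: lock first
    loadedVersion = -1L
    changelogFiles -= writingVersion                // abort any open changelog
    writingVersion = -1L
    release()
  }
}

// Same interleaving as the race, but task A's rollback runs on its own thread.
def replayWithLock(): (Long, Long) = {
  val store = new GuardedToyStore
  store.load("B", 10368L)                           // task B loads and holds the lock
  val taskA = new Thread(() => store.rollback("A")) // cancelled task A rolls back
  taskA.start()                                     // A waits in acquire("A") until...
  val committed = store.commit()                    // ...B commits and releases
  taskA.join()
  (committed, store.loadedVersion)
}

println(replayWithLock()) // (10369,-1)
```

In this model task A's rollback can no longer interleave with task B's load/commit pair: it blocks in `acquire("A")` until B's `commit()` releases the lock, so B commits version 10369 with its changelog intact, and only afterwards does A reset the loaded version to -1. The outcome does not depend on thread timing, because B already owns the lock before A's thread starts.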