This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 86bac8911a31 [SPARK-46711][SS] Fix RocksDB state provider race condition during rollback
86bac8911a31 is described below

commit 86bac8911a31a98eb6ad1f5365554f4f095c0376
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Jan 15 11:09:34 2024 +0900

    [SPARK-46711][SS] Fix RocksDB state provider race condition during rollback
    
    ### What changes were proposed in this pull request?
    Fix a race condition in the RocksDB state provider during rollback by calling `acquire()` at the start of `rollback()`.
    
    ### Why are the changes needed?
    The `rollback()` method in RocksDB is not properly synchronized, so a rollback can race with another task that is trying to commit.
    
    The symptom of the race condition is the following exception being thrown:
    ```
    Caused by: java.io.FileNotFoundException: No such file or directory: ...state/0/54/10369.changelog
            at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:4069)
            at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3907)
            at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3801)
            at com.databricks.common.filesystem.LokiS3FS.getFileStatusNoCache(LokiS3FS.scala:91)
            at com.databricks.common.filesystem.LokiS3FS.getFileStatus(LokiS3FS.scala:86)
            at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1525)
    ```
    
    This race condition can happen with the following sequence of events:
    1. Task A gets cancelled after releasing the lock for RocksDB
    2. Task B starts and loads version 10368
    3. Task A rolls RocksDB back to version -1
    4. Task B reads data from RocksDB and commits
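    
    The sequence above can be replayed in a small single-threaded sketch. This is a toy model, not the real `RocksDB` class: `ToyStore`, `runScenario`, the `guardRollback` flag and the task-name ownership check are all hypothetical, and `acquire` here throws when another task holds the instance, whereas the real method may wait instead. In the toy the damage shows up as a wrong committed version; the real symptom is the `FileNotFoundException` above.
    
    ```scala
    object RollbackRaceSketch {
      // Toy stand-in for the state store instance. All names are hypothetical.
      final class ToyStore(guardRollback: Boolean) {
        private var owner: Option[String] = None // task currently holding the instance
        private var loadedVersion: Long = -1L
    
        // Simplification: fail fast instead of waiting for the current owner.
        private def acquire(task: String): Unit = owner match {
          case Some(other) if other != task =>
            throw new IllegalStateException(s"$task cannot acquire, held by $other")
          case _ => owner = Some(task)
        }
    
        def release(task: String): Unit = if (owner.contains(task)) owner = None
    
        def load(task: String, version: Long): Unit = {
          acquire(task)
          loadedVersion = version
        }
    
        def rollback(task: String): Unit = {
          if (guardRollback) acquire(task) // mirrors the one-line change in this commit
          loadedVersion = -1L
          if (guardRollback) release(task)
        }
    
        // Commits the next version after whatever is currently loaded.
        def commit(task: String): Long = {
          acquire(task)
          val committed = loadedVersion + 1
          loadedVersion = committed
          release(task)
          committed
        }
      }
    
      // Returns (was task A's rollback rejected?, version committed by task B).
      def runScenario(guardRollback: Boolean): (Boolean, Long) = {
        val store = new ToyStore(guardRollback)
        store.load("A", 10368L)
        store.release("A")      // 1. task A is cancelled after releasing the lock
        store.load("B", 10368L) // 2. task B starts and loads 10368
        val rollbackRejected =
          try { store.rollback("A"); false } // 3. task A's late rollback
          catch { case _: IllegalStateException => true }
        (rollbackRejected, store.commit("B")) // 4. task B commits
      }
    
      def main(args: Array[String]): Unit = {
        println(runScenario(guardRollback = false)) // B commits on top of -1
        println(runScenario(guardRollback = true))  // B commits on top of 10368
      }
    }
    ```
    
    Without the guard, task A's rollback resets the loaded version while task B owns the instance, so B commits on top of -1. With the guard, the rollback cannot touch state held by another task.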
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44722 from anishshri-db/task/SPARK-46711.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 101a9e6b9199..0284d4c9d303 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -452,6 +452,7 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
+    acquire()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     loadedVersion = -1L
     changelogWriter.foreach(_.abort())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
