This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f86dc2b7e5c3 [SPARK-47848][SS] Fix thread safety issue around access to loadedMaps in close function for hdfs store provider
f86dc2b7e5c3 is described below

commit f86dc2b7e5c3aa4a94888feb499bc495877f36a9
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Apr 15 14:59:03 2024 +0900

    [SPARK-47848][SS] Fix thread safety issue around access to loadedMaps in close function for hdfs store provider
    
    ### What changes were proposed in this pull request?
    Wrap the iteration over `loadedMaps` in the `close` function of the HDFS-backed state store provider in a `synchronized` block, fixing a thread safety issue.
    
    ### Why are the changes needed?
    To ensure thread-safe access to `loadedMaps`, which is the state protected by the critical section of the HDFS-backed state store provider in Structured Streaming.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ```
    [info] Run completed in 2 minutes, 51 seconds.
    [info] Total number of tests run: 152
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 152, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46048 from anishshri-db/task/SPARK-47848.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 850329e1ec69..2ecfa0931042 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -346,7 +346,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   override def close(): Unit = {
-    loadedMaps.values.asScala.foreach(_.clear())
+    synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
   }
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
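
The locking pattern in the hunk above can be illustrated with a minimal, self-contained sketch. Only the names `loadedMaps` and `close` come from the patch; the class `VersionedMapCache`, the helpers `putLoadedMap` and `loadedVersions`, and the element types are hypothetical stand-ins, not the provider's actual code. The sketch assumes Scala 2.13 or later for `scala.jdk.CollectionConverters`.

```scala
import java.util.{HashMap => JHashMap, TreeMap => JTreeMap}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for the provider. Only `loadedMaps` and `close`
// mirror names from the patch; everything else is illustrative.
class VersionedMapCache {
  // java.util.TreeMap is not thread-safe on its own, so every access
  // below holds this object's monitor.
  private val loadedMaps = new JTreeMap[Long, JHashMap[String, String]]()

  // Hypothetical writer: registers the map loaded for a version.
  def putLoadedMap(version: Long, map: JHashMap[String, String]): Unit =
    synchronized {
      loadedMaps.put(version, map)
    }

  // Hypothetical reader: takes a snapshot of the cached versions.
  def loadedVersions: Seq[Long] = synchronized {
    loadedMaps.keySet.asScala.toSeq
  }

  // Same shape as the patched close(): the iteration over the values
  // runs under the same lock as the writers and readers above.
  def close(): Unit = synchronized {
    loadedMaps.values.asScala.foreach(_.clear())
  }
}
```

A `synchronized` call without an explicit receiver locks on `this`, so all three methods exclude one another. Without the lock in `close()`, a concurrent `putLoadedMap` could structurally modify the `TreeMap` while `close()` iterates over its values, which can surface as a `ConcurrentModificationException` or leave the map in an inconsistent state.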


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
