This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c26c014f11 [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error
8c26c014f11 is described below

commit 8c26c014f11b1b9d7d6c3b315fbb633c2bb2ca73
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Jul 21 13:46:01 2023 +0900

    [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error
    
    ### What changes were proposed in this pull request?
    Unload every loaded provider when the maintenance task hits an error, which forces the underlying DB instance to close and releases its resources.
    
    ### Why are the changes needed?
    If we don't close the provider, the DB instance and its resources (memory, file descriptors, etc.) are left open, and the references to these objects are lost once loadedProviders is cleared.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ```
    ), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-17 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), ForkJoinPool.commonPool-worker-31 (daemon=true), ForkJoinPool.commonPool-worker-23 (daemon=true), state-store-maintenance-task (daemon=true), ForkJoinPool.commonPool-worker-9 (daemon=true) =====
    [info] Run completed in 2 minutes, 49 seconds.
    [info] Total number of tests run: 32
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 32, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #42098 from anishshri-db/task/SPARK-44504.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala      | 4 ++++
 .../org/apache/spark/sql/execution/streaming/state/StateStore.scala   | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 7961c5e716b..d4366fe732b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -463,6 +463,8 @@ class RocksDB(
   /** Release all resources */
   def close(): Unit = {
     try {
+      // Acquire DB instance lock and release at the end to allow for synchronized access
+      acquire()
       closeDB()
 
       readOptions.close()
@@ -477,6 +479,8 @@ class RocksDB(
     } catch {
       case e: Exception =>
         logWarning("Error closing RocksDB", e)
+    } finally {
+      release()
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 359cff81aea..cabad54be64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -616,6 +616,9 @@ object StateStore extends Logging {
           onError = { loadedProviders.synchronized {
               logInfo("Stopping maintenance task since an error was encountered.")
               stopMaintenanceTask()
+              // SPARK-44504 - Unload explicitly to force closing underlying DB instance
+              // and releasing allocated resources, especially for RocksDBStateStoreProvider.
+              loadedProviders.keySet.foreach { key => unload(key) }
               loadedProviders.clear()
             }
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
