This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f73b405b25 [SPARK-44438][SS] Shutdown scheduled executor used for 
maintenance task if an error is reported
5f73b405b25 is described below

commit 5f73b405b251f8cb39be80377e8cb2d2f960d490
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Sat Jul 15 22:33:28 2023 +0900

    [SPARK-44438][SS] Shutdown scheduled executor used for maintenance task if 
an error is reported
    
    ### What changes were proposed in this pull request?
    Shut down the scheduled executor used for the maintenance task if an error
    is reported.
    
    ### Why are the changes needed?
    Without this change, the maintenance task threads would never be cleaned up,
    and the reference to the scheduled executor would be lost as well, leaving no
    way to shut the executor down. This could eventually lead to thread
    exhaustion. This change fixes the issue.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    
    ```
    [info] - SPARK-44438: maintenance task should be shutdown on error (759 
milliseconds)
    13:59:57.755 WARN 
org.apache.spark.sql.execution.streaming.state.StateStoreSuite:
    
    ===== POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: rpc-boss-3-1 
(daemon=true), shuffle-boss-6-1 (daemon=true) =====
    [info] Run completed in 2 seconds, 255 milliseconds.
    [info] Total number of tests run: 1
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #42010 from anishshri-db/task/SPARK-44438.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/StateStore.scala | 21 ++++++--
 .../streaming/state/StateStoreSuite.scala          | 56 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 30e660eb2ff..96c7b61f205 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -579,19 +579,25 @@ object StateStore extends Logging {
     loadedProviders.contains(storeProviderId)
   }
 
+  /** Check if maintenance thread is running and scheduled future is not done 
*/
   def isMaintenanceRunning: Boolean = loadedProviders.synchronized {
     maintenanceTask != null && maintenanceTask.isRunning
   }
 
+  /** Stop maintenance thread and reset the maintenance task */
+  def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
+    if (maintenanceTask != null) {
+      maintenanceTask.stop()
+      maintenanceTask = null
+    }
+  }
+
   /** Unload and stop all state store providers */
   def stop(): Unit = loadedProviders.synchronized {
     loadedProviders.keySet.foreach { key => unload(key) }
     loadedProviders.clear()
     _coordRef = null
-    if (maintenanceTask != null) {
-      maintenanceTask.stop()
-      maintenanceTask = null
-    }
+    stopMaintenanceTask()
     logInfo("StateStore stopped")
   }
 
@@ -602,7 +608,12 @@ object StateStore extends Logging {
         maintenanceTask = new MaintenanceTask(
           storeConf.maintenanceInterval,
           task = { doMaintenance() },
-          onError = { loadedProviders.synchronized { loadedProviders.clear() } 
}
+          onError = { loadedProviders.synchronized {
+              logInfo("Stopping maintenance task since an error was 
encountered.")
+              stopMaintenanceTask()
+              loadedProviders.clear()
+            }
+          }
         )
         logInfo("State Store maintenance task started")
       }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 9f8a588cc32..02aa12b325f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -47,6 +47,30 @@ import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
+class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider {
+  private var id: StateStoreId = null
+
+  override def init(
+      stateStoreId: StateStoreId,
+      keySchema: StructType,
+      valueSchema: StructType,
+      numColsPrefixKey: Int,
+      storeConfs: StateStoreConf,
+      hadoopConf: Configuration): Unit = {
+    id = stateStoreId
+  }
+
+  override def stateStoreId: StateStoreId = id
+
+  override def close(): Unit = {}
+
+  override def getStore(version: Long): StateStore = null
+
+  override def doMaintenance(): Unit = {
+    throw new RuntimeException("Intentional maintenance failure")
+  }
+}
+
 @ExtendedSQLTest
 class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
   with BeforeAndAfter {
@@ -1280,6 +1304,38 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
     }
   }
 
+  test("SPARK-44438: maintenance task should be shutdown on error") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+    val sqlConf = getDefaultSQLConf(
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
+      SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get
+    )
+    // Make maintenance interval small so that maintenance task is called 
right after scheduling.
+    sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 100L)
+    // Use the `FakeStateStoreProviderWithMaintenanceError` to run the test
+    sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS,
+      classOf[FakeStateStoreProviderWithMaintenanceError].getName)
+
+    quietly {
+      withSpark(new SparkContext(conf)) { sc =>
+        withCoordinatorRef(sc) { _ =>
+          val storeId = StateStoreProviderId(StateStoreId("firstDir", 0, 1), 
UUID.randomUUID)
+          val storeConf = StateStoreConf(sqlConf)
+
+          // get the state store and kick off the maintenance task
+          StateStore.get(storeId, null, null, 0, 0, storeConf, 
sc.hadoopConfiguration)
+
+          eventually(timeout(30.seconds)) {
+            assert(!StateStore.isMaintenanceRunning)
+          }
+          StateStore.stop()
+        }
+      }
+    }
+  }
+
   test("SPARK-42572: StateStoreProvider.validateStateRowFormat shouldn't 
check" +
     " value row format when SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED is 
false") {
     // By default, when there is an invalid pair of value row and value 
schema, it should throw


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to