This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e42805f77c [SPARK-44480][SS] Use thread pool to perform maintenance 
activity for hdfs/rocksdb state store providers
9e42805f77c is described below

commit 9e42805f77c1ab5beb3c193f47f016e735a68f06
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Wed Aug 2 04:52:43 2023 +0900

    [SPARK-44480][SS] Use thread pool to perform maintenance activity for 
hdfs/rocksdb state store providers
    
    ### What changes were proposed in this pull request?
    
    Maintenance tasks on StateStore were being done by a single background 
thread, which is prone to straggling. With this change, the single background 
thread instead schedules maintenance tasks on a thread pool.
    
    By default, the maximum number of threads in the new thread pool is 
the number of cores * 0.25 (rounded down, with a minimum of 1), so that this 
thread pool doesn't take too many resources away from the query and affect 
performance. Users can set the number of threads explicitly via 
`spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads`.
    
    ### Why are the changes needed?
    
    Using a thread pool instead of a single thread for snapshotting and cleanup 
reduces the effect of stragglers in the background task.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manual testing: wrote a stateful query, tracked how many maintenance tasks 
were done, and compared this to the baseline. `StateStoreSuite` is enough to 
test functional correctness and cleanup here.
    
    Closes #42066 from ericm-db/maintenance-thread-pool-optional.
    
    Lead-authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Co-authored-by: ericm-db <132308037+ericm...@users.noreply.github.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  13 +++
 .../sql/execution/streaming/state/StateStore.scala | 108 ++++++++++++++++++---
 .../execution/streaming/state/StateStoreConf.scala |   5 +
 3 files changed, 114 insertions(+), 12 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6eb2d9c38d9..dfa2a0f251f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1856,6 +1856,17 @@ object SQLConf {
       .createWithDefault(
         
"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
 
+  val NUM_STATE_STORE_MAINTENANCE_THREADS =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads")
+      .internal()
+      .doc("Number of threads in the thread pool that perform clean up and 
snapshotting tasks " +
+        "for stateful streaming queries. The default value is the number of 
cores * 0.25 " +
+        "so that this thread pool doesn't take too many resources " +
+        "away from the query and affect performance.")
+      .intConf
+      .checkValue(_ > 0, "Must be greater than 0")
+      .createWithDefault(Math.max(Runtime.getRuntime.availableProcessors() / 
4, 1))
+
   val STATE_SCHEMA_CHECK_ENABLED =
     buildConf("spark.sql.streaming.stateStore.stateSchemaCheck")
       .doc("When true, Spark will validate the state schema against schema on 
existing state and " +
@@ -4523,6 +4534,8 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED)
 
+  def numStateStoreMaintenanceThreads: Int = 
getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)
+
   def stateStoreMinDeltasForSnapshot: Int = 
getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
   def stateStoreFormatValidationEnabled: Boolean = 
getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index cabad54be64..a1d4f7f40a7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.util.UUID
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -440,6 +441,18 @@ object StateStore extends Logging {
   @GuardedBy("loadedProviders")
   private val schemaValidated = new mutable.HashMap[StateStoreProviderId, 
Option[Throwable]]()
 
+  private val maintenanceThreadPoolLock = new Object
+
+  // Shared exception between threads in thread pool that the scheduling thread
+  // checks to see if an exception has been thrown in the maintenance task
+  private val threadPoolException = new AtomicReference[Throwable](null)
+
+  // This set is to keep track of the partitions that are queued
+  // for maintenance or currently have maintenance running on them
+  // to prevent the same partition from being processed concurrently.
+  @GuardedBy("maintenanceThreadPoolLock")
+  private val maintenancePartitions = new mutable.HashSet[StateStoreProviderId]
+
   /**
    * Runs the `task` periodically and automatically cancels it if there is an 
exception. `onError`
    * will be called when an exception happens.
@@ -472,9 +485,29 @@ object StateStore extends Logging {
     def isRunning: Boolean = !future.isDone
   }
 
+  /**
+   * Thread Pool that runs maintenance on partitions that are scheduled by
+   * MaintenanceTask periodically
+   */
+  class MaintenanceThreadPool(numThreads: Int) {
+    private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+      numThreads, "state-store-maintenance-thread")
+
+    def execute(runnable: Runnable): Unit = {
+      threadPool.execute(runnable)
+    }
+
+    def stop(): Unit = {
+      threadPool.shutdown()
+    }
+  }
+
   @GuardedBy("loadedProviders")
   private var maintenanceTask: MaintenanceTask = null
 
+  @GuardedBy("loadedProviders")
+  private var maintenanceThreadPool: MaintenanceThreadPool = null
+
   @GuardedBy("loadedProviders")
   private var _coordRef: StateStoreCoordinatorRef = null
 
@@ -591,6 +624,14 @@ object StateStore extends Logging {
 
   /** Stop maintenance thread and reset the maintenance task */
   def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
+    if (maintenanceThreadPool != null) {
+      threadPoolException.set(null)
+      maintenanceThreadPoolLock.synchronized {
+        maintenancePartitions.clear()
+      }
+      maintenanceThreadPool.stop()
+      maintenanceThreadPool = null
+    }
     if (maintenanceTask != null) {
       maintenanceTask.stop()
       maintenanceTask = null
@@ -607,7 +648,8 @@ object StateStore extends Logging {
   }
 
   /** Start the periodic maintenance task if not already started and if Spark 
active */
-  private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit =
+  private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
+    val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
     loadedProviders.synchronized {
       if (SparkEnv.get != null && !isMaintenanceRunning) {
         maintenanceTask = new MaintenanceTask(
@@ -623,9 +665,22 @@ object StateStore extends Logging {
             }
           }
         )
+        maintenanceThreadPool = new 
MaintenanceThreadPool(numMaintenanceThreads)
         logInfo("State Store maintenance task started")
       }
     }
+  }
+
+  private def processThisPartition(id: StateStoreProviderId): Boolean = {
+    maintenanceThreadPoolLock.synchronized {
+      if (!maintenancePartitions.contains(id)) {
+        maintenancePartitions.add(id)
+        true
+      } else {
+        false
+      }
+    }
+  }
 
   /**
    * Execute background maintenance task in all the loaded store providers if 
they are still
@@ -636,17 +691,46 @@ object StateStore extends Logging {
     if (SparkEnv.get == null) {
       throw new IllegalStateException("SparkEnv not active, cannot do 
maintenance on StateStores")
     }
-    loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, 
provider) =>
-      try {
-        provider.doMaintenance()
-        if (!verifyIfStoreInstanceActive(id)) {
-          unload(id)
-          logInfo(s"Unloaded $provider")
-        }
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Error managing $provider, stopping management thread")
-          throw e
+    loadedProviders.synchronized {
+      loadedProviders.toSeq
+    }.foreach { case (id, provider) =>
+      // check exception
+      if (threadPoolException.get() != null) {
+        val exception = threadPoolException.get()
+        logWarning("Error in maintenanceThreadPool", exception)
+        throw exception
+      }
+      if (processThisPartition(id)) {
+        maintenanceThreadPool.execute(() => {
+          val startTime = System.currentTimeMillis()
+          try {
+            provider.doMaintenance()
+            if (!verifyIfStoreInstanceActive(id)) {
+              unload(id)
+              logInfo(s"Unloaded $provider")
+            }
+          } catch {
+            case NonFatal(e) =>
+              logWarning(s"Error managing $provider, stopping management 
thread", e)
+              threadPoolException.set(e)
+              throw e
+          } finally {
+            val duration = System.currentTimeMillis() - startTime
+            val logMsg = s"Finished maintenance task for provider=$id" +
+              s" in elapsed_time=$duration\n"
+            if (duration > 5000) {
+              logInfo(logMsg)
+            } else {
+              logDebug(logMsg)
+            }
+            maintenanceThreadPoolLock.synchronized {
+              maintenancePartitions.remove(id)
+            }
+          }
+        })
+      } else {
+        logInfo(s"Not processing partition ${id} for maintenance because it is 
currently " +
+          s"being processed")
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 21a18745348..c7004524097 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -27,6 +27,11 @@ class StateStoreConf(
 
   def this() = this(new SQLConf)
 
+  /**
+   * Size of MaintenanceThreadPool to perform maintenance tasks for StateStore
+   */
+  val numStateStoreMaintenanceThreads: Int = 
sqlConf.numStateStoreMaintenanceThreads
+
   /**
    * Minimum number of delta files in a chain after which HDFSBackedStateStore 
will
    * consider generating a snapshot.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to