This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 262ed5bcab0b [SPARK-46929][CORE][CONNECT][SS] Use ThreadUtils.shutdown to close thread pools 262ed5bcab0b is described below commit 262ed5bcab0ba750b089b0693dbb1a59ef6fd11f Author: beliefer <belie...@163.com> AuthorDate: Wed Jan 31 09:52:19 2024 -0600 [SPARK-46929][CORE][CONNECT][SS] Use ThreadUtils.shutdown to close thread pools ### What changes were proposed in this pull request? This PR proposes using `ThreadUtils.shutdown` to close thread pools. ### Why are the changes needed? `ThreadUtils` provides a `shutdown` method for closing thread pools. `ThreadUtils` wraps the common logic needed to shut down thread pools, so we should use `ThreadUtils.shutdown` to close thread pools. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? GA ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #44962 from beliefer/SPARK-46929. Authored-by: beliefer <belie...@163.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../sql/connect/service/SparkConnectExecutionManager.scala | 5 +++-- .../sql/connect/service/SparkConnectSessionManager.scala | 5 +++-- .../connect/service/SparkConnectStreamingQueryCache.scala | 9 +++------ .../scala/org/apache/spark/ExecutorAllocationManager.scala | 4 ++-- .../org/apache/spark/status/ElementTrackingStore.scala | 6 ++---- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 12 +++++------- .../org/apache/spark/streaming/scheduler/JobScheduler.scala | 13 ++++++------- 7 files changed, 24 insertions(+), 30 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index c90f53ac07df..85fb150b3171 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -21,6 +21,7 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import javax.annotation.concurrent.GuardedBy import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal @@ -30,6 +31,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto import org.apache.spark.internal.Logging import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL} +import org.apache.spark.util.ThreadUtils // Unique key identifying execution by combination of user, session and operation id case class ExecuteKey(userId: String, sessionId: String, operationId: String) @@ -167,8 +169,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { private[connect] def shutdown(): Unit = executionsLock.synchronized { scheduledExecutor.foreach { executor => - executor.shutdown() - executor.awaitTermination(1, TimeUnit.MINUTES) + ThreadUtils.shutdown(executor, FiniteDuration(1, TimeUnit.MINUTES)) } scheduledExecutor = None // note: this does not cleanly shut down the executions, but the server is shutting down. 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala index ef14cd305d40..4da728b95a33 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import javax.annotation.concurrent.GuardedBy import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal @@ -31,6 +32,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.config.Connect.{CONNECT_SESSION_MANAGER_CLOSED_SESSIONS_TOMBSTONES_SIZE, CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT, CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL} +import org.apache.spark.util.ThreadUtils /** * Global tracker of all SessionHolders holding Spark Connect sessions. @@ -154,8 +156,7 @@ class SparkConnectSessionManager extends Logging { private[connect] def shutdown(): Unit = sessionsLock.synchronized { scheduledExecutor.foreach { executor => - executor.shutdown() - executor.awaitTermination(1, TimeUnit.MINUTES) + ThreadUtils.shutdown(executor, FiniteDuration(1, TimeUnit.MINUTES)) } scheduledExecutor = None // note: this does not cleanly shut down the sessions, but the server is shutting down. 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala index b3be9c2c4de9..a5d3fa497bb3 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala @@ -23,15 +23,13 @@ import java.util.concurrent.TimeUnit import javax.annotation.concurrent.GuardedBy import scala.collection.mutable -import scala.concurrent.duration.Duration -import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.util.control.NonFatal import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.StreamingQuery -import org.apache.spark.util.Clock -import org.apache.spark.util.SystemClock +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} /** * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e. @@ -131,8 +129,7 @@ private[connect] class SparkConnectStreamingQueryCache( // Visible for testing. 
private[service] def shutdown(): Unit = queryCacheLock.synchronized { scheduledExecutor.foreach { executor => - executor.shutdown() - executor.awaitTermination(1, TimeUnit.MINUTES) + ThreadUtils.shutdown(executor, FiniteDuration(1, TimeUnit.MINUTES)) } scheduledExecutor = None } diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 9b808161bec9..a4ffd7d2c7cd 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import com.codahale.metrics.{Counter, Gauge, MetricRegistry} @@ -259,8 +260,7 @@ private[spark] class ExecutorAllocationManager( * Stop the allocation manager. */ def stop(): Unit = { - executor.shutdown() - executor.awaitTermination(10, TimeUnit.SECONDS) + ThreadUtils.shutdown(executor, FiniteDuration(10, TimeUnit.SECONDS)) } /** diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala index 4aef7f9aa0e1..b399595ff332 100644 --- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala +++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable.{HashMap, ListBuffer} +import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import org.apache.spark.SparkConf @@ -177,10 +178,7 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten } stopped = true - executor.shutdown() - if (!executor.awaitTermination(5, 
TimeUnit.SECONDS)) { - executor.shutdownNow() - } + ThreadUtils.shutdown(executor, FiniteDuration(5, TimeUnit.SECONDS)) flushTriggers.foreach { trigger => Utils.tryLog(trigger()) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 8034b1b21715..53d897af5bef 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -21,6 +21,8 @@ import java.io._ import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException, ThreadPoolExecutor, TimeUnit} +import scala.concurrent.duration.FiniteDuration + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -30,8 +32,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec import org.apache.spark.streaming.scheduler.JobGenerator +import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ -import org.apache.spark.util.Utils private[streaming] class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) @@ -307,13 +309,9 @@ class CheckpointWriter( def stop(): Unit = synchronized { if (stopped) return - executor.shutdown() val startTimeNs = System.nanoTime() - val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) - if (!terminated) { - executor.shutdownNow() - } - logInfo(s"CheckpointWriter executor terminated? $terminated," + + ThreadUtils.shutdown(executor, FiniteDuration(10, TimeUnit.SECONDS)) + logInfo(s"CheckpointWriter executor terminated? 
${executor.isTerminated}," + s" waited for ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms.") stopped = true } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 29bccc79a6eb..293024005bd5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import scala.util.Failure @@ -123,17 +124,15 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // Stop the executor for receiving new jobs logDebug("Stopping job executor") - jobExecutor.shutdown() // Wait for the queued jobs to complete if indicated - val terminated = if (processAllReceivedData) { - jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time + if (processAllReceivedData) { + // just a very large period of time + ThreadUtils.shutdown(jobExecutor, FiniteDuration(1, TimeUnit.HOURS)) } else { - jobExecutor.awaitTermination(2, TimeUnit.SECONDS) - } - if (!terminated) { - jobExecutor.shutdownNow() + ThreadUtils.shutdown(jobExecutor, FiniteDuration(2, TimeUnit.SECONDS)) } + logDebug("Stopped job executor") // Stop everything else --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org