This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 262ed5bcab0b [SPARK-46929][CORE][CONNECT][SS] Use ThreadUtils.shutdown to close thread pools
262ed5bcab0b is described below

commit 262ed5bcab0ba750b089b0693dbb1a59ef6fd11f
Author: beliefer <belie...@163.com>
AuthorDate: Wed Jan 31 09:52:19 2024 -0600

    [SPARK-46929][CORE][CONNECT][SS] Use ThreadUtils.shutdown to close thread pools
    
    ### What changes were proposed in this pull request?
    This PR proposes using `ThreadUtils.shutdown` to close thread pools.
    
    ### Why are the changes needed?
    `ThreadUtils` provides a `shutdown` method that wraps the common logic for
    shutting down thread pools. We should use `ThreadUtils.shutdown` to close
    thread pools instead of duplicating that logic at each call site.
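
    For illustration, here is a minimal sketch of the shutdown sequence such a
    helper typically wraps (a hypothetical `ShutdownSketch` object; the exact
    signature and behavior of `ThreadUtils.shutdown` are defined in Spark's
    `ThreadUtils`, not here):

    ```scala
    import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
    import scala.concurrent.duration.{Duration, FiniteDuration}

    object ShutdownSketch {
      // Request an orderly shutdown, wait up to the grace period, then force.
      def shutdown(executor: ExecutorService, gracePeriod: Duration): Unit = {
        executor.shutdown()
        executor.awaitTermination(gracePeriod.toMillis, TimeUnit.MILLISECONDS)
        if (!executor.isTerminated) {
          executor.shutdownNow()
        }
      }

      def main(args: Array[String]): Unit = {
        val pool = Executors.newSingleThreadScheduledExecutor()
        shutdown(pool, FiniteDuration(1, TimeUnit.MINUTES))
        println(s"pool terminated: ${pool.isTerminated}")
      }
    }
    ```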
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    GA
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #44962 from beliefer/SPARK-46929.
    
    Authored-by: beliefer <belie...@163.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../sql/connect/service/SparkConnectExecutionManager.scala  |  5 +++--
 .../sql/connect/service/SparkConnectSessionManager.scala    |  5 +++--
 .../connect/service/SparkConnectStreamingQueryCache.scala   |  9 +++------
 .../scala/org/apache/spark/ExecutorAllocationManager.scala  |  4 ++--
 .../org/apache/spark/status/ElementTrackingStore.scala      |  6 ++----
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala  | 12 +++++-------
 .../org/apache/spark/streaming/scheduler/JobScheduler.scala | 13 ++++++-------
 7 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index c90f53ac07df..85fb150b3171 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
@@ -30,6 +31,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
+import org.apache.spark.util.ThreadUtils
 
 // Unique key identifying execution by combination of user, session and operation id
 case class ExecuteKey(userId: String, sessionId: String, operationId: String)
@@ -167,8 +169,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
 
   private[connect] def shutdown(): Unit = executionsLock.synchronized {
     scheduledExecutor.foreach { executor =>
-      executor.shutdown()
-      executor.awaitTermination(1, TimeUnit.MINUTES)
+      ThreadUtils.shutdown(executor, FiniteDuration(1, TimeUnit.MINUTES))
     }
     scheduledExecutor = None
     // note: this does not cleanly shut down the executions, but the server is shutting down.
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
index ef14cd305d40..4da728b95a33 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
@@ -31,6 +32,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_SESSION_MANAGER_CLOSED_SESSIONS_TOMBSTONES_SIZE, CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT, CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL}
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Global tracker of all SessionHolders holding Spark Connect sessions.
@@ -154,8 +156,7 @@ class SparkConnectSessionManager extends Logging {
 
   private[connect] def shutdown(): Unit = sessionsLock.synchronized {
     scheduledExecutor.foreach { executor =>
-      executor.shutdown()
-      executor.awaitTermination(1, TimeUnit.MINUTES)
+      ThreadUtils.shutdown(executor, FiniteDuration(1, TimeUnit.MINUTES))
     }
     scheduledExecutor = None
     // note: this does not cleanly shut down the sessions, but the server is shutting down.
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
index b3be9c2c4de9..a5d3fa497bb3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
@@ -23,15 +23,13 @@ import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.streaming.StreamingQuery
-import org.apache.spark.util.Clock
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
  * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
@@ -131,8 +129,7 @@ private[connect] class SparkConnectStreamingQueryCache(
   // Visible for testing.
   private[service] def shutdown(): Unit = queryCacheLock.synchronized {
     scheduledExecutor.foreach { executor =>
-      executor.shutdown()
-      executor.awaitTermination(1, TimeUnit.MINUTES)
+      ThreadUtils.shutdown(executor, FiniteDuration(1, TimeUnit.MINUTES))
     }
     scheduledExecutor = None
   }
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9b808161bec9..a4ffd7d2c7cd 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.FiniteDuration
 import scala.util.control.NonFatal
 
 import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
@@ -259,8 +260,7 @@ private[spark] class ExecutorAllocationManager(
    * Stop the allocation manager.
    */
   def stop(): Unit = {
-    executor.shutdown()
-    executor.awaitTermination(10, TimeUnit.SECONDS)
+    ThreadUtils.shutdown(executor, FiniteDuration(10, TimeUnit.SECONDS))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
index 4aef7f9aa0e1..b399595ff332 100644
--- a/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkConf
@@ -177,10 +178,7 @@ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) exten
     }
 
     stopped = true
-    executor.shutdown()
-    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
-      executor.shutdownNow()
-    }
+    ThreadUtils.shutdown(executor, FiniteDuration(5, TimeUnit.SECONDS))
 
     flushTriggers.foreach { trigger =>
       Utils.tryLog(trigger())
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 8034b1b21715..53d897af5bef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -21,6 +21,8 @@ import java.io._
 import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException,
   ThreadPoolExecutor, TimeUnit}
 
+import scala.concurrent.duration.FiniteDuration
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -30,8 +32,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.streaming.scheduler.JobGenerator
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
-import org.apache.spark.util.Utils
 
 private[streaming]
 class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
@@ -307,13 +309,9 @@ class CheckpointWriter(
   def stop(): Unit = synchronized {
     if (stopped) return
 
-    executor.shutdown()
     val startTimeNs = System.nanoTime()
-    val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
-    if (!terminated) {
-      executor.shutdownNow()
-    }
-    logInfo(s"CheckpointWriter executor terminated? $terminated," +
+    ThreadUtils.shutdown(executor, FiniteDuration(10, TimeUnit.SECONDS))
+    logInfo(s"CheckpointWriter executor terminated? ${executor.isTerminated}," 
+
       s" waited for ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs)} ms.")
     stopped = true
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 29bccc79a6eb..293024005bd5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler
 
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
+import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 import scala.util.Failure
 
@@ -123,17 +124,15 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
     // Stop the executor for receiving new jobs
     logDebug("Stopping job executor")
-    jobExecutor.shutdown()
 
     // Wait for the queued jobs to complete if indicated
-    val terminated = if (processAllReceivedData) {
-      jobExecutor.awaitTermination(1, TimeUnit.HOURS)  // just a very large period of time
+    if (processAllReceivedData) {
+      // just a very large period of time
+      ThreadUtils.shutdown(jobExecutor, FiniteDuration(1, TimeUnit.HOURS))
     } else {
-      jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
-    }
-    if (!terminated) {
-      jobExecutor.shutdownNow()
+      ThreadUtils.shutdown(jobExecutor, FiniteDuration(2, TimeUnit.SECONDS))
     }
+
     logDebug("Stopped job executor")
 
     // Stop everything else

