[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-15 Thread GitBox


dongjoon-hyun commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023621416


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala:
##
@@ -103,7 +103,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
 val subscribersExecutor = ThreadUtils
   .newDaemonThreadPoolScheduledExecutor(
 "kubernetes-executor-snapshots-subscribers", 2)
-val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor)
+val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(sc.conf, subscribersExecutor)

Review Comment:
   BTW, it seems that we don't need to hand over the whole `SparkConf` here. What we need is only `sc.conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)`, isn't it?
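A minimal Scala sketch of that narrower dependency, assuming a hypothetical, heavily simplified `NarrowStoreSketch` in place of `ExecutorPodsSnapshotsStoreImpl`: the constructor receives the already-resolved grace period in seconds instead of a `SparkConf`. The caller would then read the value once (for example with `sc.conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)`), and tests could pass a plain number.

```scala
import java.util.concurrent.ScheduledExecutorService

// Hypothetical, simplified stand-in for ExecutorPodsSnapshotsStoreImpl.
// It depends only on the grace period value, not on a whole SparkConf.
class NarrowStoreSketch(
    val subscribersExecutor: ScheduledExecutorService,
    val shutdownGracePeriodSec: Long) {
  // Fail fast on a nonsensical value instead of discovering it at shutdown time.
  require(shutdownGracePeriodSec > 0, "Shutdown grace period must be positive")
}
```

Passing a `Long` keeps the store unaware of configuration keys; the trade-off is that each future tunable becomes another constructor parameter.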



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org




[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-15 Thread GitBox


dongjoon-hyun commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023614721


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -57,10 +60,22 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+conf: SparkConf,
 subscribersExecutor: ScheduledExecutorService,
 clock: Clock = new SystemClock)
   extends ExecutorPodsSnapshotsStore with Logging {
 
+  private[spark] def this(
+  subscribersExecutor: ScheduledExecutorService) = {
+this(new SparkConf, subscribersExecutor, new SystemClock)
+  }
+
+  private[spark] def this(
+  subscribersExecutor: ScheduledExecutorService,
+  clock: Clock) = {
+this(new SparkConf, subscribersExecutor, clock)
+  }

Review Comment:
   Oh, interesting.






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-15 Thread GitBox


dongjoon-hyun commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023278795


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:
##
@@ -723,6 +723,18 @@ private[spark] object Config extends Logging {
   .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
   .createWithDefault(Int.MaxValue)
 
+  val KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD =
+  ConfigBuilder("spark.kubernetes.executorSnapshotsSubscribersShutdownGracePeriod")
+  .doc("Time to wait for graceful shutdown kubernetes-executor-snapshots-subscribers " +
+"thread pool. Since it may be called by ShutdownHookManager, where timeout is " +
+"controlled by hadoop configuration `hadoop.service.shutdown.timeout` " +
+"(default is 30s). As the whole Spark shutdown procedure shares the above timeout, " +
+"this value should be short than that to prevent blocking the following shutdown " +
+"procedures.")
+  .version("3.4.0")
+  .timeConf(TimeUnit.SECONDS)

Review Comment:
   Please add `checkValue`.
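In Spark, such a constraint is normally attached in the builder chain with `.checkValue(validator, errorMsg)` before the entry is created. Because `ConfigBuilder` is internal to Spark, the sketch below uses a hypothetical, self-contained `TimeEntrySketch` (not Spark's API) to illustrate the idea: validators are registered when the entry is defined and enforced whenever a value is read. The positivity rule is an assumption, mirroring the `value > 0` check on the neighboring entry.

```scala
// Hypothetical stand-in for a typed, time-valued config entry; NOT Spark's ConfigBuilder.
final case class TimeEntrySketch(
    key: String,
    checks: List[(Long => Boolean, String)] = Nil) {

  // Register a validator, analogous in spirit to `.checkValue(validator, errorMsg)`.
  def checkValue(validator: Long => Boolean, errorMsg: String): TimeEntrySketch =
    copy(checks = checks :+ (validator -> errorMsg))

  // Validate a value (already converted to seconds) and return it unchanged.
  def read(valueSec: Long): Long = {
    checks.foreach { case (isValid, msg) =>
      require(isValid(valueSec), s"'$valueSec' in $key is invalid. $msg")
    }
    valueSec
  }
}
```

Validating at read time means a bad user setting surfaces as a clear error naming the key, rather than as a silently skipped or zero-length wait during shutdown.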






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-15 Thread GitBox


dongjoon-hyun commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023277805


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -57,6 +60,7 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+conf: SparkConf,

Review Comment:
   Although this class is `private[spark]`, please preserve backward compatibility in this case by keeping the existing constructor and adding a new one with the new `conf` parameter.






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-14 Thread GitBox


dongjoon-hyun commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1022306648


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -94,7 +95,9 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
 
   override def stop(): Unit = {
 pollingTasks.asScala.foreach(_.cancel(false))
-ThreadUtils.shutdown(subscribersExecutor)
+// SPARK-41136: it may be called by MGR, and since that MGR has a default 30s timeout, here

Review Comment:
   - In this case, we don't need the JIRA ID in the code comment.
   - Please write the full name instead of `MGR`.
   - What happens when `MGR` has a non-default timeout, such as 20s?
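The last question argues against hard-coding the wait. A minimal sketch, assuming a hypothetical `ShutdownSketch.shutdownWithin` helper built on plain `java.util.concurrent` (not Spark's `ThreadUtils`): stop accepting tasks, wait at most the configured grace period, then interrupt whatever is still running, so the time spent here is bounded by that value rather than by a fixed 30s.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

object ShutdownSketch {
  // Hypothetical helper: give in-flight tasks up to `gracePeriodSec` seconds,
  // then interrupt whatever is still running. Returns true if the pool
  // terminated within the grace period.
  def shutdownWithin(executor: ExecutorService, gracePeriodSec: Long): Boolean = {
    executor.shutdown() // stop accepting new tasks; queued ones still run
    try {
      if (executor.awaitTermination(gracePeriodSec, TimeUnit.SECONDS)) {
        true
      } else {
        executor.shutdownNow() // interrupt running tasks, drop queued ones
        false
      }
    } catch {
      case _: InterruptedException =>
        executor.shutdownNow()
        Thread.currentThread().interrupt() // preserve the caller's interrupt status
        false
    }
  }
}
```

If the surrounding shutdown budget were 20s instead of 30s, a grace period well below 20s would still leave time for the remaining shutdown hooks; how large that margin should be is a judgment call.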


