[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1024775551

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala: ##
@@ -57,6 +60,7 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    conf: SparkConf,

Review Comment:
I moved `conf: SparkConf = new SparkConf` to the last position, as suggested.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023762475

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala: ##
@@ -57,10 +60,22 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    conf: SparkConf,
     subscribersExecutor: ScheduledExecutorService,
     clock: Clock = new SystemClock)
   extends ExecutorPodsSnapshotsStore with Logging {

+  private[spark] def this(
+      subscribersExecutor: ScheduledExecutorService) = {
+    this(new SparkConf, subscribersExecutor, new SystemClock)
+  }
+
+  private[spark] def this(
+      subscribersExecutor: ScheduledExecutorService,
+      clock: Clock) = {
+    this(new SparkConf, subscribersExecutor, clock)
+  }

Review Comment:
Then `clock` becomes required whenever a caller wants to pass `conf`, which is not friendly IMO.
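For reference, the remark above applies to positional calls. A minimal sketch, assuming the parameter order under discussion (`conf` last, with a default), shows that a Scala caller using a named argument can supply `conf` and leave `clock` at its default. `Conf`, `Clock`, `SystemClock`, and `Store` here are hypothetical stand-ins, not Spark's classes; Java callers and purely positional calls would still need to spell out `clock`.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
class Conf(val settings: Map[String, String] = Map.empty)
trait Clock { def now(): Long }
class SystemClock extends Clock { def now(): Long = System.currentTimeMillis() }

// `conf` is the last parameter and has a default, mirroring the order being discussed.
class Store(
    val executorName: String,
    val clock: Clock = new SystemClock,
    val conf: Conf = new Conf())

object NamedArgDemo {
  def main(args: Array[String]): Unit = {
    val custom = new Conf(Map("grace" -> "20s"))

    // Positional call: `clock` has to be written out in order to reach `conf`.
    val positional = new Store("subscribers", new SystemClock, custom)

    // Named call: `clock` keeps its default while `conf` is supplied explicitly.
    val named = new Store("subscribers", conf = custom)

    println(positional.conf.settings("grace"))
    println(named.conf.settings("grace"))
  }
}
```

Whether named arguments are acceptable depends on who calls the constructor; the sketch only illustrates the language feature, not what the PR settled on.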
[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023756963

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala: ##
@@ -103,7 +103,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val subscribersExecutor = ThreadUtils
       .newDaemonThreadPoolScheduledExecutor(
         "kubernetes-executor-snapshots-subscribers", 2)
-    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor)
+    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(sc.conf, subscribersExecutor)

Review Comment:
Yes, but I prefer passing the SparkConf here; otherwise we will hit another constructor backward-compatibility issue if we need to pass other parameters in the future.
[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023478697

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala: ##
@@ -57,6 +60,7 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    conf: SparkConf,

Review Comment:
The backward-compatible constructors were added.

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala: ##
@@ -723,6 +723,18 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
       .createWithDefault(Int.MaxValue)

+  val KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD =
+    ConfigBuilder("spark.kubernetes.executorSnapshotsSubscribersShutdownGracePeriod")
+      .doc("Time to wait for graceful shutdown kubernetes-executor-snapshots-subscribers " +
+        "thread pool. Since it may be called by ShutdownHookManager, where timeout is " +
+        "controlled by hadoop configuration `hadoop.service.shutdown.timeout` " +
+        "(default is 30s). As the whole Spark shutdown procedure shares the above timeout, " +
+        "this value should be short than that to prevent blocking the following shutdown " +
+        "procedures.")
+      .version("3.4.0")
+      .timeConf(TimeUnit.SECONDS)

Review Comment:
Added.
[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023477562

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala: ##
@@ -57,10 +60,22 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    conf: SparkConf,
     subscribersExecutor: ScheduledExecutorService,
     clock: Clock = new SystemClock)
   extends ExecutorPodsSnapshotsStore with Logging {

+  private[spark] def this(
+      subscribersExecutor: ScheduledExecutorService) = {
+    this(new SparkConf, subscribersExecutor, new SystemClock)
+  }
+
+  private[spark] def this(
+      subscribersExecutor: ScheduledExecutorService,
+      clock: Clock) = {
+    this(new SparkConf, subscribersExecutor, clock)
+  }

Review Comment:
I cannot merge these two constructors into one:
```
private[spark] def this(
    subscribersExecutor: ScheduledExecutorService,
    clock: Clock = new SystemClock) = {
  this(new SparkConf, subscribersExecutor, clock)
}
```
It fails compilation:
```
[error] /Users/chengpan/Projects/apache-spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala:106:64: type mismatch;
[error]  found   : org.apache.spark.SparkConf
[error]  required: java.util.concurrent.ScheduledExecutorService
[error]     val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(sc.conf, subscribersExecutor)
[error]
```
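For readers who want to try the shape that does compile, here is a minimal sketch of the arrangement quoted in the diff: a primary constructor whose last parameter has a default, plus two auxiliary constructors that declare no defaults of their own. `StubConf`, `StubExecutor`, `StubClock`, `StubSystemClock`, and `SnapshotStore` are hypothetical stand-ins for the real Spark and JDK types, and the sketch does not attempt to explain why the merged variant is rejected.

```scala
// Hypothetical stand-ins for SparkConf, ScheduledExecutorService and Clock.
class StubConf
class StubExecutor
trait StubClock
class StubSystemClock extends StubClock

class SnapshotStore(
    val conf: StubConf,
    val executor: StubExecutor,
    val clock: StubClock = new StubSystemClock) {

  // Old one-argument signature, kept so existing callers still compile.
  def this(executor: StubExecutor) =
    this(new StubConf, executor, new StubSystemClock)

  // Old two-argument signature, kept so existing callers still compile.
  def this(executor: StubExecutor, clock: StubClock) =
    this(new StubConf, executor, clock)
}

object ConstructorDemo {
  def main(args: Array[String]): Unit = {
    val exec = new StubExecutor
    val conf = new StubConf

    val viaPrimary = new SnapshotStore(conf, exec)                // new call site
    val viaOldOne = new SnapshotStore(exec)                       // legacy call site
    val viaOldTwo = new SnapshotStore(exec, new StubSystemClock)  // legacy call site

    println(viaPrimary.conf eq conf)
    println(viaOldOne.executor eq exec)
    println(viaOldTwo.executor eq exec)
  }
}
```

Keeping the defaults on a single alternative means each call site matches exactly one constructor by argument types.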
[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1022312510

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala: ##
@@ -94,7 +95,9 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
   override def stop(): Unit = {
     pollingTasks.asScala.foreach(_.cancel(false))
-    ThreadUtils.shutdown(subscribersExecutor)
+    // SPARK-41136: it may be called by MGR, and since that MGR has a default 30s timeout, here

Review Comment:
Thanks, I will make the hardcoded 20s configurable, then update the comment and move it into the configuration description.
[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process
pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1021830231

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala: ##
@@ -94,7 +95,9 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
   override def stop(): Unit = {
     pollingTasks.asScala.foreach(_.cancel(false))
-    ThreadUtils.shutdown(subscribersExecutor)
+    // SPARK-41136: it may be called by MGR, and since that MGR has a default 30s timeout, here
+    // a shorter duration 20s is chosen to prevent blocking the following shutdown process.
+    ThreadUtils.shutdown(subscribersExecutor, FiniteDuration(20, TimeUnit.SECONDS))

Review Comment:
Yes, I will update it tomorrow.
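The idea in the diff comment (wait for the pool for less time than the 30s budget of the caller) can be sketched with plain `java.util.concurrent`. This is an illustration under stated assumptions, not Spark's `ThreadUtils.shutdown`: the helper name `shutdownWithGrace` and its Boolean return value are hypothetical.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.concurrent.duration.FiniteDuration

object GracefulShutdownSketch {

  /** Hypothetical helper: returns true if the pool terminated within the grace period. */
  def shutdownWithGrace(executor: ExecutorService, grace: FiniteDuration): Boolean = {
    // Stop accepting new tasks; tasks already submitted keep running.
    executor.shutdown()
    // Wait at most `grace` for those tasks to finish.
    val finished = executor.awaitTermination(grace.toMillis, TimeUnit.MILLISECONDS)
    // Past the grace period, interrupt whatever is still running so the caller can move on.
    if (!finished) executor.shutdownNow()
    finished
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newScheduledThreadPool(2)
    pool.submit(new Runnable { def run(): Unit = Thread.sleep(50) })

    // 20s mirrors the hardcoded value in the diff, chosen to stay under the 30s timeout.
    val done = shutdownWithGrace(pool, FiniteDuration(20, TimeUnit.SECONDS))
    println(s"terminated within grace period: $done")
  }
}
```

Bounding the wait keeps a slow subscriber from consuming the whole shutdown budget that later shutdown steps also depend on.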