[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-16 Thread GitBox


pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1024775551


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -57,6 +60,7 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    conf: SparkConf,

Review Comment:
   I moved `conf: SparkConf = new SparkConf` to the end of the parameter list, as suggested.
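When the new parameter goes last and has a default, existing call sites keep compiling, and Scala callers can still pass `conf` alone by using a named argument. A minimal sketch of that shape; `Conf`, `Clock`, `SystemClock`, `SnapshotsStore` and `SnapshotsStoreCallSites` are hypothetical stand-ins, not Spark's classes:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService}

// Hypothetical stand-ins for SparkConf, Clock and SystemClock; not Spark's classes.
final case class Conf(settings: Map[String, String] = Map.empty)
trait Clock { def getTimeMillis(): Long }
final class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

// The new parameter is appended last with a default, so older call sites are unaffected.
class SnapshotsStore(
    val subscribersExecutor: ScheduledExecutorService,
    val clock: Clock = new SystemClock,
    val conf: Conf = Conf())

object SnapshotsStoreCallSites {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  // Pre-existing call sites compile unchanged.
  val legacy = new SnapshotsStore(executor)
  val withClock = new SnapshotsStore(executor, new SystemClock)

  // A named argument passes `conf` without spelling out `clock`.
  val withConf = new SnapshotsStore(executor, conf = Conf(Map("key" -> "value")))

  executor.shutdown()
}
```

Default and named arguments are a Scala-only convenience: Java callers see only the full constructor, which is one reason explicit overloaded constructors can still be worth keeping.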



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-16 Thread GitBox


pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023762475


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -57,10 +60,22 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    conf: SparkConf,
     subscribersExecutor: ScheduledExecutorService,
     clock: Clock = new SystemClock)
   extends ExecutorPodsSnapshotsStore with Logging {
 
+  private[spark] def this(
+      subscribersExecutor: ScheduledExecutorService) = {
+    this(new SparkConf, subscribersExecutor, new SystemClock)
+  }
+
+  private[spark] def this(
+      subscribersExecutor: ScheduledExecutorService,
+      clock: Clock) = {
+    this(new SparkConf, subscribersExecutor, clock)
+  }

Review Comment:
   Then `clock` would be required whenever a caller wants to pass `conf`, which is not user-friendly IMO.





[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-16 Thread GitBox


pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023756963


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala:
##
@@ -103,7 +103,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val subscribersExecutor = ThreadUtils
       .newDaemonThreadPoolScheduledExecutor(
         "kubernetes-executor-snapshots-subscribers", 2)
-    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(subscribersExecutor)
+    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(sc.conf, subscribersExecutor)

Review Comment:
   Yes, but I prefer passing the SparkConf here; otherwise, we will hit another backward-compatibility issue with the constructor if we need to pass other parameters in the future.
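Passing the whole configuration keeps the constructor stable: the class looks up whatever settings it needs, so a future setting does not add a constructor parameter. A minimal sketch under that assumption; `GracePeriodStore` and its `Map`-based config are hypothetical, while the key name and the 20-second fallback come from this PR's discussion:

```scala
import scala.concurrent.duration._

// Hypothetical store that receives the whole config map instead of individual values.
class GracePeriodStore(conf: Map[String, String]) {
  // Key name taken from the PR; the 20s fallback mirrors the previously hardcoded value.
  private val GracePeriodKey =
    "spark.kubernetes.executorSnapshotsSubscribersShutdownGracePeriod"

  // Assumption: the value is a plain number of seconds, e.g. "15".
  val shutdownGracePeriod: FiniteDuration =
    conf.get(GracePeriodKey).map(_.trim.toLong.seconds).getOrElse(20.seconds)
}
```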






[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-15 Thread GitBox


pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023478697


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -57,6 +60,7 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    conf: SparkConf,

Review Comment:
   the backward-compatible constructors were added



##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:
##
@@ -723,6 +723,18 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
       .createWithDefault(Int.MaxValue)
 
+  val KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD =
+    ConfigBuilder("spark.kubernetes.executorSnapshotsSubscribersShutdownGracePeriod")
+      .doc("Time to wait for graceful shutdown of the kubernetes-executor-snapshots-subscribers " +
+        "thread pool. It may be called by ShutdownHookManager, whose timeout is " +
+        "controlled by the Hadoop configuration `hadoop.service.shutdown.timeout` " +
+        "(default is 30s). As the whole Spark shutdown procedure shares that timeout, " +
+        "this value should be shorter than it to prevent blocking the following shutdown " +
+        "procedures.")
+      .version("3.4.0")
+      .timeConf(TimeUnit.SECONDS)

Review Comment:
   added
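`.timeConf(TimeUnit.SECONDS)` declares a duration-valued setting whose default unit is seconds, so a bare number is read as seconds. The sketch below only approximates that behavior with a hypothetical `TimeConfSketch.parseSeconds` helper that accepts a bare number or an `ms`/`s`/`min`/`h` suffix; Spark's real parser handles more units, and this is not its implementation:

```scala
import java.util.concurrent.TimeUnit

object TimeConfSketch {
  // Matches an integer followed by an optional unit suffix, e.g. "20", "20s", "1min".
  private val Pattern = """(\d+)\s*(ms|s|min|h)?""".r

  // Hypothetical, simplified parser: returns the value in whole seconds.
  def parseSeconds(raw: String): Long = raw.trim.toLowerCase match {
    case Pattern(n, unit) =>
      val value = n.toLong
      Option(unit) match { // an unmatched optional group is null
        case None | Some("s") => value // no suffix means seconds
        case Some("ms")       => TimeUnit.MILLISECONDS.toSeconds(value)
        case Some("min")      => TimeUnit.MINUTES.toSeconds(value)
        case Some("h")        => TimeUnit.HOURS.toSeconds(value)
        case Some(other)      => throw new IllegalArgumentException(s"Unknown unit: $other")
      }
    case _ => throw new IllegalArgumentException(s"Invalid time string: $raw")
  }
}
```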





[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-15 Thread GitBox


pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1023477562


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -57,10 +60,22 @@ import org.apache.spark.util.ThreadUtils
  * The subscriber notification callback is guaranteed to be called from a 
single thread at a time.
  */
 private[spark] class ExecutorPodsSnapshotsStoreImpl(
+conf: SparkConf,
 subscribersExecutor: ScheduledExecutorService,
 clock: Clock = new SystemClock)
   extends ExecutorPodsSnapshotsStore with Logging {
 
+  private[spark] def this(
+  subscribersExecutor: ScheduledExecutorService) = {
+this(new SparkConf, subscribersExecutor, new SystemClock)
+  }
+
+  private[spark] def this(
+  subscribersExecutor: ScheduledExecutorService,
+  clock: Clock) = {
+this(new SparkConf, subscribersExecutor, clock)
+  }

Review Comment:
   I cannot merge these two constructors into one:
   ```
     private[spark] def this(
         subscribersExecutor: ScheduledExecutorService,
         clock: Clock = new SystemClock) = {
       this(new SparkConf, subscribersExecutor, clock)
     }
   ```
   It fails to compile:
   ```
   [error] /Users/chengpan/Projects/apache-spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala:106:64: type mismatch;
   [error]  found   : org.apache.spark.SparkConf
   [error]  required: java.util.concurrent.ScheduledExecutorService
   [error]     val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(sc.conf, subscribersExecutor)
   [error]
   ```
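Since the merged constructor with a default argument did not compile, the diff keeps two explicit auxiliary constructors, each delegating to the primary one. A self-contained sketch of that shape, with hypothetical stand-ins (`StoreConf`, `StoreClock`, `WallClock`, `Store`) in place of `SparkConf`, `Clock`, `SystemClock` and `ExecutorPodsSnapshotsStoreImpl`:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService}

// Hypothetical stand-ins; not Spark's classes.
final case class StoreConf(settings: Map[String, String] = Map.empty)
trait StoreClock { def getTimeMillis(): Long }
final class WallClock extends StoreClock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

class Store(
    val conf: StoreConf,
    val subscribersExecutor: ScheduledExecutorService,
    val clock: StoreClock = new WallClock) {

  // Old one-argument signature: no default arguments, only delegation.
  def this(subscribersExecutor: ScheduledExecutorService) =
    this(StoreConf(), subscribersExecutor, new WallClock)

  // Old two-argument signature.
  def this(subscribersExecutor: ScheduledExecutorService, clock: StoreClock) =
    this(StoreConf(), subscribersExecutor, clock)
}

object StoreCallSites {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  val viaPrimary = new Store(StoreConf(Map("k" -> "v")), executor) // new call site, clock defaulted
  val viaLegacyOne = new Store(executor)                           // old call sites still compile
  val viaLegacyTwo = new Store(executor, new WallClock)

  executor.shutdown()
}
```

Keeping the default on the primary constructor only also stays within Scala's rule that at most one overloaded alternative may define default arguments.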





[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-14 Thread GitBox


pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1022312510


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -94,7 +95,9 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
 
   override def stop(): Unit = {
     pollingTasks.asScala.foreach(_.cancel(false))
-    ThreadUtils.shutdown(subscribersExecutor)
+    // SPARK-41136: it may be called by MGR, and since that MGR has a default 30s timeout, here

Review Comment:
   Thanks, I will make the hardcoded 20s configurable, and update the comment and move it into the configuration description.
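The grace period being made configurable bounds how long `stop()` can block. The usual `ExecutorService` pattern is: request an orderly shutdown, wait at most the grace period, then interrupt whatever is still running. A minimal sketch of that pattern; `GracefulShutdown.shutdownWithGrace` is hypothetical and is not Spark's `ThreadUtils.shutdown`:

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}
import scala.concurrent.duration._

object GracefulShutdown {
  // Returns true if the executor terminated within the grace period.
  def shutdownWithGrace(executor: ExecutorService, gracePeriod: FiniteDuration): Boolean = {
    executor.shutdown() // stop accepting new tasks; already submitted tasks may still run
    val terminated =
      try executor.awaitTermination(gracePeriod.toMillis, TimeUnit.MILLISECONDS)
      catch {
        case _: InterruptedException =>
          Thread.currentThread().interrupt() // preserve the caller's interrupt status
          false
      }
    if (!terminated) {
      executor.shutdownNow() // interrupt running tasks instead of blocking any longer
    }
    terminated
  }
}
```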





[GitHub] [spark] pan3793 commented on a diff in pull request #38651: [SPARK-41136][K8S] Shorten graceful shutdown time of ExecutorPodsSnapshotsStoreImpl to prevent blocking shutdown process

2022-11-14 Thread GitBox


pan3793 commented on code in PR #38651:
URL: https://github.com/apache/spark/pull/38651#discussion_r1021830231


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala:
##
@@ -94,7 +95,9 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
 
   override def stop(): Unit = {
     pollingTasks.asScala.foreach(_.cancel(false))
-    ThreadUtils.shutdown(subscribersExecutor)
+    // SPARK-41136: it may be called by MGR, and since that MGR has a default 30s timeout, here
+    // a shorter duration 20s is chosen to prevent blocking the following shutdown process.
+    ThreadUtils.shutdown(subscribersExecutor, FiniteDuration(20, TimeUnit.SECONDS))

Review Comment:
   yes, will update it tomorrow.
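With the values in the code comment, a 30s shared hook timeout minus a 20s grace period leaves at most 10s for the rest of the shutdown procedure once this pool has used its full grace period. A small sketch that makes the constraint explicit; `ShutdownBudget` is hypothetical, and the 30s value is the `hadoop.service.shutdown.timeout` default as stated in this thread:

```scala
import scala.concurrent.duration._

object ShutdownBudget {
  // Assumed default for hadoop.service.shutdown.timeout, as quoted in this thread.
  val HookTimeout: FiniteDuration = 30.seconds

  // Time left for the remaining shutdown steps after the executor's grace period.
  def remainingBudget(
      gracePeriod: FiniteDuration,
      hookTimeout: FiniteDuration = HookTimeout): FiniteDuration = {
    require(gracePeriod > Duration.Zero, "grace period must be positive")
    require(gracePeriod < hookTimeout, s"grace period $gracePeriod must be shorter than $hookTimeout")
    hookTimeout - gracePeriod
  }
}
```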


