This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2a93e46eb062 [SPARK-47207][CORE] Support `spark.driver.timeout` and `DriverTimeoutPlugin` 2a93e46eb062 is described below commit 2a93e46eb0627df9cd288156bffa0a0815906c3c Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Wed Feb 28 09:27:53 2024 -0800 [SPARK-47207][CORE] Support `spark.driver.timeout` and `DriverTimeoutPlugin` ### What changes were proposed in this pull request? This PR aims to support `spark.driver.timeout` and `DriverTimeoutPlugin`. ### Why are the changes needed? Sometimes, Spark applications fall into an abnormal situation and hang. We had better provide a way to guarantee termination after a pre-defined timeout in a standard way. - spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin - spark.driver.timeout=1min ``` $ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin -c spark.driver.timeout=1min Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 4.0.0-SNAPSHOT /_/ Using Scala version 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.10) Type in expressions to have them evaluated. Type :help for more information. 24/02/28 06:53:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://localhost:4040 Spark context available as 'sc' (master = local[*], app id = local-1709132014477). Spark session available as 'spark'. scala> 24/02/28 06:54:34 WARN DriverTimeoutDriverPlugin: Terminate Driver JVM because it runs after 1 minute $ echo $? 124 ``` ### Does this PR introduce _any_ user-facing change? No, this is a new feature and a built-in plugin. ### How was this patch tested? Manually because this invokes `System.exit`. 1. 
Timeout with 1 minute ``` $ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin -c spark.driver.timeout=1min ... scala> 24/02/28 06:54:34 WARN DriverTimeoutDriverPlugin: Terminate Driver JVM because it runs after 1 minute $ echo $? 124 ``` 2. `DriverTimeoutPlugin` will be ignored if the default value of `spark.driver.timeout` is used. ``` $ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin ... 24/02/28 01:02:57 WARN DriverTimeoutDriverPlugin: Disabled with the timeout value 0. ... scala> ``` 3. `spark.driver.timeout` will be ignored if `DriverTimeoutPlugin` is not provided. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45313 from dongjoon-hyun/SPARK-47207. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../apache/spark/deploy/DriverTimeoutPlugin.scala | 62 ++++++++++++++++++++++ .../org/apache/spark/internal/config/package.scala | 9 ++++ .../org/apache/spark/util/SparkExitCode.scala | 3 ++ docs/configuration.md | 11 ++++ 4 files changed, 85 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala new file mode 100644 index 000000000000..9b141d607572 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy + +import java.util.{Map => JMap} +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DRIVER_TIMEOUT +import org.apache.spark.util.{SparkExitCode, ThreadUtils} + +/** + * A built-in plugin to provide Driver timeout feature. + */ +class DriverTimeoutPlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = new DriverTimeoutDriverPlugin() + + // No-op + override def executorPlugin(): ExecutorPlugin = null +} + +class DriverTimeoutDriverPlugin extends DriverPlugin with Logging { + + private val timeoutService: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-timeout") + + override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + val timeout = sc.conf.get(DRIVER_TIMEOUT) + if (timeout == 0) { + logWarning("Disabled with the timeout value 0.") + } else { + val task: Runnable = () => { + logWarning(s"Terminate Driver JVM because it runs after $timeout minute" + + (if (timeout == 1) "" else "s")) + // We cannot use 'SparkContext.stop' because SparkContext might be in abnormal situation. 
+ System.exit(SparkExitCode.DRIVER_TIMEOUT) + } + timeoutService.schedule(task, timeout, TimeUnit.MINUTES) + } + Map.empty[String, String].asJava + } + + override def shutdown(): Unit = timeoutService.shutdown() +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7caac5884c74..1fcf75b02503 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1099,6 +1099,15 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val DRIVER_TIMEOUT = ConfigBuilder("spark.driver.timeout") + .doc("A timeout for Spark driver in minutes. 0 means infinite. For the positive time value, " + + "terminate the driver with the exit code 124 if it runs after timeout duration. To use, " + + "it's required to set `spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin`.") + .version("4.0.0") + .timeConf(TimeUnit.MINUTES) + .checkValue(v => v >= 0, "The value should be a non-negative time value.") + .createWithDefaultString("0min") + private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress") .doc("Address where to bind network listen sockets on the driver.") .version("2.1.0") diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala index 75b3d134b94d..e8f8788243cd 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala @@ -45,6 +45,9 @@ private[spark] object SparkExitCode { OutOfMemoryError. */ val OOM = 52 + /** Exit because the driver is running over the given threshold. */ + val DRIVER_TIMEOUT = 124 + /** Exception indicate command not found. 
*/ val ERROR_COMMAND_NOT_FOUND = 127 } diff --git a/docs/configuration.md b/docs/configuration.md index f6e1e449e2dc..f0d68c55e7b3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -434,6 +434,17 @@ of the most common options to set are: </td> <td>1.3.0</td> </tr> +<tr> + <td><code>spark.driver.timeout</code></td> + <td>0min</td> + <td> + A timeout for Spark driver in minutes. 0 means infinite. For the positive time value, + terminate the driver with the exit code 124 if it runs after timeout duration. To use, + it's required to set <code>spark.plugins</code> with + <code>org.apache.spark.deploy.DriverTimeoutPlugin</code>. + </td> + <td>4.0.0</td> +</tr> <tr> <td><code>spark.driver.log.localDir</code></td> <td>(none)</td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org