This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a93e46eb062 [SPARK-47207][CORE] Support `spark.driver.timeout` and `DriverTimeoutPlugin`
2a93e46eb062 is described below

commit 2a93e46eb0627df9cd288156bffa0a0815906c3c
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Wed Feb 28 09:27:53 2024 -0800

    [SPARK-47207][CORE] Support `spark.driver.timeout` and `DriverTimeoutPlugin`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `spark.driver.timeout` and `DriverTimeoutPlugin`.
    
    ### Why are the changes needed?
    
    Sometimes, Spark applications fall into an abnormal situation and hang.
    
    We should provide a standard way to guarantee termination after a pre-defined timeout:
    - spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin
    - spark.driver.timeout=1min
    
    ```
    $ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin -c spark.driver.timeout=1min
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
          /_/
    
    Using Scala version 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.10)
    Type in expressions to have them evaluated.
    Type :help for more information.
    24/02/28 06:53:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://localhost:4040
    Spark context available as 'sc' (master = local[*], app id = local-1709132014477).
    Spark session available as 'spark'.
    
    scala> 24/02/28 06:54:34 WARN DriverTimeoutDriverPlugin: Terminate Driver JVM because it runs after 1 minute
    
    $ echo $?
    124
    ```
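
    For reference, the same settings could also be applied programmatically when building a session. A minimal sketch follows (the app name is arbitrary; only the two config keys come from this patch):
    ```
    import org.apache.spark.sql.SparkSession

    // Minimal sketch: enable the built-in driver-timeout plugin via configs.
    // The driver JVM exits with code 124 one minute after the plugin initializes.
    val spark = SparkSession.builder()
      .appName("driver-timeout-demo")
      .config("spark.plugins", "org.apache.spark.deploy.DriverTimeoutPlugin")
      .config("spark.driver.timeout", "1min")
      .getOrCreate()
    ```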
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new feature and a built-in plugin.
    
    ### How was this patch tested?
    
    Manually, because the plugin invokes `System.exit`.
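
    As a side note, a hypothetical refactoring (not part of this patch) could make this path unit-testable by injecting the exit action, so a test observes the timeout instead of killing the JVM:
    ```
    import java.util.concurrent.{Executors, TimeUnit}

    // Hypothetical sketch, not part of this patch: the exit action is injected
    // so a test can pass a probe in place of the real System.exit.
    class TimeoutAction(exit: Int => Unit = System.exit) {
      private val service = Executors.newSingleThreadScheduledExecutor()
      def arm(timeoutMinutes: Long): Unit = {
        val task: Runnable = () => exit(124)
        service.schedule(task, timeoutMinutes, TimeUnit.MINUTES)
      }
    }
    ```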
    
    1. Timeout with 1 minute
    ```
    $ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin -c spark.driver.timeout=1min
    ...
    scala> 24/02/28 06:54:34 WARN DriverTimeoutDriverPlugin: Terminate Driver JVM because it runs after 1 minute
    
    $ echo $?
    124
    ```
    
    2. `DriverTimeoutPlugin` is disabled if the default value of `spark.driver.timeout` is used.
    ```
    $ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin
    ...
    24/02/28 01:02:57 WARN DriverTimeoutDriverPlugin: Disabled with the timeout value 0.
    ...
    scala>
    ```
    
    3. `spark.driver.timeout` will be ignored if `DriverTimeoutPlugin` is not provided.
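
    For completeness, that case corresponds to an invocation like the following (console output omitted; the timeout value is set, but no plugin ever schedules the termination task):
    ```
    $ bin/spark-shell -c spark.driver.timeout=1min
    ```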
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45313 from dongjoon-hyun/SPARK-47207.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/deploy/DriverTimeoutPlugin.scala  | 62 ++++++++++++++++++++++
 .../org/apache/spark/internal/config/package.scala |  9 ++++
 .../org/apache/spark/util/SparkExitCode.scala      |  3 ++
 docs/configuration.md                              | 11 ++++
 4 files changed, 85 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala
new file mode 100644
index 000000000000..9b141d607572
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DRIVER_TIMEOUT
+import org.apache.spark.util.{SparkExitCode, ThreadUtils}
+
+/**
+ * A built-in plugin to provide Driver timeout feature.
+ */
+class DriverTimeoutPlugin extends SparkPlugin {
+  override def driverPlugin(): DriverPlugin = new DriverTimeoutDriverPlugin()
+
+  // No-op
+  override def executorPlugin(): ExecutorPlugin = null
+}
+
+class DriverTimeoutDriverPlugin extends DriverPlugin with Logging {
+
+  private val timeoutService: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-timeout")
+
+  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+    val timeout = sc.conf.get(DRIVER_TIMEOUT)
+    if (timeout == 0) {
+      logWarning("Disabled with the timeout value 0.")
+    } else {
+      val task: Runnable = () => {
+        logWarning(s"Terminate Driver JVM because it runs after $timeout 
minute" +
+          (if (timeout == 1) "" else "s"))
+        // We cannot use 'SparkContext.stop' because SparkContext might be in abnormal situation.
+        System.exit(SparkExitCode.DRIVER_TIMEOUT)
+      }
+      timeoutService.schedule(task, timeout, TimeUnit.MINUTES)
+    }
+    Map.empty[String, String].asJava
+  }
+
+  override def shutdown(): Unit = timeoutService.shutdown()
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7caac5884c74..1fcf75b02503 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1099,6 +1099,15 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val DRIVER_TIMEOUT = ConfigBuilder("spark.driver.timeout")
+    .doc("A timeout for Spark driver in minutes. 0 means infinite. For the 
positive time value, " +
+      "terminate the driver with the exit code 124 if it runs after timeout 
duration. To use, " +
+      "it's required to set 
`spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin`.")
+    .version("4.0.0")
+    .timeConf(TimeUnit.MINUTES)
+    .checkValue(v => v >= 0, "The value should be a non-negative time value.")
+    .createWithDefaultString("0min")
+
   private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
     .doc("Address where to bind network listen sockets on the driver.")
     .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
index 75b3d134b94d..e8f8788243cd 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -45,6 +45,9 @@ private[spark] object SparkExitCode {
       OutOfMemoryError. */
   val OOM = 52
 
+  /** Exit because the driver is running over the given threshold. */
+  val DRIVER_TIMEOUT = 124
+
   /** Exception indicate command not found. */
   val ERROR_COMMAND_NOT_FOUND = 127
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index f6e1e449e2dc..f0d68c55e7b3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -434,6 +434,17 @@ of the most common options to set are:
   </td>
   <td>1.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.driver.timeout</code></td>
+  <td>0min</td>
+  <td>
+    A timeout for Spark driver in minutes. 0 means infinite. For the positive time value,
+    terminate the driver with the exit code 124 if it runs after timeout duration. To use,
+    it's required to set <code>spark.plugins</code> with
+    <code>org.apache.spark.deploy.DriverTimeoutPlugin</code>.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.driver.log.localDir</code></td>
   <td>(none)</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
