This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 54587638685b [SPARK-48541][CORE] Add a new exit code for executors killed by TaskReaper
54587638685b is described below

commit 54587638685bd633cb3840a23afd5a809d796d47
Author: Bo Zhang <bo.zh...@databricks.com>
AuthorDate: Wed Jun 19 11:03:24 2024 -0700

    [SPARK-48541][CORE] Add a new exit code for executors killed by TaskReaper
    
    ### What changes were proposed in this pull request?
    This change adds a new exit code, 57, for executors killed by TaskReaper.
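
    For illustration, a minimal sketch (not part of this patch) of how the new
    code resolves to a human-readable reason via the existing
    ExecutorExitCode.explainExitCode helper shown in the diff below
    (ExecutorExitCode is internal to Spark, so this assumes Spark-internal code):

        import org.apache.spark.executor.ExecutorExitCode

        // New constant introduced by this change.
        val code: Int = ExecutorExitCode.KILLED_BY_TASK_REAPER // 57

        // explainExitCode maps known executor exit codes to documented reasons.
        println(ExecutorExitCode.explainExitCode(code))
        // prints: Executor killed by TaskReaper.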
    
    ### Why are the changes needed?
    This makes it easier to monitor the cases in which executors are killed by TaskReaper.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The exit code for executors killed by TaskReaper will change from the generic SparkExitCode.UNCAUGHT_EXCEPTION to 57.
    
    ### How was this patch tested?
    Updated unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46883 from bozhang2820/spark-48541.
    
    Authored-by: Bo Zhang <bo.zh...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../main/scala/org/apache/spark/executor/Executor.scala |  7 ++++---
 .../org/apache/spark/executor/ExecutorExitCode.scala    |  6 ++++++
 .../spark/util/SparkUncaughtExceptionHandler.scala      |  3 +++
 .../scala/org/apache/spark/JobCancellationSuite.scala   | 17 +++++++++++++++--
 .../spark/util/SparkUncaughtExceptionHandlerSuite.scala |  5 +++++
 5 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4e5d151468d8..7317d3c47c08 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -1041,9 +1041,8 @@ private[spark] class Executor(
           } else {
             // In non-local-mode, the exception thrown here will bubble up to the uncaught exception
             // handler and cause the executor JVM to exit.
-            throw SparkException.internalError(
-              s"Killing executor JVM because killed task $taskId could not be stopped within " +
-                s"$killTimeoutMs ms.", category = "EXECUTOR")
+            throw new KilledByTaskReaperException(s"Killing executor JVM because killed task " +
+              s"$taskId could not be stopped within $killTimeoutMs ms.")
           }
         }
       } finally {
@@ -1328,3 +1327,5 @@ private[spark] object Executor {
     }
   }
 }
+
+class KilledByTaskReaperException(message: String) extends SparkException(message)
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 99858f785600..5300598ef53e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -45,6 +45,10 @@ object ExecutorExitCode {
    */
   val HEARTBEAT_FAILURE = 56
 
+  /** The default uncaught exception handler was reached and the exception was thrown by
+   * TaskReaper. */
+  val KILLED_BY_TASK_REAPER = 57
+
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -59,6 +63,8 @@ object ExecutorExitCode {
         "ExternalBlockStore failed to create a local temporary directory."
       case HEARTBEAT_FAILURE =>
         "Unable to send heartbeats to driver."
+      case KILLED_BY_TASK_REAPER =>
+        "Executor killed by TaskReaper."
       case _ =>
         "Unknown executor exit code (" + exitCode + ")" + (
           if (exitCode > 128) {
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 730b762a3948..c1ea4f929101 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.util
 
+import org.apache.spark.executor.{ExecutorExitCode, KilledByTaskReaperException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.THREAD
 
@@ -56,6 +57,8 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException:
             // SPARK-24294: This is defensive code, in case that SparkFatalException is
             // misused and uncaught.
             System.exit(SparkExitCode.OOM)
+          case _: KilledByTaskReaperException if exitOnUncaughtException =>
+            System.exit(ExecutorExitCode.KILLED_BY_TASK_REAPER)
           case _ if exitOnUncaughtException =>
             System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
           case _ =>
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index c15fdf098bb5..58cf14e969e5 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.util.concurrent.{Semaphore, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 // scalastyle:off executioncontextglobal
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -29,9 +30,10 @@ import scala.concurrent.duration._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.matchers.must.Matchers
 
+import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -429,12 +431,20 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       .set(TASK_REAPER_KILL_TIMEOUT.key, "5s")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
-    // Add a listener to release the semaphore once any tasks are launched.
+    // Add a listener to release a semaphore once any tasks are launched, and another semaphore
+    // once an executor is removed.
     val sem = new Semaphore(0)
+    val semExec = new Semaphore(0)
+    val execLossReason = new ArrayBuffer[String]()
     sc.addSparkListener(new SparkListener {
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
         sem.release()
       }
+
+      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+        execLossReason += executorRemoved.reason
+        semExec.release()
+      }
     })
 
     // jobA is the one to be cancelled.
@@ -455,6 +465,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     sc.cancelJobGroup("jobA")
     val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 15.seconds) }.getCause
     assert(e.getMessage contains "cancel")
+    semExec.acquire(2)
+    val expectedReason = s"Command exited with code ${ExecutorExitCode.KILLED_BY_TASK_REAPER}"
+    assert(execLossReason == Seq(expectedReason, expectedReason))
 
     // Once A is cancelled, job B should finish fairly quickly.
     assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
diff --git a/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala b/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala
index 9e23b25493df..484340966155 100644
--- a/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import scala.util.Try
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.executor.{ExecutorExitCode, KilledByTaskReaperException}
 
 class SparkUncaughtExceptionHandlerSuite extends SparkFunSuite {
 
@@ -33,6 +34,8 @@ class SparkUncaughtExceptionHandlerSuite extends SparkFunSuite {
     (ThrowableTypes.RuntimeException, false, 0),
     (ThrowableTypes.OutOfMemoryError, true, SparkExitCode.OOM),
     (ThrowableTypes.OutOfMemoryError, false, SparkExitCode.OOM),
+    (ThrowableTypes.KilledByTaskReaperException, true, ExecutorExitCode.KILLED_BY_TASK_REAPER),
+    (ThrowableTypes.KilledByTaskReaperException, false, 0),
     (ThrowableTypes.SparkFatalRuntimeException, true, SparkExitCode.UNCAUGHT_EXCEPTION),
     (ThrowableTypes.SparkFatalRuntimeException, false, 0),
     (ThrowableTypes.SparkFatalOutOfMemoryError, true, SparkExitCode.OOM),
@@ -64,6 +67,8 @@ object ThrowableTypes extends Enumeration {
 
   val RuntimeException = ThrowableTypesVal("RuntimeException", new RuntimeException)
   val OutOfMemoryError = ThrowableTypesVal("OutOfMemoryError", new OutOfMemoryError)
+  val KilledByTaskReaperException = ThrowableTypesVal("KilledByTaskReaperException",
+    new KilledByTaskReaperException("dummy message"))
   val SparkFatalRuntimeException = ThrowableTypesVal("SparkFatalException(RuntimeException)",
     new SparkFatalException(new RuntimeException))
   val SparkFatalOutOfMemoryError = ThrowableTypesVal("SparkFatalException(OutOfMemoryError)",

