This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 54587638685b [SPARK-48541][CORE] Add a new exit code for executors killed by TaskReaper 54587638685b is described below commit 54587638685bd633cb3840a23afd5a809d796d47 Author: Bo Zhang <bo.zh...@databricks.com> AuthorDate: Wed Jun 19 11:03:24 2024 -0700 [SPARK-48541][CORE] Add a new exit code for executors killed by TaskReaper ### What changes were proposed in this pull request? This change adds a new exit code, 57, for executors killed by TaskReaper. ### Why are the changes needed? This is to better monitor the cases when executors are killed by TaskReaper. ### Does this PR introduce _any_ user-facing change? Yes. The exit code for executors killed by TaskReaper will change. ### How was this patch tested? Updated unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46883 from bozhang2820/spark-48541. Authored-by: Bo Zhang <bo.zh...@databricks.com> Signed-off-by: Josh Rosen <joshro...@databricks.com> --- .../main/scala/org/apache/spark/executor/Executor.scala | 7 ++++--- .../org/apache/spark/executor/ExecutorExitCode.scala | 6 ++++++ .../spark/util/SparkUncaughtExceptionHandler.scala | 3 +++ .../scala/org/apache/spark/JobCancellationSuite.scala | 17 +++++++++++++++-- .../spark/util/SparkUncaughtExceptionHandlerSuite.scala | 5 +++++ 5 files changed, 33 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4e5d151468d8..7317d3c47c08 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -1041,9 +1041,8 @@ private[spark] class Executor( } else { // In non-local-mode, the exception thrown here will bubble up to the uncaught exception // handler and cause the executor JVM to exit. 
- throw SparkException.internalError( - s"Killing executor JVM because killed task $taskId could not be stopped within " + - s"$killTimeoutMs ms.", category = "EXECUTOR") + throw new KilledByTaskReaperException(s"Killing executor JVM because killed task " + + s"$taskId could not be stopped within $killTimeoutMs ms.") } } } finally { @@ -1328,3 +1327,5 @@ private[spark] object Executor { } } } + +class KilledByTaskReaperException(message: String) extends SparkException(message) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index 99858f785600..5300598ef53e 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -45,6 +45,10 @@ object ExecutorExitCode { */ val HEARTBEAT_FAILURE = 56 + /** The default uncaught exception handler was reached and the exception was thrown by + * TaskReaper. */ + val KILLED_BY_TASK_REAPER = 57 + def explainExitCode(exitCode: Int): String = { exitCode match { case UNCAUGHT_EXCEPTION => "Uncaught exception" @@ -59,6 +63,8 @@ object ExecutorExitCode { "ExternalBlockStore failed to create a local temporary directory." case HEARTBEAT_FAILURE => "Unable to send heartbeats to driver." + case KILLED_BY_TASK_REAPER => + "Executor killed by TaskReaper." 
case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) { diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala index 730b762a3948..c1ea4f929101 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala @@ -17,6 +17,7 @@ package org.apache.spark.util +import org.apache.spark.executor.{ExecutorExitCode, KilledByTaskReaperException} import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.THREAD @@ -56,6 +57,8 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: // SPARK-24294: This is defensive code, in case that SparkFatalException is // misused and uncaught. System.exit(SparkExitCode.OOM) + case _: KilledByTaskReaperException if exitOnUncaughtException => + System.exit(ExecutorExitCode.KILLED_BY_TASK_REAPER) case _ if exitOnUncaughtException => System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) case _ => diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index c15fdf098bb5..58cf14e969e5 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.{Semaphore, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} // scalastyle:off executioncontextglobal import scala.concurrent.ExecutionContext.Implicits.global @@ -29,9 +30,10 @@ import scala.concurrent.duration._ import org.scalatest.BeforeAndAfter import org.scalatest.matchers.must.Matchers +import org.apache.spark.executor.ExecutorExitCode import 
org.apache.spark.internal.config._ import org.apache.spark.internal.config.Deploy._ -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart} +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.util.ThreadUtils /** @@ -429,12 +431,20 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft .set(TASK_REAPER_KILL_TIMEOUT.key, "5s") sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) - // Add a listener to release the semaphore once any tasks are launched. + // Add a listener to release a semaphore once any tasks are launched, and another semaphore + // once an executor is removed. val sem = new Semaphore(0) + val semExec = new Semaphore(0) + val execLossReason = new ArrayBuffer[String]() sc.addSparkListener(new SparkListener { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { sem.release() } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + execLossReason += executorRemoved.reason + semExec.release() + } }) // jobA is the one to be cancelled. @@ -455,6 +465,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft sc.cancelJobGroup("jobA") val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 15.seconds) }.getCause assert(e.getMessage contains "cancel") + semExec.acquire(2) + val expectedReason = s"Command exited with code ${ExecutorExitCode.KILLED_BY_TASK_REAPER}" + assert(execLossReason == Seq(expectedReason, expectedReason)) // Once A is cancelled, job B should finish fairly quickly. 
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100) diff --git a/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala b/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala index 9e23b25493df..484340966155 100644 --- a/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala @@ -22,6 +22,7 @@ import java.io.File import scala.util.Try import org.apache.spark.SparkFunSuite +import org.apache.spark.executor.{ExecutorExitCode, KilledByTaskReaperException} class SparkUncaughtExceptionHandlerSuite extends SparkFunSuite { @@ -33,6 +34,8 @@ class SparkUncaughtExceptionHandlerSuite extends SparkFunSuite { (ThrowableTypes.RuntimeException, false, 0), (ThrowableTypes.OutOfMemoryError, true, SparkExitCode.OOM), (ThrowableTypes.OutOfMemoryError, false, SparkExitCode.OOM), + (ThrowableTypes.KilledByTaskReaperException, true, ExecutorExitCode.KILLED_BY_TASK_REAPER), + (ThrowableTypes.KilledByTaskReaperException, false, 0), (ThrowableTypes.SparkFatalRuntimeException, true, SparkExitCode.UNCAUGHT_EXCEPTION), (ThrowableTypes.SparkFatalRuntimeException, false, 0), (ThrowableTypes.SparkFatalOutOfMemoryError, true, SparkExitCode.OOM), @@ -64,6 +67,8 @@ object ThrowableTypes extends Enumeration { val RuntimeException = ThrowableTypesVal("RuntimeException", new RuntimeException) val OutOfMemoryError = ThrowableTypesVal("OutOfMemoryError", new OutOfMemoryError) + val KilledByTaskReaperException = ThrowableTypesVal("KilledByTaskReaperException", + new KilledByTaskReaperException("dummy message")) val SparkFatalRuntimeException = ThrowableTypesVal("SparkFatalException(RuntimeException)", new SparkFatalException(new RuntimeException)) val SparkFatalOutOfMemoryError = ThrowableTypesVal("SparkFatalException(OutOfMemoryError)", --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org