This is an automated email from the ASF dual-hosted git repository. wuyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4eb0edf5382 [SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo 4eb0edf5382 is described below commit 4eb0edf538266a8f7085fe57255a6870b2c13769 Author: Bo Zhang <bo.zh...@databricks.com> AuthorDate: Tue Oct 11 09:50:06 2022 +0800 [SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo ### What changes were proposed in this pull request? This change populates `ExecutorDecommission` with messages in `ExecutorDecommissionInfo`. ### Why are the changes needed? Currently the message in `ExecutorDecommission` is a fixed value ("Executor decommission."), so it is the same for all cases, e.g. spot instance interruptions and auto-scaling down. With this change we can better differentiate those cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test. Closes #38030 from bozhang2820/spark-40596. Authored-by: Bo Zhang <bo.zh...@databricks.com> Signed-off-by: Yi Wu <yi...@databricks.com> --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../org/apache/spark/scheduler/ExecutorLossReason.scala | 11 +++++++++-- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 13 +++++++------ .../apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 2 +- .../storage/BlockManagerDecommissionIntegrationSuite.scala | 14 +++++++++++--- 6 files changed, 30 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c5529851382..7ad53b8f9f8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2949,7 +2949,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case ExecutorLost(execId, reason) => val workerHost = reason match { case ExecutorProcessLost(_, workerHost, _) => workerHost - case ExecutorDecommission(workerHost) => workerHost + case ExecutorDecommission(workerHost, _) => workerHost case _ => None } dagScheduler.handleExecutorLost(execId, workerHost) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index f333c01bb89..fb6a62551fa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -77,6 +77,13 @@ case class ExecutorProcessLost( * If you update this code make sure to re-run the K8s integration tests. * * @param workerHost it is defined when the worker is decommissioned too + * @param reason detailed decommission message */ -private [spark] case class ExecutorDecommission(workerHost: Option[String] = None) - extends ExecutorLossReason("Executor decommission.") +private [spark] case class ExecutorDecommission( + workerHost: Option[String] = None, + reason: String = "") + extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason) + +private[spark] object ExecutorDecommission { + val msgPrefix = "Executor decommission: " +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 1d157f51fe6..943d1e53df4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1071,7 +1071,7 @@ private[spark] class TaskSetManager( for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { val exitCausedByApp: Boolean = reason match { case ExecutorExited(_, false, _) => false - case ExecutorKilled | ExecutorDecommission(_) => false + case ExecutorKilled | ExecutorDecommission(_, _) => false case ExecutorProcessLost(_, _, false) => false // If the task is launching, this indicates that Driver has sent LaunchTask to Executor, // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e37abd76296..225dd1d75bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. protected val executorsPendingLossReason = new HashSet[String] - // Executors which are being decommissioned. Maps from executorId to workerHost. - protected val executorsPendingDecommission = new HashMap[String, Option[String]] + // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo. + protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo] // A map of ResourceProfile id to map of hostname with its possible task number running on it @GuardedBy("CoarseGrainedSchedulerBackend.this") @@ -447,11 +447,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap -= executorId executorsPendingLossReason -= executorId val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false) - val workerHostOpt = executorsPendingDecommission.remove(executorId) + val decommissionInfoOpt = executorsPendingDecommission.remove(executorId) if (killedByDriver) { ExecutorKilled - } else if (workerHostOpt.isDefined) { - ExecutorDecommission(workerHostOpt.get) + } else if (decommissionInfoOpt.isDefined) { + val decommissionInfo = decommissionInfoOpt.get + ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message) } else { reason } @@ -535,7 +536,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Only bother decommissioning executors which are alive. if (isExecutorActive(executorId)) { scheduler.executorDecommission(executorId, decomInfo) - executorsPendingDecommission(executorId) = decomInfo.workerHost + executorsPendingDecommission(executorId) = decomInfo Some(executorId) } else { None diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 12942896369..fc9248de7ee 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -356,7 +356,7 @@ private[spark] class ExecutorMonitor( if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) if (event.reason == ExecutorLossMessage.decommissionFinished || - event.reason == ExecutorDecommission().message) { + (event.reason != null && event.reason.startsWith(ExecutorDecommission.msgPrefix))) { metrics.gracefullyDecommissioned.inc() } else if (removed.decommissioning) { metrics.decommissionUnfinished.inc() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index e004c334dee..d9d2e6102f1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -183,9 +183,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption } - sc.addSparkListener(new SparkListener { + val listener = new SparkListener { + var removeReasonValidated = false + override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = { executorRemovedSem.release() + if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") { + removeReasonValidated = true + } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { @@ -211,7 +216,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS } } } - }) + } + sc.addSparkListener(listener) // Cache the RDD lazily if (persist) { @@ -247,7 +253,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors sched.decommissionExecutor( execToDecommission, - ExecutorDecommissionInfo("", None), + ExecutorDecommissionInfo("test msg 0", None), adjustTargetNumExecutors = true) val decomTime = new SystemClock().getTimeMillis() @@ -343,5 +349,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS // should have same value like before assert(testRdd.count() === numParts) assert(accum.value === numParts) + import scala.language.reflectiveCalls + assert(listener.removeReasonValidated) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org