This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eb0edf5382 [SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo
4eb0edf5382 is described below

commit 4eb0edf538266a8f7085fe57255a6870b2c13769
Author: Bo Zhang <bo.zh...@databricks.com>
AuthorDate: Tue Oct 11 09:50:06 2022 +0800

    [SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo
    
    ### What changes were proposed in this pull request?
    
    This change populates `ExecutorDecommission` with messages in `ExecutorDecommissionInfo`.
    
    ### Why are the changes needed?
    
    Currently the message in `ExecutorDecommission` is a fixed value ("Executor decommission."), so it is the same for all cases, e.g. spot instance interruptions and scale-downs during auto-scaling. With this change we can better differentiate those cases.
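    
    For illustration (a sketch, not code in this PR), different decommission triggers now surface distinct loss-reason messages instead of one fixed string:
    
    ```scala
    // Hypothetical reason strings; the prefix is ExecutorDecommission.msgPrefix.
    ExecutorDecommission(None, "spot instance interruption").message
    // => "Executor decommission: spot instance interruption"
    ExecutorDecommission(None, "auto-scaling down").message
    // => "Executor decommission: auto-scaling down"
    ```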
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    Closes #38030 from bozhang2820/spark-40596.
    
    Authored-by: Bo Zhang <bo.zh...@databricks.com>
    Signed-off-by: Yi Wu <yi...@databricks.com>
---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala    |  2 +-
 .../org/apache/spark/scheduler/ExecutorLossReason.scala    | 11 +++++++++--
 .../scala/org/apache/spark/scheduler/TaskSetManager.scala  |  2 +-
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala  | 13 +++++++------
 .../apache/spark/scheduler/dynalloc/ExecutorMonitor.scala  |  2 +-
 .../storage/BlockManagerDecommissionIntegrationSuite.scala | 14 +++++++++++---
 6 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c5529851382..7ad53b8f9f8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2949,7 +2949,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorLost(execId, reason) =>
       val workerHost = reason match {
         case ExecutorProcessLost(_, workerHost, _) => workerHost
-        case ExecutorDecommission(workerHost) => workerHost
+        case ExecutorDecommission(workerHost, _) => workerHost
         case _ => None
       }
       dagScheduler.handleExecutorLost(execId, workerHost)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index f333c01bb89..fb6a62551fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -77,6 +77,13 @@ case class ExecutorProcessLost(
  * If you update this code make sure to re-run the K8s integration tests.
  *
  * @param workerHost it is defined when the worker is decommissioned too
+ * @param reason detailed decommission message
  */
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
- extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(
+    workerHost: Option[String] = None,
+    reason: String = "")
+  extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason)
+
+private[spark] object ExecutorDecommission {
+  val msgPrefix = "Executor decommission: "
+}
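
For context, a minimal sketch (not part of the patch) of how the reworked case class behaves and how call sites adapt; `host-1` and `worker drained` are made-up values, and the snippet assumes code living inside the `org.apache.spark.scheduler` package, since the class is `private[spark]`:

```scala
// The added `reason` flows into the human-readable message via
// ExecutorLossReason's constructor argument.
ExecutorDecommission(Some("host-1"), "worker drained").message
// => "Executor decommission: worker drained"

// Existing pattern matches only need a wildcard for the new field,
// as in the DAGScheduler hunk above (`lossReason` is a placeholder):
lossReason match {
  case ExecutorDecommission(workerHost, _) => workerHost
  case _ => None
}
```
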
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 1d157f51fe6..943d1e53df4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1071,7 +1071,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case ExecutorExited(_, false, _) => false
-        case ExecutorKilled | ExecutorDecommission(_) => false
+        case ExecutorKilled | ExecutorDecommission(_, _) => false
         case ExecutorProcessLost(_, _, false) => false
         // If the task is launching, this indicates that Driver has sent LaunchTask to Executor,
         // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e37abd76296..225dd1d75bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
-  // Executors which are being decommissioned. Maps from executorId to workerHost.
-  protected val executorsPendingDecommission = new HashMap[String, Option[String]]
+  // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo.
+  protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -447,11 +447,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
             val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
-            val workerHostOpt = executorsPendingDecommission.remove(executorId)
+            val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
             if (killedByDriver) {
               ExecutorKilled
-            } else if (workerHostOpt.isDefined) {
-              ExecutorDecommission(workerHostOpt.get)
+            } else if (decommissionInfoOpt.isDefined) {
+              val decommissionInfo = decommissionInfoOpt.get
+              ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
             } else {
               reason
             }
@@ -535,7 +536,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Only bother decommissioning executors which are alive.
       if (isExecutorActive(executorId)) {
         scheduler.executorDecommission(executorId, decomInfo)
-        executorsPendingDecommission(executorId) = decomInfo.workerHost
+        executorsPendingDecommission(executorId) = decomInfo
         Some(executorId)
       } else {
         None
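
A condensed sketch of the resulting backend flow (same names as the hunks above, not a verbatim excerpt):

```scala
// The pending-decommission map now retains the full ExecutorDecommissionInfo,
// so the original message survives until the executor is actually removed.
executorsPendingDecommission(executorId) = decomInfo           // on decommission
executorsPendingDecommission.remove(executorId) match {        // on removal
  case Some(info) => ExecutorDecommission(info.workerHost, info.message)
  case None       => reason  // fall back to the originally reported reason
}
```
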
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 12942896369..fc9248de7ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -356,7 +356,7 @@ private[spark] class ExecutorMonitor(
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
       if (event.reason == ExecutorLossMessage.decommissionFinished ||
-        event.reason == ExecutorDecommission().message) {
+        (event.reason != null && event.reason.startsWith(ExecutorDecommission.msgPrefix))) {
         metrics.gracefullyDecommissioned.inc()
       } else if (removed.decommissioning) {
           metrics.decommissionUnfinished.inc()
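
Because the message now varies per cause, the monitor can no longer compare against one fixed string; it matches on the prefix instead. A sketch of the predicate:

```scala
// Any reason carrying the decommission prefix counts as a graceful
// decommission for the metrics, whatever the per-cause suffix is.
def isGracefulDecommission(reason: String): Boolean =
  reason != null && reason.startsWith(ExecutorDecommission.msgPrefix)

isGracefulDecommission("Executor decommission: spot instance interruption")  // true
isGracefulDecommission(null)                                                 // false
```
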
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index e004c334dee..d9d2e6102f1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -183,9 +183,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
      taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
     }
 
-    sc.addSparkListener(new SparkListener {
+    val listener = new SparkListener {
+      var removeReasonValidated = false
+
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
+        if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") {
+          removeReasonValidated = true
+        }
       }
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
@@ -211,7 +216,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
           }
         }
       }
-    })
+    }
+    sc.addSparkListener(listener)
 
     // Cache the RDD lazily
     if (persist) {
@@ -247,7 +253,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", None),
+      ExecutorDecommissionInfo("test msg 0", None),
       adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()
 
@@ -343,5 +349,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // should have same value like before
     assert(testRdd.count() === numParts)
     assert(accum.value === numParts)
+    import scala.language.reflectiveCalls
+    assert(listener.removeReasonValidated)
   }
 }
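
A note on the test's `import scala.language.reflectiveCalls`: `listener` is an anonymous `SparkListener` subclass with an extra field, so its static type is structural and reading the field from outside uses reflection in Scala 2. A sketch of the pattern:

```scala
import scala.language.reflectiveCalls  // enables structural-type member access

import org.apache.spark.scheduler.SparkListener

// Inferred type: SparkListener { var removeReasonValidated: Boolean };
// the field read below compiles only with the language import above.
val listener = new SparkListener { var removeReasonValidated = false }
assert(!listener.removeReasonValidated)
```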

