This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository

The following commit(s) were added to refs/heads/master by this push:
     new 7e7bc940dcb [SPARK-41187][CORE] LiveExecutor MemoryLeak in 
AppStatusListener when ExecutorLost happen
7e7bc940dcb is described below

commit 7e7bc940dcbbf918c7d571e1d27c7654ad387817
Author: yuanyimeng <>
AuthorDate: Sun Dec 11 22:49:07 2022 -0600

    [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when 
ExecutorLost happen
    ### What changes were proposed in this pull request?
    Ignore the SparkListenerTaskEnd with Reason "Resubmitted" in 
AppStatusListener to avoid memory leak
    ### Why are the changes needed?
    For a long running spark thriftserver,  LiveExecutor will be accumulated in 
the deadExecutors HashMap and cause message event queue processing slowly.
    For a every task, actually always sent out a `SparkListenerTaskStart` event 
and a `SparkListenerTaskEnd` event, they are always pairs. But in a executor 
lost situation, it send out event like following steps.
    a) There was a pair of task start and task end event which were fired for 
the task (let us call it Tr)
    b) When executor which ran Tr was lost, while stage is still running, a 
task end event with reason `Resubmitted`  is fired for Tr.
    c) Subsequently, a new task start and task end will be fired for the retry 
of Tr.
    The processing of the `Resubmitted` task end event in AppStatusListener can 
lead to negative `LiveStage.activeTasks` since there's no corresponding 
`SparkListenerTaskStart` event for each of them. The negative activeTasks will 
make the stage always remains in the live stage list as it can never meet the 
condition activeTasks == 0. This in turn causes the dead executor to never be 
cleaned up if that live stage's submissionTime is less than the dead executor's 
removeTime( see isExecutor [...]
    Check  [SPARK-41187]( for 
    ### Does this PR introduce _any_ user-facing change?
    ### How was this patch tested?
    New UT Added
    Test in thriftserver env
    ### The way to reproduce
    I try to reproduce it in spark shell, but it is a little bit handy
    1.  start spark-shell , set spark.dynamicAllocation.maxExecutors=2 for 
    ` bin/spark-shell  --driver-java-options 
    2. run a job with shuffle
     `sc.parallelize(1 to 1000, 10).map { x => Thread.sleep(1000) ; (x % 3, x) 
}.reduceByKey((a, b) => a + b).collect()`
    3. After some ShuffleMapTask finished, kill one or two executor to let 
tasks resubmitted
    4. check by heap dump or debug or log
    Closes #38702 from wineternity/SPARK-41187.
    Authored-by: yuanyimeng <>
    Signed-off-by: Mridul Muralidharan <mridul<at>>
 .../apache/spark/status/AppStatusListener.scala    | 28 ++++++----
 .../spark/status/AppStatusListenerSuite.scala      | 62 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 10 deletions(-)

diff --git 
index ea028dfd11d..287bf2165c9 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -74,7 +74,7 @@ private[spark] class AppStatusListener(
   private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
   private[spark] val liveExecutors = new HashMap[String, LiveExecutor]()
-  private val deadExecutors = new HashMap[String, LiveExecutor]()
+  private[spark] val deadExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
@@ -674,22 +674,30 @@ private[spark] class AppStatusListener(
-    val (completedDelta, failedDelta, killedDelta) = event.reason match {
+    // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, 
which is raised by
+    // executor lost, it can lead to negative `LiveStage.activeTasks` since 
there's no
+    // corresponding `SparkListenerTaskStart` event for each of them. The 
negative activeTasks
+    // will make the stage always remains in the live stage list as it can 
never meet the
+    // condition activeTasks == 0. This in turn causes the dead executor to 
never be retained
+    // if that live stage's submissionTime is less than the dead executor's 
+    val (completedDelta, failedDelta, killedDelta, activeDelta) = event.reason 
match {
       case Success =>
-        (1, 0, 0)
+        (1, 0, 0, 1)
       case _: TaskKilled =>
-        (0, 0, 1)
+        (0, 0, 1, 1)
       case _: TaskCommitDenied =>
-        (0, 0, 1)
+        (0, 0, 1, 1)
+      case _ @ Resubmitted =>
+        (0, 1, 0, 0)
       case _ =>
-        (0, 1, 0)
+        (0, 1, 0, 1)
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { 
stage =>
       if (metricsDelta != null) {
         stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, 
-      stage.activeTasks -= 1
+      stage.activeTasks -= activeDelta
       stage.completedTasks += completedDelta
       if (completedDelta > 0) {
@@ -699,7 +707,7 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, 
-      stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
+      stage.activeTasksPerExecutor(event.taskInfo.executorId) -= activeDelta
@@ -718,7 +726,7 @@ private[spark] class AppStatusListener(
       // Store both stage ID and task index in a single long variable for 
tracking at job level.
       val taskIndex = (event.stageId.toLong << Integer.SIZE) | 
event.taskInfo.index { job =>
-        job.activeTasks -= 1
+        job.activeTasks -= activeDelta
         job.completedTasks += completedDelta
         if (completedDelta > 0) {
@@ -774,7 +782,7 @@ private[spark] class AppStatusListener(
     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
-      exec.activeTasks -= 1
+      exec.activeTasks -= activeDelta
       exec.completedTasks += completedDelta
       exec.failedTasks += failedDelta
       exec.totalDuration += event.taskInfo.duration
diff --git 
index 24a8a6844f1..5d0c25aa86a 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends 
SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
+  test("SPARK-41187: Stage should be removed from liveStages to avoid 
deadExecutors accumulated") {
+    val listener = new AppStatusListener(store, conf, true)
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new 
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, 
stage.attemptNumber, task))
+    }
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, 
stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+    // executor lost, success task will be resubmitted
+    time += 1
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, 
stage.attemptNumber, "taskType",
+      Resubmitted, tasks(0), new ExecutorMetrics, null))
+    // executor lost, running task will be failed and rerun
+    time += 1
+    tasks(1).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, 
stage.attemptNumber, "taskType",
+      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(1), new 
+      null))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, 
stage.attemptNumber, task))
+    }
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, 
stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, 
stage.attemptNumber, "taskType",
+      Success, tasks(1), new ExecutorMetrics, null))
+    listener.onStageCompleted(SparkListenerStageCompleted(stage))
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded ))
+    time += 1
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test"))
+    time += 1
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test"))
+    assert(listener.deadExecutors.size === 0)
+  }
   private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, 
   private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to