This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7e7bc940dcb [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
7e7bc940dcb is described below

commit 7e7bc940dcbbf918c7d571e1d27c7654ad387817
Author: yuanyimeng <yuanyim...@youzan.com>
AuthorDate: Sun Dec 11 22:49:07 2022 -0600

    [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

    ### What changes were proposed in this pull request?
    Ignore `SparkListenerTaskEnd` events with reason `Resubmitted` in AppStatusListener to avoid a memory leak.

    ### Why are the changes needed?
    For a long-running Spark thriftserver, LiveExecutor objects accumulate in the deadExecutors HashMap and slow down processing of the message event queue.

    For every task, a `SparkListenerTaskStart` event and a `SparkListenerTaskEnd` event are normally sent out as a pair. But when an executor is lost, the events are sent as follows:
    a) A pair of task start and task end events was fired for the task (call it Tr).
    b) When the executor that ran Tr was lost while the stage was still running, a task end event with reason `Resubmitted` was fired for Tr.
    c) Subsequently, a new task start and task end were fired for the retry of Tr.

    Processing the `Resubmitted` task end event in AppStatusListener can lead to a negative `LiveStage.activeTasks`, since there is no corresponding `SparkListenerTaskStart` event for it. The negative activeTasks keeps the stage in the live stage list forever, because it can never meet the condition activeTasks == 0. This in turn causes the dead executor to never be cleaned up if that live stage's submissionTime is less than the dead executor's removeTime (see isExecutor [...]).

    Check [SPARK-41187](https://issues.apache.org/jira/browse/SPARK-41187) for evidence.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    New UT added. Also tested in a thriftserver environment.

    ### The way to reproduce
    Reproducing this in spark-shell is a little fiddly:
    1. Start spark-shell and set spark.dynamicAllocation.maxExecutors=2 for convenience:
       `bin/spark-shell --driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8006"`
    2. Run a job with a shuffle:
       `sc.parallelize(1 to 1000, 10).map { x => Thread.sleep(1000) ; (x % 3, x) }.reduceByKey((a, b) => a + b).collect()`
    3. After some ShuffleMapTasks have finished, kill one or two executors so that tasks get resubmitted.
    4. Check via heap dump, debugger, or logs.

    Closes #38702 from wineternity/SPARK-41187.
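For readers skimming the diff below, the gist of the fix is a counting rule: a `Resubmitted` task end is counted as a failure but must not decrement any active-task counters, because no matching task start was ever emitted for it. The following is a minimal, self-contained Scala sketch of that rule only; it is not the committed Spark code, and the simplified `TaskEndReason` stand-ins, the `deltas` helper, and the `ResubmittedDeltaSketch` object are illustrative names, not Spark APIs.

```scala
// Sketch of the (completed, failed, killed, active) delta rule introduced by SPARK-41187.
object ResubmittedDeltaSketch {

  // Simplified stand-ins for Spark's TaskEndReason hierarchy; only the shape matters here.
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case object Resubmitted extends TaskEndReason
  final case class TaskKilled(reason: String) extends TaskEndReason
  final case class OtherFailure(message: String) extends TaskEndReason

  /** Returns (completedDelta, failedDelta, killedDelta, activeDelta) for one task-end reason. */
  def deltas(reason: TaskEndReason): (Int, Int, Int, Int) = reason match {
    case Success       => (1, 0, 0, 1)
    case _: TaskKilled => (0, 0, 1, 1)
    case Resubmitted   => (0, 1, 0, 0) // counted as failed, but activeTasks is left untouched
    case _             => (0, 1, 0, 1)
  }

  def main(args: Array[String]): Unit = {
    // One task was started on the executor that is later lost, so activeTasks == 1.
    var activeTasks = 1

    // The lone "Resubmitted" task end has no matching task start, so it must not decrement.
    val (_, failedDelta, _, activeDelta) = deltas(Resubmitted)
    activeTasks -= activeDelta

    assert(activeTasks == 1 && failedDelta == 1)
    println(s"activeTasks=$activeTasks, failedDelta=$failedDelta")
  }
}
```

With this rule, the `Resubmitted` event still bumps the failure count, while activeTasks only reaches zero once the retried task's own start/end pair has been processed, so the stage can leave the live stage list and the dead executor can eventually be cleaned up.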
    Authored-by: yuanyimeng <yuanyim...@youzan.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/status/AppStatusListener.scala    | 28 ++++++----
 .../spark/status/AppStatusListenerSuite.scala      | 62 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index ea028dfd11d..287bf2165c9 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -74,7 +74,7 @@ private[spark] class AppStatusListener(
   private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
   private[spark] val liveExecutors = new HashMap[String, LiveExecutor]()
-  private val deadExecutors = new HashMap[String, LiveExecutor]()
+  private[spark] val deadExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
@@ -674,22 +674,30 @@ private[spark] class AppStatusListener(
       delta
     }.orNull

-    val (completedDelta, failedDelta, killedDelta) = event.reason match {
+    // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+    // executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+    // corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+    // will make the stage always remains in the live stage list as it can never meet the
+    // condition activeTasks == 0. This in turn causes the dead executor to never be retained
+    // if that live stage's submissionTime is less than the dead executor's removeTime.
+    val (completedDelta, failedDelta, killedDelta, activeDelta) = event.reason match {
       case Success =>
-        (1, 0, 0)
+        (1, 0, 0, 1)
       case _: TaskKilled =>
-        (0, 0, 1)
+        (0, 0, 1, 1)
       case _: TaskCommitDenied =>
-        (0, 0, 1)
+        (0, 0, 1, 1)
+      case _ @ Resubmitted =>
+        (0, 1, 0, 0)
       case _ =>
-        (0, 1, 0)
+        (0, 1, 0, 1)
     }

     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       if (metricsDelta != null) {
         stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
       }
-      stage.activeTasks -= 1
+      stage.activeTasks -= activeDelta
       stage.completedTasks += completedDelta
       if (completedDelta > 0) {
         stage.completedIndices.add(event.taskInfo.index)
@@ -699,7 +707,7 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
-      stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
+      stage.activeTasksPerExecutor(event.taskInfo.executorId) -= activeDelta
       stage.peakExecutorMetrics.compareAndUpdatePeakValues(event.taskExecutorMetrics)
       stage.executorSummary(event.taskInfo.executorId).peakExecutorMetrics
         .compareAndUpdatePeakValues(event.taskExecutorMetrics)
@@ -718,7 +726,7 @@ private[spark] class AppStatusListener(
       // Store both stage ID and task index in a single long variable for tracking at job level.
       val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
       stage.jobs.foreach { job =>
-        job.activeTasks -= 1
+        job.activeTasks -= activeDelta
         job.completedTasks += completedDelta
         if (completedDelta > 0) {
           job.completedIndices.add(taskIndex)
@@ -774,7 +782,7 @@
     }

     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
-      exec.activeTasks -= 1
+      exec.activeTasks -= activeDelta
       exec.completedTasks += completedDelta
       exec.failedTasks += failedDelta
       exec.totalDuration += event.taskInfo.duration
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 24a8a6844f1..5d0c25aa86a 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test("SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted
+    time += 1
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Resubmitted, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, running task will be failed and rerun
+    time += 1
+    tasks(1).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(1), new ExecutorMetrics,
+      null))
+
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(1), new ExecutorMetrics, null))
+
+    listener.onStageCompleted(SparkListenerStageCompleted(stage))
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded ))
+
+    time += 1
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test"))
+    time += 1
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test"))
+
+    assert(listener.deadExecutors.size === 0)
+  }
+
   private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)

   private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org