wineternity commented on code in PR #38702: URL: https://github.com/apache/spark/pull/38702#discussion_r1034721151
########## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
1. First, the executor loss is handled in `TaskSetManager.executorLost`:
``` scala
override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
  // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
  // and we are not using an external shuffle server which could serve the shuffle outputs.
  // The reason is the next stage wouldn't be able to fetch the data from this dead executor
  // so we would need to rerun these tasks on other executors.
  if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = info.index
      // We may have a running task whose partition has been marked as successful,
      // this partition has another task completed in another stage attempt.
      // We treat it as a running task and will call handleFailedTask later.
      if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
        // stage finishes when a total of tasks.size tasks finish.
        sched.dagScheduler.taskEnded(
          tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
      }
    }
  }
```
2. The successful task is added back to the pending queue for re-execution via `addPendingTask`.
3. `sched.dagScheduler.taskEnded` is called to tell the DAGScheduler to handle this; it posts a `CompletionEvent` to the eventProcessLoop.
4. `handleTaskCompletion` in DAGScheduler.scala then handles that event. Because the reason `Resubmitted` is treated as a kind of failure, it calls `handleResubmittedFailure`, which adds the task's partitionId back to the ShuffleMapStage's pendingPartitions. The stage therefore waits for the resubmitted task to finish again (a hedged sketch of this follows below).
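
To make step 4 a bit more concrete, here is a hedged, self-contained sketch. It is not the real DAGScheduler source; `ShuffleMapStageModel`, `Task`, and the simplified `handleTaskCompletion` below are stand-ins invented for illustration. It only shows the mechanism the comment describes: re-adding the partition to `pendingPartitions` keeps the stage waiting for the re-run.

``` scala
object ResubmittedHandlingSketch {
  // Minimal stand-ins for the pieces referred to above (invented names, not Spark internals).
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case object Resubmitted extends TaskEndReason

  final case class Task(partitionId: Int)

  final class ShuffleMapStageModel {
    val pendingPartitions = scala.collection.mutable.HashSet.empty[Int]
    def isComplete: Boolean = pendingPartitions.isEmpty
  }

  // Roughly what handleResubmittedFailure amounts to: put the partition back into the
  // stage's pending set so the stage keeps waiting for the re-run of that task.
  def handleResubmittedFailure(task: Task, stage: ShuffleMapStageModel): Unit = {
    stage.pendingPartitions += task.partitionId
  }

  def handleTaskCompletion(task: Task, reason: TaskEndReason, stage: ShuffleMapStageModel): Unit =
    reason match {
      case Success     => stage.pendingPartitions -= task.partitionId
      case Resubmitted => handleResubmittedFailure(task, stage)
    }

  def main(args: Array[String]): Unit = {
    val stage = new ShuffleMapStageModel
    stage.pendingPartitions ++= Seq(0, 1)

    handleTaskCompletion(Task(0), Success, stage)     // partition 0 finished on the soon-to-be-lost executor
    handleTaskCompletion(Task(1), Success, stage)     // partition 1 finished elsewhere
    handleTaskCompletion(Task(0), Resubmitted, stage) // executor lost: partition 0 must re-run

    println(s"stage complete? ${stage.isComplete}")   // false: still waiting for the resubmitted task
  }
}
```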
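
And this is why the guard in the diff matters for the live UI state. Below is a hedged toy model of the accounting problem described in the new code comment; `LiveStageModel` and its members are invented for illustration and are not AppStatusListener's actual fields. Without skipping `Resubmitted` task-end events, `activeTasks` goes negative, the `== 0` cleanup condition never fires, and the stage stays in liveStages, which in turn keeps its executor out of deadExecutors.

``` scala
object ActiveTaskAccountingSketch {
  // Toy stand-in for the per-stage state the listener keeps (invented names).
  final class LiveStageModel(skipResubmitted: Boolean) {
    var activeTasks = 0
    def onTaskStart(): Unit = activeTasks += 1
    def onTaskEnd(resubmitted: Boolean): Unit = {
      if (skipResubmitted && resubmitted) return // the fix: ignore Resubmitted task-end events
      activeTasks -= 1
    }
    // The stage (and eventually its executors) can only be cleaned up once this hits 0.
    def canBeCleanedUp: Boolean = activeTasks == 0
  }

  def runScenario(skipResubmitted: Boolean): LiveStageModel = {
    val stage = new LiveStageModel(skipResubmitted)
    stage.onTaskStart()                  // the task starts once
    stage.onTaskEnd(resubmitted = false) // ... and finishes successfully
    stage.onTaskEnd(resubmitted = true)  // later, a Resubmitted end arrives for the same task
    stage
  }

  def main(args: Array[String]): Unit = {
    val before = runScenario(skipResubmitted = false)
    // without the guard: activeTasks = -1, cleanable = false -> stage leaks, executor never removed
    println(s"without guard: activeTasks=${before.activeTasks}, cleanable=${before.canBeCleanedUp}")

    val after = runScenario(skipResubmitted = true)
    // with the guard: activeTasks = 0, cleanable = true
    println(s"with guard:    activeTasks=${after.activeTasks}, cleanable=${after.canBeCleanedUp}")
  }
}
```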