Ngone51 commented on code in PR #43954: URL: https://github.com/apache/spark/pull/43954#discussion_r1408994168
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##########
@@ -296,18 +296,31 @@ private[spark] class TaskSchedulerImpl(
     new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
   }
 
+  // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
+  // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
+  // once all the tasks has been finished. The stage attempt could be aborted after the call of
+  // `cancelTasks` if required.
   override def cancelTasks(
       stageId: Int,
       interruptThread: Boolean,
       reason: String): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     // Kill all running tasks for the stage.
-    killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
-    // Cancel all attempts for the stage.
+    logInfo(s"Killing all running tasks in stage $stageId: $reason")
     taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
       attempts.foreach { case (_, tsm) =>
-        tsm.abort("Stage %s cancelled".format(stageId))
-        logInfo("Stage %d was cancelled".format(stageId))
+        // There are two possible cases here:
+        // 1. The task set manager has been created and some tasks have been scheduled.
+        //    In this case, send a kill signal to the executors to kill the task.
+        // 2. The task set manager has been created but no tasks have been scheduled. In this case,
+        //    simply continue.
+        tsm.runningTasksSet.foreach { tid =>
+          taskIdToExecutorId.get(tid).foreach { execId =>
+            backend.killTask(tid, execId, interruptThread, s"Stage cancelled: $reason")
+          }
+        }
+        tsm.suspend()

Review Comment:
No, I don't. Killing tasks may take some time, so I don't expect the tsm to finish immediately. `suspend()` calls `maybeFinishTaskSet()` as a safeguard, in case the tsm can't finish normally in the end.
Although I don't see this as an issue in prod (the tsm should finish normally once all of its tasks finish), I did see a test (`cancelTasks shall kill all the running tasks`) fail without `maybeFinishTaskSet()`.
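
For readers following the thread, here is a minimal, self-contained sketch of the "mark as zombie, then check whether we can finish" idea discussed above. It is a toy model, not Spark's actual `TaskSetManager`: the class `ToyTaskSetManager`, the `onFinished` callback, and the `addRunningTask`/`removeRunningTask` helpers are hypothetical names introduced only for illustration. The names `runningTasksSet`, `isZombie`, `suspend()` and `maybeFinishTaskSet()` mirror the ones mentioned in the diff and the comment, but their bodies here are assumptions.

```scala
import scala.collection.mutable

// Toy stand-in for a task set manager (hypothetical, NOT Spark's TaskSetManager).
// `onFinished` is an assumed cleanup hook, e.g. removing the manager from a registry.
final class ToyTaskSetManager(onFinished: () => Unit) {
  val runningTasksSet: mutable.Set[Long] = mutable.HashSet.empty[Long]
  var isZombie: Boolean = false
  private var finished = false

  def isFinished: Boolean = finished

  def addRunningTask(tid: Long): Unit = runningTasksSet += tid

  // Called when a killed or completed task reports back.
  def removeRunningTask(tid: Long): Unit = {
    runningTasksSet -= tid
    maybeFinishTaskSet()
  }

  // Finish at most once, and only when the manager is a zombie with nothing running.
  def maybeFinishTaskSet(): Unit = {
    if (isZombie && runningTasksSet.isEmpty && !finished) {
      finished = true
      onFinished()
    }
  }

  // Stop accepting new work, then check right away whether we can already finish.
  def suspend(): Unit = {
    isZombie = true
    maybeFinishTaskSet()
  }
}

object SuspendSketch {
  def main(args: Array[String]): Unit = {
    var cleanups = 0

    // No tasks were ever scheduled: the check inside suspend() finishes the manager.
    val idle = new ToyTaskSetManager(() => cleanups += 1)
    idle.suspend()
    assert(idle.isFinished)

    // One task is still running: suspend() only marks the manager as a zombie,
    // and it finishes later, when the killed task reports back.
    val busy = new ToyTaskSetManager(() => cleanups += 1)
    busy.addRunningTask(1L)
    busy.suspend()
    assert(busy.isZombie && !busy.isFinished)
    busy.removeRunningTask(1L)
    assert(busy.isFinished)

    assert(cleanups == 2)
  }
}
```

In this toy model, the check inside `suspend()` only makes a difference when nothing is running at the moment of the call. Otherwise the manager finishes through the normal path once the last task reports back, which matches the expectation stated above that killing tasks takes time.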