beliefer commented on code in PR #43954: URL: https://github.com/apache/spark/pull/43954#discussion_r1407378513
########## core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala: ########## @@ -54,7 +54,7 @@ private[spark] trait TaskScheduler { // Submit a sequence of tasks to run. def submitTasks(taskSet: TaskSet): Unit - // Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage. + // Kill all the tasks in all the stage attempts of the same stage Id Review Comment: Please add comment about `mark all the stage attempts as zombie`. ########## core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala: ########## @@ -66,10 +66,6 @@ private[spark] trait TaskScheduler { */ def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean - // Kill all the running task attempts in a stage. - // Throw UnsupportedOperationException if the backend doesn't support kill tasks. - def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit Review Comment: Does this is a break change? ########## core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala: ########## @@ -1671,37 +1671,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty) } - test("killAllTaskAttempts shall kill all the running tasks and not fail the stage") { - val taskScheduler = setupScheduler() - - taskScheduler.initialize(new FakeSchedulerBackend { - override def killTask( - taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String): Unit = { - // Since we only submit one stage attempt, the following call is sufficient to mark the - // task as killed. - taskScheduler.taskSetManagerForAttempt(0, 0).get.runningTasksSet.remove(taskId) - } - }) - - val attempt1 = FakeTask.createTaskSet(10) - taskScheduler.submitTasks(attempt1) - - val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1), - new WorkerOffer("executor1", "host1", 1)) - val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten - assert(2 === taskDescriptions.length) - val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get - assert(2 === tsm.runningTasks) - - taskScheduler.killAllTaskAttempts(0, false, "test") Review Comment: Shall we update the test case with `taskScheduler.cancelTasks`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org