beliefer commented on code in PR #43954:
URL: https://github.com/apache/spark/pull/43954#discussion_r1407378513


##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill all the tasks in a stage and fail the stage and all the jobs that 
depend on the stage.
+  // Kill all the tasks in all the stage attempts of the same stage Id

Review Comment:
   Please add a comment noting that this also marks all the stage attempts as zombie.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala:
##########
@@ -66,10 +66,6 @@ private[spark] trait TaskScheduler {
    */
   def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): 
Boolean
 
-  // Kill all the running task attempts in a stage.
-  // Throw UnsupportedOperationException if the backend doesn't support kill 
tasks.
-  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: 
String): Unit

Review Comment:
   Is this a breaking change?



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -1671,37 +1671,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
   }
 
-  test("killAllTaskAttempts shall kill all the running tasks and not fail the 
stage") {
-    val taskScheduler = setupScheduler()
-
-    taskScheduler.initialize(new FakeSchedulerBackend {
-      override def killTask(
-          taskId: Long,
-          executorId: String,
-          interruptThread: Boolean,
-          reason: String): Unit = {
-        // Since we only submit one stage attempt, the following call is 
sufficient to mark the
-        // task as killed.
-        taskScheduler.taskSetManagerForAttempt(0, 
0).get.runningTasksSet.remove(taskId)
-      }
-    })
-
-    val attempt1 = FakeTask.createTaskSet(10)
-    taskScheduler.submitTasks(attempt1)
-
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
-      new WorkerOffer("executor1", "host1", 1))
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-    assert(2 === taskDescriptions.length)
-    val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
-    assert(2 === tsm.runningTasks)
-
-    taskScheduler.killAllTaskAttempts(0, false, "test")

Review Comment:
   Shall we update the test case with `taskScheduler.cancelTasks`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to