This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 578855d0d7a [SPARK-42602][CORE] Add reason argument to 
TaskScheduler.cancelTasks
578855d0d7a is described below

commit 578855d0d7a98fcfa28cefabee8c0a969a0999ec
Author: Bo Zhang <bo.zh...@databricks.com>
AuthorDate: Mon Feb 27 21:31:09 2023 -0800

    [SPARK-42602][CORE] Add reason argument to TaskScheduler.cancelTasks
    
    ### What changes were proposed in this pull request?
    
    This change adds a `reason: String` to the argument list of 
`TaskScheduler.cancelTasks`.
    
    ### Why are the changes needed?
    
    Currently, all tasks killed by `TaskScheduler.cancelTasks` will have the 
`TaskEndReason` "TaskKilled (Stage cancelled)". To better differentiate reasons 
for stage cancellations (e.g. user-initiated or caused by task failures in the 
stage), we could add a reason argument to `TaskScheduler.cancelTasks`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated unit tests.
    
    Closes #40194 from bozhang2820/spark-42602.
    
    Authored-by: Bo Zhang <bo.zh...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala  | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 2 +-
 .../main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 7 +++++--
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 4 ++--
 .../org/apache/spark/scheduler/ExternalClusterManagerSuite.scala   | 2 +-
 .../scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala  | 3 ++-
 6 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1ccaf05509..5b9a7d725b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2763,7 +2763,7 @@ private[spark] class DAGScheduler(
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
-              taskScheduler.cancelTasks(stageId, 
shouldInterruptTaskThread(job))
+              taskScheduler.cancelTasks(stageId, 
shouldInterruptTaskThread(job), reason)
               markStageAsFinished(stage, Some(reason))
             } catch {
               case e: UnsupportedOperationException =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 5613966e8f5..38c7eb77c62 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -56,7 +56,7 @@ private[spark] trait TaskScheduler {
 
   // Kill all the tasks in a stage and fail the stage and all the jobs that 
depend on the stage.
   // Throw UnsupportedOperationException if the backend doesn't support kill 
tasks.
-  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
+  def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit
 
   /**
    * Kills a task attempt.
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 91b0c983e4a..0e916bf375e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -296,10 +296,13 @@ private[spark] class TaskSchedulerImpl(
     new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
   }
 
-  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = 
synchronized {
+  override def cancelTasks(
+      stageId: Int,
+      interruptThread: Boolean,
+      reason: String): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     // Kill all running tasks for the stage.
-    killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled")
+    killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " 
+ reason)
     // Cancel all attempts for the stage.
     taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
       attempts.foreach { case (_, tsm) =>
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 17abf3aef4e..1e3f353a278 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -188,7 +188,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
     }
-    override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {
+    override def cancelTasks(stageId: Int, interruptThread: Boolean, reason: 
String): Unit = {
       cancelledStages += stageId
     }
     override def killTaskAttempt(
@@ -851,7 +851,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       override def submitTasks(taskSet: TaskSet): Unit = {
         taskSets += taskSet
       }
-      override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = 
{
+      override def cancelTasks(stageId: Int, interruptThread: Boolean, reason: 
String): Unit = {
         throw new UnsupportedOperationException
       }
       override def killTaskAttempt(
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index a30cb521bf4..9876600d493 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -82,7 +82,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def start(): Unit = {}
   override def stop(exitCode: Int): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}
-  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
+  override def cancelTasks(stageId: Int, interruptThread: Boolean, reason: 
String): Unit = {}
   override def killTaskAttempt(
     taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
   override def killAllTaskAttempts(
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index af4cf8731b6..a484dae6f80 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1648,6 +1648,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
         // Since we only submit one stage attempt, the following call is 
sufficient to mark the
         // task as killed.
         taskScheduler.taskSetManagerForAttempt(0, 
0).get.runningTasksSet.remove(taskId)
+        assert(reason == "Stage cancelled: test message")
       }
     })
 
@@ -1661,7 +1662,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
     assert(2 === tsm.runningTasks)
 
-    taskScheduler.cancelTasks(0, false)
+    taskScheduler.cancelTasks(0, false, "test message")
     assert(0 === tsm.runningTasks)
     assert(tsm.isZombie)
     assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to