Repository: spark
Updated Branches:
  refs/heads/master 94de5609b -> c36537fcf


[SPARK-25773][CORE] Cancel zombie tasks in a result stage when the job finishes

## What changes were proposed in this pull request?

When a job finishes, there may be zombie tasks still running due to stage
retry. Since a result stage is never used by other jobs, running these tasks
just wastes cluster resources. This PR asks the TaskScheduler to cancel the
running tasks of a result stage once that stage has finished. Credits go to
srinathshankar who suggested this idea to me.
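
As a rough illustration (a minimal sketch lifted from the new test in
SparkContextSuite below; the FetchFailedException and the sleep are only there
to force a stage retry that leaves a first-attempt task running), a zombie task
can be produced like this:

```scala
// Assumes an existing SparkContext `sc` on a multi-executor cluster
// (the new test uses local-cluster[1,2,1024]).
import org.apache.spark.{SparkContext, SparkEnv, TaskContext}
import org.apache.spark.shuffle.FetchFailedException

sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
  val context = TaskContext.get()
  if (context.stageAttemptNumber == 0) {
    if (context.partitionId == 0) {
      // Fail one task of the first result-stage attempt to trigger a stage retry.
      throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
        new java.io.IOException("fake"))
    } else {
      // The other first-attempt task keeps running: a zombie task.
      Thread.sleep(60000)
    }
  }
  x
}.collect()
// The retried attempt succeeds and collect() returns, but before this change the
// sleeping task from attempt 0 kept holding an executor slot; now the DAGScheduler
// asks the TaskScheduler to kill it once the result stage finishes.
```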

This PR also fixes two minor issues noticed while touching DAGScheduler:
- An invalid spark.job.interruptOnCancel value should not crash DAGScheduler
  (see the sketch after this list).
- Non-fatal errors should not crash DAGScheduler.
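
For the first issue, a hedged sketch of the fallback behaviour (this mirrors the
new shouldInterruptTaskThread helper in the diff below, but is not the exact
DAGScheduler code):

```scala
// Assumes an existing SparkContext `sc`. SPARK_JOB_INTERRUPT_ON_CANCEL is the
// local-property key "spark.job.interruptOnCancel".
import org.apache.spark.SparkContext

sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "invalid")

val raw = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL))
  .getOrElse("false")
val interruptOnCancel =
  try raw.toBoolean
  catch {
    // "invalid".toBoolean throws IllegalArgumentException; previously that
    // exception escaped inside the DAGScheduler event loop, now it is logged
    // as a warning and treated as false.
    case _: IllegalArgumentException => false
  }
```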

## How was this patch tested?

The new unit tests.

Closes #22771 from zsxwing/SPARK-25773.

Lead-authored-by: Shixiong Zhu <zsxw...@gmail.com>
Co-authored-by: Shixiong Zhu <shixi...@databricks.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c36537fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c36537fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c36537fc

Branch: refs/heads/master
Commit: c36537fcfddc1eae1581b1b84d9d4384c5985c26
Parents: 94de560
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Tue Oct 30 10:48:04 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue Oct 30 10:48:04 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 48 +++++++++++++++---
 .../org/apache/spark/SparkContextSuite.scala    | 53 +++++++++++++++++++-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 51 +++++++++++++------
 3 files changed, 129 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c36537fc/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 34b1160..06966e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1296,6 +1296,27 @@ private[spark] class DAGScheduler(
   }
 
   /**
+   * Check [[SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL]] in job properties to see if we should
+   * interrupt running tasks. Returns `false` if the property value is not a boolean value
+   */
+  private def shouldInterruptTaskThread(job: ActiveJob): Boolean = {
+    if (job.properties == null) {
+      false
+    } else {
+      val shouldInterruptThread =
+        job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+      try {
+        shouldInterruptThread.toBoolean
+      } catch {
+        case e: IllegalArgumentException =>
+          logWarning(s"${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} in Job ${job.jobId} " +
+            s"is invalid: $shouldInterruptThread. Using 'false' instead", e)
+          false
+      }
+    }
+  }
+
+  /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
    */
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
                   if (job.numFinished == job.numPartitions) {
                     markStageAsFinished(resultStage)
                     cleanupStateForJobAndIndependentStages(job)
+                    try {
+                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+                      // killTask.
+                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
+                        "or zombie tasks for this job")
+                      // ResultStage is only used by this job. It's safe to kill speculative or
+                      // zombie tasks in this stage.
+                      taskScheduler.killAllTaskAttempts(
+                        stageId,
+                        shouldInterruptTaskThread(job),
+                        reason = "Stage finished")
+                    } catch {
+                      case e: UnsupportedOperationException =>
+                        logWarning(s"Could not cancel tasks for stage $stageId", e)
+                    }
                     listenerBus.post(
                       SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                   }
@@ -1373,7 +1409,7 @@ private[spark] class DAGScheduler(
                   try {
                     job.listener.taskSucceeded(rt.outputId, event.result)
                   } catch {
-                    case e: Exception =>
+                    case e: Throwable if !Utils.isFatalError(e) =>
                       // TODO: Perhaps we want to mark the resultStage as failed?
                       job.listener.jobFailed(new SparkDriverExecutionException(e))
                   }
@@ -1890,10 +1926,6 @@ private[spark] class DAGScheduler(
     val error = new SparkException(failureReason, exception.getOrElse(null))
     var ableToCancelStages = true
 
-    val shouldInterruptThread =
-      if (job.properties == null) false
-      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
-
     // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
@@ -1913,12 +1945,12 @@ private[spark] class DAGScheduler(
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
-              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+              taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
               markStageAsFinished(stage, Some(failureReason))
             } catch {
               case e: UnsupportedOperationException =>
-                logInfo(s"Could not cancel tasks for stage $stageId", e)
-              ableToCancelStages = false
+                logWarning(s"Could not cancel tasks for stage $stageId", e)
+                ableToCancelStages = false
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/c36537fc/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index e1666a3..79192f3 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -33,7 +33,9 @@ import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFor
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 
@@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
     }
   }
+
+  test("cancel zombie tasks in a result stage when the job finishes") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[1,2,1024]")
+      .setAppName("test-cluster")
+      .set("spark.ui.enabled", "false")
+      // Disable this so that if a task is running, we can make sure the executor will always send
+      // task metrics via heartbeat to driver.
+      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
+      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
+      .set("spark.executor.heartbeatInterval", "1s")
+    sc = new SparkContext(conf)
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+    @volatile var runningTaskIds: Seq[Long] = null
+    val listener = new SparkListener {
+      override def onExecutorMetricsUpdate(
+          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
+          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.stageAttemptNumber == 0) {
+        if (context.partitionId == 0) {
+          // Make the first task in the first stage attempt fail.
+          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
+            new java.io.IOException("fake"))
+        } else {
+          // Make the second task in the first stage attempt sleep to generate a zombie task
+          Thread.sleep(60000)
+        }
+      } else {
+        // Make the second stage attempt successful.
+      }
+      x
+    }.collect()
+    sc.listenerBus.waitUntilEmpty(10000)
+    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
+    // whether there is any running task.
+    eventually(timeout(10.seconds)) {
+      // Make sure runningTaskIds has been set
+      assert(runningTaskIds != null)
+      // Verify there is no running task.
+      assert(runningTaskIds.isEmpty)
+    }
+  }
 }
 
 object SparkContextSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/c36537fc/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b41d2ac..5f4ffa1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1901,27 +1901,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   /**
-   * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
+   * The job will be failed on first task throwing an error.
    *  Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
    *  If multiple tasks, there exists a race condition between the SparkDriverExecutionExceptions
    *  and their differing causes as to which will represent result for job...
    */
   test("misbehaved resultHandler should not crash DAGScheduler and 
SparkContext") {
-    val e = intercept[SparkDriverExecutionException] {
-      // Number of parallelized partitions implies number of tasks of job
-      val rdd = sc.parallelize(1 to 10, 2)
-      sc.runJob[Int, Int](
-        rdd,
-        (context: TaskContext, iter: Iterator[Int]) => iter.size,
-        // For a robust test assertion, limit number of job tasks to 1; that is,
-        // if multiple RDD partitions, use id of any one partition, say, first partition id=0
-        Seq(0),
-        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+    failAfter(1.minute) { // If DAGScheduler crashes, the following test will hang forever
+      for (error <- Seq(
+        new DAGSchedulerSuiteDummyException,
+        new AssertionError, // E.g., assert(foo == bar) fails
+        new NotImplementedError // E.g., call a method with `???` implementation.
+      )) {
+        val e = intercept[SparkDriverExecutionException] {
+          // Number of parallelized partitions implies number of tasks of job
+          val rdd = sc.parallelize(1 to 10, 2)
+          sc.runJob[Int, Int](
+            rdd,
+            (context: TaskContext, iter: Iterator[Int]) => iter.size,
+            // For a robust test assertion, limit number of job tasks to 1; that is,
+            // if multiple RDD partitions, use id of any one partition, say, first partition id=0
+            Seq(0),
+            (part: Int, result: Int) => throw error)
+        }
+        assert(e.getCause eq error)
+
+        // Make sure we can still run commands on our SparkContext
+        assert(sc.parallelize(1 to 10, 2).count() === 10)
+      }
     }
-    assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+  }
 
-    // Make sure we can still run commands on our SparkContext
-    assert(sc.parallelize(1 to 10, 2).count() === 10)
+  test(s"invalid ${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} should not crash DAGScheduler") {
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "invalid")
+    try {
+      intercept[SparkException] {
+        sc.parallelize(1 to 1, 1).foreach { _ =>
+          throw new DAGSchedulerSuiteDummyException
+        }
+      }
+      // Verify the above job didn't crash DAGScheduler by running a simple job
+      assert(sc.parallelize(1 to 10, 2).count() === 10)
+    } finally {
+      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
+    }
   }
 
   test("getPartitions exceptions should not crash DAGScheduler and 
SparkContext (SPARK-8606)") {

