Repository: spark
Updated Branches:
  refs/heads/branch-1.0 5869f8bf1 -> 92b012502


[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask

This is a fixed up version of #686 (cc @markhamstra @pwendell).  The last 
commit (the only one I authored) reflects the changes I made from Mark's 
original patch.

Author: Mark Hamstra <markhams...@gmail.com>
Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #1219 from kayousterhout/mark-SPARK-1749 and squashes the following 
commits:

42dfa7e [Kay Ousterhout] Got rid of terrible double-negative name
80b3205 [Kay Ousterhout] Don't notify listeners of job failure if it wasn't 
successfully cancelled.
d156d33 [Mark Hamstra] Do nothing in no-kill submitTasks
9312baa [Mark Hamstra] code review update
cc353c8 [Mark Hamstra] scalastyle
e61f7f8 [Mark Hamstra] Catch UnsupportedOperationException when DAGScheduler 
tries to cancel a job on a SchedulerBackend that does not implement killTask
(cherry picked from commit b88a59a66845b8935b22f06fc96d16841ed20c94)

Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b01250
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b01250
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b01250

Branch: refs/heads/branch-1.0
Commit: 92b01250246ef1211d6ea15036ebc705ccabe7f8
Parents: 5869f8b
Author: Mark Hamstra <markhams...@gmail.com>
Authored: Wed Jun 25 20:57:48 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Wed Jun 25 20:57:58 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 30 ++++++++----
 .../spark/scheduler/DAGSchedulerSuite.scala     | 48 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
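For readers skimming the patch, the core of the DAGScheduler change can be
paraphrased as follows. This is a simplified sketch, not the exact Spark code:
the scheduler and listener calls are stubbed out as function parameters, and
only the catch-and-flag control flow from the diff below is kept.

    // Sketch: try to cancel every running stage; if any backend cannot kill
    // tasks, remember that and skip the "job failed" bookkeeping so the job
    // can still run to completion.
    def failJobSketch(
        runningStageIds: Seq[Int],
        cancelStage: Int => Unit,        // may throw UnsupportedOperationException
        finishFailingJob: () => Unit): Unit = {
      var ableToCancelStages = true
      for (stageId <- runningStageIds) {
        try {
          cancelStage(stageId)
        } catch {
          case e: UnsupportedOperationException =>
            println(s"Could not cancel tasks for stage $stageId: $e")
            ableToCancelStages = false
        }
      }
      if (ableToCancelStages) {
        finishFailingJob()  // listeners only hear about the failure if cancellation worked
      }
    }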


http://git-wip-us.apache.org/repos/asf/spark/blob/92b01250/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e2..81a8743 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1031,7 +1031,7 @@ class DAGScheduler(
   private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
       resultStage: Option[Stage]) {
     val error = new SparkException(failureReason)
-    job.listener.jobFailed(error)
+    var ableToCancelStages = true
 
     val shouldInterruptThread =
       if (job.properties == null) false
@@ -1055,18 +1055,26 @@ class DAGScheduler(
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
-            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-            val stageInfo = stageToInfos(stage)
-            stageInfo.stageFailed(failureReason)
-            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+              val stageInfo = stageToInfos(stage)
+              stageInfo.stageFailed(failureReason)
+              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            } catch {
+              case e: UnsupportedOperationException =>
+                logInfo(s"Could not cancel tasks for stage $stageId", e)
+              ableToCancelStages = false
+            }
           }
         }
       }
     }
 
-    cleanupStateForJobAndIndependentStages(job, resultStage)
-
-    listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    if (ableToCancelStages) {
+      job.listener.jobFailed(error)
+      cleanupStateForJobAndIndependentStages(job, resultStage)
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
   }
 
   /**
@@ -1148,7 +1156,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
       case x: Exception =>
         logError("eventProcesserActor failed due to the error %s; shutting 
down SparkContext"
           .format(x.getMessage))
-        dagScheduler.doCancelAllJobs()
+        try {
+          dagScheduler.doCancelAllJobs()
+        } catch {
+          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
+        }
         dagScheduler.sc.stop()
         Stop
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/92b01250/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7e901f8..23a350b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -114,6 +114,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     sc = new SparkContext("local", "DAGSchedulerSuite")
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
+    failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
     cancelledStages.clear()
@@ -299,6 +300,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("job cancellation no-kill backend") {
+    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
+    // doesn't implement killTask()
+    val noKillTaskScheduler = new TaskScheduler() {
+      override def rootPool: Pool = null
+      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def start() = {}
+      override def stop() = {}
+      override def submitTasks(taskSet: TaskSet) = {
+        taskSets += taskSet
+      }
+      override def cancelTasks(stageId: Int, interruptThread: Boolean) {
+        throw new UnsupportedOperationException
+      }
+      override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+      override def defaultParallelism() = 2
+    }
+    val noKillScheduler = new DAGScheduler(
+      sc,
+      noKillTaskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env) {
+      override def runLocally(job: ActiveJob) {
+        // don't bother with the thread while unit testing
+        runLocallyWithinThread(job)
+      }
+    }
+    dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+      Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
+    val rdd = makeRdd(1, Nil)
+    val jobId = submit(rdd, Array(0))
+    cancel(jobId)
+    // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
+    assert(failure === null)
+
+    // When the task set completes normally, state should be correctly updated.
+    complete(taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.isEmpty)
+    assert(sparkListener.successfulStages.contains(0))
+  }
+
   test("run trivial shuffle") {
     val shuffleMapRdd = makeRdd(2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
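
From an application's point of view, the behaviour this new test pins down is
that cancelling a job on a backend that cannot kill tasks becomes a logged
no-op rather than a scheduler crash: the job keeps running and no spurious
failure is reported. A rough driver-side illustration, assuming the async
action API reachable in 1.0 via SparkContext._ (this snippet is not part of
the patch):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // implicit conversion providing countAsync()

    def bestEffortCancel(sc: SparkContext): Unit = {
      val action = sc.parallelize(1 to 1000, 4).map(_ * 2).countAsync()
      action.cancel()
      // On a backend without killTask, the running tasks are not interrupted;
      // the job completes normally and listeners see no job-failure event.
    }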
