Repository: spark Updated Branches: refs/heads/branch-1.0 5869f8bf1 -> 92b012502
[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask This is a fixed up version of #686 (cc @markhamstra @pwendell). The last commit (the only one I authored) reflects the changes I made from Mark's original patch. Author: Mark Hamstra <markhams...@gmail.com> Author: Kay Ousterhout <kayousterh...@gmail.com> Closes #1219 from kayousterhout/mark-SPARK-1749 and squashes the following commits: 42dfa7e [Kay Ousterhout] Got rid of terrible double-negative name 80b3205 [Kay Ousterhout] Don't notify listeners of job failure if it wasn't successfully cancelled. d156d33 [Mark Hamstra] Do nothing in no-kill submitTasks 9312baa [Mark Hamstra] code review update cc353c8 [Mark Hamstra] scalastyle e61f7f8 [Mark Hamstra] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask (cherry picked from commit b88a59a66845b8935b22f06fc96d16841ed20c94) Signed-off-by: Patrick Wendell <pwend...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b01250 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b01250 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b01250 Branch: refs/heads/branch-1.0 Commit: 92b01250246ef1211d6ea15036ebc705ccabe7f8 Parents: 5869f8b Author: Mark Hamstra <markhams...@gmail.com> Authored: Wed Jun 25 20:57:48 2014 -0700 Committer: Patrick Wendell <pwend...@gmail.com> Committed: Wed Jun 25 20:57:58 2014 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 30 ++++++++---- .../spark/scheduler/DAGSchedulerSuite.scala | 48 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/92b01250/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ff411e2..81a8743 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1031,7 +1031,7 @@ class DAGScheduler( private def failJobAndIndependentStages(job: ActiveJob, failureReason: String, resultStage: Option[Stage]) { val error = new SparkException(failureReason) - job.listener.jobFailed(error) + var ableToCancelStages = true val shouldInterruptThread = if (job.properties == null) false @@ -1055,18 +1055,26 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { - taskScheduler.cancelTasks(stageId, shouldInterruptThread) - val stageInfo = stageToInfos(stage) - stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + } catch { + case e: UnsupportedOperationException => + logInfo(s"Could not cancel tasks for stage $stageId", e) + ableToCancelStages = false + } } } } } - cleanupStateForJobAndIndependentStages(job, resultStage) - - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + if (ableToCancelStages) { + job.listener.jobFailed(error) + cleanupStateForJobAndIndependentStages(job, resultStage) + listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + } } /** @@ -1148,7 +1156,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) - dagScheduler.doCancelAllJobs() + try { + dagScheduler.doCancelAllJobs() + } catch { + case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t) + } dagScheduler.sc.stop() Stop } http://git-wip-us.apache.org/repos/asf/spark/blob/92b01250/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 7e901f8..23a350b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -114,6 +114,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F sc = new SparkContext("local", "DAGSchedulerSuite") sparkListener.successfulStages.clear() sparkListener.failedStages.clear() + failure = null sc.addSparkListener(sparkListener) taskSets.clear() cancelledStages.clear() @@ -299,6 +300,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + test("job cancellation no-kill backend") { + // make sure that the DAGScheduler doesn't crash when the TaskScheduler + // doesn't implement killTask() + val noKillTaskScheduler = new TaskScheduler() { + override def rootPool: Pool = null + override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def start() = {} + override def stop() = {} + override def submitTasks(taskSet: TaskSet) = { + taskSets += taskSet + } + override def cancelTasks(stageId: Int, interruptThread: Boolean) { + throw new UnsupportedOperationException + } + override def setDAGScheduler(dagScheduler: DAGScheduler) = {} + override def defaultParallelism() = 2 + } + val noKillScheduler = new DAGScheduler( + sc, + noKillTaskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env) { + override def runLocally(job: ActiveJob) { + // don't bother with the thread while unit testing + runLocallyWithinThread(job) + } + } + dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor]( + Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system) + val rdd = makeRdd(1, Nil) + val jobId = submit(rdd, Array(0)) + cancel(jobId) + // Because the job wasn't actually cancelled, we shouldn't have received a failure message. + assert(failure === null) + + // When the task set completes normally, state should be correctly updated. + complete(taskSets(0), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assertDataStructuresEmpty + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(sparkListener.failedStages.isEmpty) + assert(sparkListener.successfulStages.contains(0)) + } + test("run trivial shuffle") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)