Repository: spark
Updated Branches:
  refs/heads/branch-1.0 784b2a613 -> 5673c1ea4


SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and 
refactor the DAGScheduler with Akka

https://spark-project.atlassian.net/browse/SPARK-1235

In the current implementation, the running job will hang if the DAGScheduler 
crashes for some reason (eventProcessActor throws exception in receive() )

The reason is that the actor will automatically restart when the exception is 
thrown during the running but is not captured properly (Akka behaviour), and 
the JobWaiters are still waiting there for the completion of the tasks

In this patch, I refactored the DAGScheduler with Akka and manage the 
eventProcessActor with supervisor, so that upon the failure of a 
eventProcessActor, the supervisor will terminate the EventProcessActor and 
close the SparkContext

thanks for @kayousterhout and @markhamstra to give the hints in JIRA

Author: CodingCat <zhunans...@gmail.com>
Author: Xiangrui Meng <m...@databricks.com>
Author: Nan Zhu <coding...@users.noreply.github.com>

Closes #186 from CodingCat/SPARK-1235 and squashes the following commits:

a7fb0ee [CodingCat] throw Exception on failure of creating DAG
124d82d [CodingCat] blocking the constructor until event actor is ready
baf2d38 [CodingCat] fix the issue brought by non-blocking actorOf
35c886a [CodingCat] fix bug
82d08b3 [CodingCat] calling actorOf on system to ensure it is blocking
310a579 [CodingCat] style fix
cd02d9a [Nan Zhu] small fix
561cfbc [CodingCat] recover doCheckpoint
c048d0e [CodingCat] call submitWaitingStages for every event
a9eea039 [CodingCat] address Matei's comments
ac878ab [CodingCat] typo fix
5d1636a [CodingCat] re-trigger the test.....
9dfb033 [CodingCat] remove unnecessary changes
a7a2a97 [CodingCat] add StageCancelled message
fdf3b17 [CodingCat] just to retrigger the test......
089bc2f [CodingCat] address andrew's comments
228f4b0 [CodingCat] address comments from Mark
b68c1c7 [CodingCat] refactor DAGScheduler with Akka
810efd8 [Xiangrui Meng] akka solution

(cherry picked from commit 027f1b85f961ce16ee069afe3d90a36dce009994)
Signed-off-by: Matei Zaharia <ma...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5673c1ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5673c1ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5673c1ea

Branch: refs/heads/branch-1.0
Commit: 5673c1ea4eb16e23a8af9beb2ee95e61ebeb9c66
Parents: 784b2a6
Author: CodingCat <zhunans...@gmail.com>
Authored: Fri Apr 25 16:04:48 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Apr 25 16:05:02 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  20 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   6 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 419 +++++++++++--------
 .../spark/scheduler/DAGSchedulerEvent.scala     |   4 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  58 ++-
 .../scheduler/TaskSchedulerImplSuite.scala      |   2 +-
 7 files changed, 290 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5673c1ea/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e9d2f57..eb14d87 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -300,10 +300,17 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, 
master)
-  taskScheduler.start()
+  @volatile private[spark] var dagScheduler: DAGScheduler = _
+  try {
+    dagScheduler = new DAGScheduler(this)
+  } catch {
+    case e: Exception => throw
+      new SparkException("DAGScheduler cannot be initialized due to 
%s".format(e.getMessage))
+  }
 
-  @volatile private[spark] var dagScheduler = new DAGScheduler(this)
-  dagScheduler.start()
+  // start TaskScheduler after taskScheduler sets DAGScheduler reference in 
DAGScheduler's
+  // constructor
+  taskScheduler.start()
 
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
@@ -1022,8 +1029,8 @@ class SparkContext(config: SparkConf) extends Logging {
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    partitions.foreach{ p =>
-      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition 
requested: $p")
+    if (dagScheduler == null) {
+      throw new SparkException("SparkContext has been shutdown")
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
@@ -1132,9 +1139,6 @@ class SparkContext(config: SparkConf) extends Logging {
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    partitions.foreach{ p =>
-      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition 
requested: $p")
-    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

http://git-wip-us.apache.org/repos/asf/spark/blob/5673c1ea/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e8bbfbf..3b3524f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1142,9 +1142,9 @@ abstract class RDD[T: ClassTag](
   @transient private var doCheckpointCalled = false
 
   /**
-   * Performs the checkpointing of this RDD by saving this. It is called by 
the DAGScheduler
-   * after a job using this RDD has completed (therefore the RDD has been 
materialized and
-   * potentially stored in memory). doCheckpoint() is called recursively on 
the parent RDDs.
+   * Performs the checkpointing of this RDD by saving this. It is called after 
a job using this RDD
+   * has completed (therefore the RDD has been materialized and potentially 
stored in memory).
+   * doCheckpoint() is called recursively on the parent RDDs.
    */
   private[spark] def doCheckpoint() {
     if (!doCheckpointCalled) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5673c1ea/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dbde9b5..ff411e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -22,10 +22,16 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import akka.actor._
+import akka.actor.OneForOneStrategy
+import akka.actor.SupervisorStrategy.Stop
+import akka.pattern.ask
+import akka.util.Timeout
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
@@ -47,14 +53,11 @@ import org.apache.spark.util.Utils
  * not caused by shuffle file loss are handled by the TaskScheduler, which 
will retry each task
  * a small number of times before cancelling the whole stage.
  *
- * THREADING: This class runs all its logic in a single thread executing the 
run() method, to which
- * events are submitted using a synchronized queue (eventQueue). The public 
API methods, such as
- * runJob, taskEnded and executorLost, post events asynchronously to this 
queue. All other methods
- * should be private.
  */
 private[spark]
 class DAGScheduler(
-    taskScheduler: TaskScheduler,
+    private[scheduler] val sc: SparkContext,
+    private[scheduler] val taskScheduler: TaskScheduler,
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
@@ -65,6 +68,7 @@ class DAGScheduler(
 
   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
     this(
+      sc,
       taskScheduler,
       sc.listenerBus,
       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
@@ -74,8 +78,6 @@ class DAGScheduler(
 
   def this(sc: SparkContext) = this(sc, sc.taskScheduler)
 
-  private var eventProcessActor: ActorRef = _
-
   private[scheduler] val nextJobId = new AtomicInteger(0)
   private[scheduler] def numTotalJobs: Int = nextJobId.get()
   private val nextStageId = new AtomicInteger(0)
@@ -113,50 +115,31 @@ class DAGScheduler(
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
-  taskScheduler.setDAGScheduler(this)
+  private val dagSchedulerActorSupervisor =
+    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
 
-  /**
-   * Starts the event processing actor.  The actor has two responsibilities:
-   *
-   * 1. Waits for events like job submission, task finished, task failure 
etc., and calls
-   *    [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process 
them.
-   * 2. Schedules a periodical task to resubmit failed stages.
-   *
-   * NOTE: the actor cannot be started in the constructor, because the 
periodical task references
-   * some internal states of the enclosing 
[[org.apache.spark.scheduler.DAGScheduler]] object, thus
-   * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] 
is fully constructed.
-   */
-  def start() {
-    eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
-      /**
-       * The main event loop of the DAG scheduler.
-       */
-      def receive = {
-        case event: DAGSchedulerEvent =>
-          logTrace("Got event of type " + event.getClass.getName)
-
-          /**
-           * All events are forwarded to `processEvent()`, so that the event 
processing logic can
-           * easily tested without starting a dedicated actor.  Please refer 
to `DAGSchedulerSuite`
-           * for details.
-           */
-          if (!processEvent(event)) {
-            submitWaitingStages()
-          } else {
-            context.stop(self)
-          }
-      }
-    }))
+  private[scheduler] var eventProcessActor: ActorRef = _
+
+  private def initializeEventProcessActor() {
+    // blocking the thread until supervisor is started, which ensures 
eventProcessActor is
+    // not null before any job is submitted
+    implicit val timeout = Timeout(30 seconds)
+    val initEventActorReply =
+      dagSchedulerActorSupervisor ? Props(new 
DAGSchedulerEventProcessActor(this))
+    eventProcessActor = Await.result(initEventActorReply, timeout.duration).
+      asInstanceOf[ActorRef]
   }
 
+  initializeEventProcessActor()
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessActor ! BeginEvent(task, taskInfo)
   }
 
   // Called to report that a task has completed and results are being fetched 
remotely.
-  def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
-    eventProcessActor ! GettingResultEvent(task, taskInfo)
+  def taskGettingResult(taskInfo: TaskInfo) {
+    eventProcessActor ! GettingResultEvent(taskInfo)
   }
 
   // Called by TaskScheduler to report task completions or failures.
@@ -436,7 +419,7 @@ class DAGScheduler(
   {
     // Check to make sure we are not launching a task on a partition that does 
not exist.
     val maxPartitions = rdd.partitions.length
-    partitions.find(p => p >= maxPartitions).foreach { p =>
+    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
       throw new IllegalArgumentException(
         "Attempting to access a non-existent partition: " + p + ". " +
           "Total number of partitions: " + maxPartitions)
@@ -511,6 +494,15 @@ class DAGScheduler(
     eventProcessActor ! AllJobsCancelled
   }
 
+  private[scheduler] def doCancelAllJobs() {
+    // Cancel all running jobs.
+    runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+      reason = "as part of cancellation of all jobs"))
+    activeJobs.clear() // These should already be empty by this point,
+    jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
+    submitWaitingStages()
+  }
+
   /**
    * Cancel all jobs associated with a running or scheduled stage.
    */
@@ -519,147 +511,29 @@ class DAGScheduler(
   }
 
   /**
-   * Process one event retrieved from the event processing actor.
-   *
-   * @param event The event to be processed.
-   * @return `true` if we should stop the event loop.
-   */
-  private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
-    event match {
-      case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, 
listener, properties) =>
-        var finalStage: Stage = null
-        try {
-          // New stage creation may throw an exception if, for example, jobs 
are run on a HadoopRDD
-          // whose underlying HDFS files have been deleted.
-          finalStage = newStage(rdd, partitions.size, None, jobId, 
Some(callSite))
-        } catch {
-          case e: Exception =>
-            logWarning("Creating new stage failed due to exception - job: " + 
jobId, e)
-            listener.jobFailed(e)
-            return false
-        }
-        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, 
listener, properties)
-        clearCacheLocs()
-        logInfo("Got job " + job.jobId + " (" + callSite + ") with " + 
partitions.length +
-                " output partitions (allowLocal=" + allowLocal + ")")
-        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
-        logInfo("Parents of final stage: " + finalStage.parents)
-        logInfo("Missing parents: " + getMissingParentStages(finalStage))
-        if (allowLocal && finalStage.parents.size == 0 && partitions.length == 
1) {
-          // Compute very short actions like first() or take() with no parent 
stages locally.
-          listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), 
properties))
-          runLocally(job)
-        } else {
-          jobIdToActiveJob(jobId) = job
-          activeJobs += job
-          resultStageToJob(finalStage) = job
-          listenerBus.post(
-            SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, 
properties))
-          submitStage(finalStage)
-        }
-
-      case StageCancelled(stageId) =>
-        handleStageCancellation(stageId)
-
-      case JobCancelled(jobId) =>
-        handleJobCancellation(jobId)
-
-      case JobGroupCancelled(groupId) =>
-        // Cancel all jobs belonging to this job group.
-        // First finds all active jobs with this group id, and then kill 
stages for them.
-        val activeInGroup = activeJobs.filter(activeJob =>
-          groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
-        val jobIds = activeInGroup.map(_.jobId)
-        jobIds.foreach(jobId => handleJobCancellation(jobId,
-          "as part of cancelled job group %s".format(groupId)))
-
-      case AllJobsCancelled =>
-        // Cancel all running jobs.
-        runningStages.map(_.jobId).foreach(jobId => 
handleJobCancellation(jobId,
-          "as part of cancellation of all jobs"))
-        activeJobs.clear()      // These should already be empty by this point,
-        jobIdToActiveJob.clear()   // but just in case we lost track of some 
jobs...
-
-      case ExecutorAdded(execId, host) =>
-        handleExecutorAdded(execId, host)
-
-      case ExecutorLost(execId) =>
-        handleExecutorLost(execId)
-
-      case BeginEvent(task, taskInfo) =>
-        for (
-          stage <- stageIdToStage.get(task.stageId);
-          stageInfo <- stageToInfos.get(stage)
-        ) {
-          if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
-              !stageInfo.emittedTaskSizeWarning) {
-            stageInfo.emittedTaskSizeWarning = true
-            logWarning(("Stage %d (%s) contains a task of very large " +
-              "size (%d KB). The maximum recommended task size is %d 
KB.").format(
-              task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, 
TASK_SIZE_TO_WARN))
-          }
-        }
-        listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
-
-      case GettingResultEvent(task, taskInfo) =>
-        listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-
-      case completion @ CompletionEvent(task, reason, _, _, taskInfo, 
taskMetrics) =>
-        val stageId = task.stageId
-        val taskType = Utils.getFormattedClassName(task)
-        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, 
taskInfo, taskMetrics))
-        handleTaskCompletion(completion)
-
-      case TaskSetFailed(taskSet, reason) =>
-        stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
-
-      case ResubmitFailedStages =>
-        if (failedStages.size > 0) {
-          // Failed stages may be removed by job cancellation, so failed might 
be empty even if
-          // the ResubmitFailedStages event has been scheduled.
-          resubmitFailedStages()
-        }
-
-      case StopDAGScheduler =>
-        // Cancel any active jobs
-        for (job <- activeJobs) {
-          val error = new SparkException("Job cancelled because SparkContext 
was shut down")
-          job.listener.jobFailed(error)
-          // Tell the listeners that all of the running stages have ended.  
Don't bother
-          // cancelling the stages because if the DAG scheduler is stopped, 
the entire application
-          // is in the process of getting stopped.
-          val stageFailedMessage = "Stage cancelled because SparkContext was 
shut down"
-          runningStages.foreach { stage =>
-            val info = stageToInfos(stage)
-            info.stageFailed(stageFailedMessage)
-            listenerBus.post(SparkListenerStageCompleted(info))
-          }
-          listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
-        }
-        return true
-    }
-    false
-  }
-
-  /**
    * Resubmit any failed stages. Ordinarily called after a small amount of 
time has passed since
    * the last fetch failure.
    */
   private[scheduler] def resubmitFailedStages() {
-    logInfo("Resubmitting failed stages")
-    clearCacheLocs()
-    val failedStagesCopy = failedStages.toArray
-    failedStages.clear()
-    for (stage <- failedStagesCopy.sortBy(_.jobId)) {
-      submitStage(stage)
+    if (failedStages.size > 0) {
+      // Failed stages may be removed by job cancellation, so failed might be 
empty even if
+      // the ResubmitFailedStages event has been scheduled.
+      logInfo("Resubmitting failed stages")
+      clearCacheLocs()
+      val failedStagesCopy = failedStages.toArray
+      failedStages.clear()
+      for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+        submitStage(stage)
+      }
     }
+    submitWaitingStages()
   }
 
   /**
    * Check for waiting or failed stages which are now eligible for 
resubmission.
    * Ordinarily run on every iteration of the event loop.
    */
-  private[scheduler] def submitWaitingStages() {
+  private def submitWaitingStages() {
     // TODO: We might want to run this less often, when we are sure that 
something has become
     // runnable that wasn't before.
     logTrace("Checking for newly runnable parent stages")
@@ -730,6 +604,102 @@ class DAGScheduler(
     }
   }
 
+  private[scheduler] def handleJobGroupCancelled(groupId: String) {
+    // Cancel all jobs belonging to this job group.
+    // First finds all active jobs with this group id, and then kill stages 
for them.
+    val activeInGroup = activeJobs.filter(activeJob =>
+      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+    val jobIds = activeInGroup.map(_.jobId)
+    jobIds.foreach(handleJobCancellation(_, "part of cancelled job group 
%s".format(groupId)))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
+    for (stage <- stageIdToStage.get(task.stageId); stageInfo <- 
stageToInfos.get(stage)) {
+      if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
+        !stageInfo.emittedTaskSizeWarning) {
+        stageInfo.emittedTaskSizeWarning = true
+        logWarning(("Stage %d (%s) contains a task of very large " +
+          "size (%d KB). The maximum recommended task size is %d KB.").format(
+            task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
+            DAGScheduler.TASK_SIZE_TO_WARN))
+      }
+    }
+    listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) 
{
+    stageIdToStage.get(taskSet.stageId).foreach {abortStage(_, reason) }
+    submitWaitingStages()
+  }
+
+  private[scheduler] def cleanUpAfterSchedulerStop() {
+    for (job <- activeJobs) {
+      val error = new SparkException("Job cancelled because SparkContext was 
shut down")
+      job.listener.jobFailed(error)
+      // Tell the listeners that all of the running stages have ended.  Don't 
bother
+      // cancelling the stages because if the DAG scheduler is stopped, the 
entire application
+      // is in the process of getting stopped.
+      val stageFailedMessage = "Stage cancelled because SparkContext was shut 
down"
+      runningStages.foreach { stage =>
+        val info = stageToInfos(stage)
+        info.stageFailed(stageFailedMessage)
+        listenerBus.post(SparkListenerStageCompleted(info))
+      }
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
+  }
+
+  private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
+    listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleJobSubmitted(jobId: Int,
+      finalRDD: RDD[_],
+      func: (TaskContext, Iterator[_]) => _,
+      partitions: Array[Int],
+      allowLocal: Boolean,
+      callSite: String,
+      listener: JobListener,
+      properties: Properties = null)
+  {
+    var finalStage: Stage = null
+    try {
+      // New stage creation may throw an exception if, for example, jobs are 
run on a
+      // HadoopRDD whose underlying HDFS files have been deleted.
+      finalStage = newStage(finalRDD, partitions.size, None, jobId, 
Some(callSite))
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + 
jobId, e)
+        listener.jobFailed(e)
+        return
+    }
+    if (finalStage != null) {
+      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, 
listener, properties)
+      clearCacheLocs()
+      logInfo("Got job %s (%s) with %d output partitions 
(allowLocal=%s)".format(
+        job.jobId, callSite, partitions.length, allowLocal))
+      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
+      logInfo("Parents of final stage: " + finalStage.parents)
+      logInfo("Missing parents: " + getMissingParentStages(finalStage))
+      if (allowLocal && finalStage.parents.size == 0 && partitions.length == 
1) {
+        // Compute very short actions like first() or take() with no parent 
stages locally.
+        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), 
properties))
+        runLocally(job)
+      } else {
+        jobIdToActiveJob(jobId) = job
+        activeJobs += job
+        resultStageToJob(finalStage) = job
+        listenerBus.post(SparkListenerJobStart(job.jobId, 
jobIdToStageIds(jobId).toArray,
+          properties))
+        submitStage(finalStage)
+      }
+    }
+    submitWaitingStages()
+  }
+
   /** Submits stage, but first recursively submits any missing parents. */
   private def submitStage(stage: Stage) {
     val jobId = activeJobForStage(stage)
@@ -819,9 +789,12 @@ class DAGScheduler(
    * Responds to a task finishing. This is called inside the event loop so it 
assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end 
event from outside.
    */
-  private def handleTaskCompletion(event: CompletionEvent) {
+  private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
-
+    val stageId = task.stageId
+    val taskType = Utils.getFormattedClassName(task)
+    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, 
event.taskInfo,
+      event.taskMetrics))
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
       return
@@ -964,6 +937,7 @@ class DAGScheduler(
         // Unrecognized failure - also do nothing. If the task fails 
repeatedly, the TaskScheduler
         // will abort the job.
     }
+    submitWaitingStages()
   }
 
   /**
@@ -973,7 +947,7 @@ class DAGScheduler(
    * Optionally the epoch during which the failure was caught can be passed to 
avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node 
as lost.
    */
-  private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = 
None) {
+  private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: 
Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
@@ -993,17 +967,19 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
                "(epoch " + currentEpoch + ")")
     }
+    submitWaitingStages()
   }
 
-  private def handleExecutorAdded(execId: String, host: String) {
+  private[scheduler] def handleExecutorAdded(execId: String, host: String) {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
+    submitWaitingStages()
   }
 
-  private def handleStageCancellation(stageId: Int) {
+  private[scheduler] def handleStageCancellation(stageId: Int) {
     if (stageIdToJobIds.contains(stageId)) {
       val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
       jobsThatUseStage.foreach(jobId => {
@@ -1012,22 +988,24 @@ class DAGScheduler(
     } else {
       logInfo("No active jobs to kill for Stage " + stageId)
     }
+    submitWaitingStages()
   }
 
-  private def handleJobCancellation(jobId: Int, reason: String = "") {
+  private[scheduler] def handleJobCancellation(jobId: Int, reason: String = 
"") {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
       failJobAndIndependentStages(jobIdToActiveJob(jobId),
         "Job %d cancelled %s".format(jobId, reason), None)
     }
+    submitWaitingStages()
   }
 
   /**
    * Aborts all jobs depending on a particular Stage. This is called in 
response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this 
event from outside.
    */
-  private def abortStage(failedStage: Stage, reason: String) {
+  private[scheduler] def abortStage(failedStage: Stage, reason: String) {
     if (!stageIdToStage.contains(failedStage.id)) {
       // Skip all the actions if the stage has been removed.
       return
@@ -1156,13 +1134,88 @@ class DAGScheduler(
   }
 
   def stop() {
-    if (eventProcessActor != null) {
-      eventProcessActor ! StopDAGScheduler
-    }
+    logInfo("Stopping DAGScheduler")
+    dagSchedulerActorSupervisor ! PoisonPill
     taskScheduler.stop()
   }
 }
 
+private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: 
DAGScheduler)
+  extends Actor with Logging {
+
+  override val supervisorStrategy =
+    OneForOneStrategy() {
+      case x: Exception =>
+        logError("eventProcesserActor failed due to the error %s; shutting 
down SparkContext"
+          .format(x.getMessage))
+        dagScheduler.doCancelAllJobs()
+        dagScheduler.sc.stop()
+        Stop
+    }
+
+  def receive = {
+    case p: Props => sender ! context.actorOf(p)
+    case _ => logWarning("received unknown message in 
DAGSchedulerActorSupervisor")
+  }
+}
+
+private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: 
DAGScheduler)
+  extends Actor with Logging {
+
+  override def preStart() {
+    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
+    // valid when the messages arrive
+    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
+  }
+
+  /**
+   * The main event loop of the DAG scheduler.
+   */
+  def receive = {
+    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, 
listener, properties) =>
+      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, 
allowLocal, callSite,
+        listener, properties)
+
+    case StageCancelled(stageId) =>
+      dagScheduler.handleStageCancellation(stageId)
+
+    case JobCancelled(jobId) =>
+      dagScheduler.handleJobCancellation(jobId)
+
+    case JobGroupCancelled(groupId) =>
+      dagScheduler.handleJobGroupCancelled(groupId)
+
+    case AllJobsCancelled =>
+      dagScheduler.doCancelAllJobs()
+
+    case ExecutorAdded(execId, host) =>
+      dagScheduler.handleExecutorAdded(execId, host)
+
+    case ExecutorLost(execId) =>
+      dagScheduler.handleExecutorLost(execId)
+
+    case BeginEvent(task, taskInfo) =>
+      dagScheduler.handleBeginEvent(task, taskInfo)
+
+    case GettingResultEvent(taskInfo) =>
+      dagScheduler.handleGetTaskResult(taskInfo)
+
+    case completion @ CompletionEvent(task, reason, _, _, taskInfo, 
taskMetrics) =>
+      dagScheduler.handleTaskCompletion(completion)
+
+    case TaskSetFailed(taskSet, reason) =>
+      dagScheduler.handleTaskSetFailed(taskSet, reason)
+
+    case ResubmitFailedStages =>
+      dagScheduler.resubmitFailedStages()
+  }
+
+  override def postStop() {
+    // Cancel any active jobs in postStop hook
+    dagScheduler.cleanUpAfterSchedulerStop()
+  }
+}
+
 private[spark] object DAGScheduler {
   // The time, in millis, to wait for fetch failure events to stop coming in 
after one is detected;
   // this is a simplistic way to avoid resubmitting tasks in the non-fetchable 
map stage one by one

http://git-wip-us.apache.org/repos/asf/spark/blob/5673c1ea/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 0800c56..23f5744 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -57,7 +57,7 @@ private[scheduler]
 case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends 
DAGSchedulerEvent
 
 private[scheduler]
-case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends 
DAGSchedulerEvent
+case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent
 
 private[scheduler] case class CompletionEvent(
     task: Task[_],
@@ -76,5 +76,3 @@ private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String) extends 
DAGSchedulerEvent
 
 private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
-
-private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/5673c1ea/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a81b834..f3bd079 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -465,7 +465,7 @@ private[spark] class TaskSetManager(
   def handleTaskGettingResult(tid: Long) = {
     val info = taskInfos(tid)
     info.markGettingResult()
-    sched.dagScheduler.taskGettingResult(tasks(info.index), info)
+    sched.dagScheduler.taskGettingResult(info)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5673c1ea/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ff69eb7..d172dd1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -21,6 +21,8 @@ import scala.Tuple2
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark._
@@ -28,19 +30,16 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 
-/**
- * Tests for DAGScheduler. These tests directly call the event processing 
functions in DAGScheduler
- * rather than spawning an event loop thread as happens in the real code. They 
use EasyMock
- * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to 
which TaskSets are
- * submitted) and BlockManagerMaster (from which cache locations are retrieved 
and to which dead
- * host notifications are sent). In addition, tests may check for side effects 
on a non-mocked
- * MapOutputTracker instance.
- *
- * Tests primarily consist of running DAGScheduler#processEvent and
- * DAGScheduler#submitWaitingStages (via test utility functions like runEvent 
or respondToTaskSet)
- * and capturing the resulting TaskSets from the mock TaskScheduler.
- */
-class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with 
LocalSparkContext {
+class BuggyDAGEventProcessActor extends Actor {
+  val state = 0
+  def receive = {
+    case _ => throw new SparkException("error")
+  }
+}
+
+class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with 
FunSuite
+  with ImplicitSender with BeforeAndAfter with LocalSparkContext {
+
   val conf = new SparkConf
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()
@@ -82,6 +81,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter 
with LocalSparkCont
 
   var mapOutputTracker: MapOutputTrackerMaster = null
   var scheduler: DAGScheduler = null
+  var dagEventProcessTestActor: TestActorRef[DAGSchedulerEventProcessActor] = 
null
 
   /**
    * Set of cache locations to return from our mock BlockManagerMaster.
@@ -121,6 +121,7 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
     results.clear()
     mapOutputTracker = new MapOutputTrackerMaster(conf)
     scheduler = new DAGScheduler(
+        sc,
         taskScheduler,
         sc.listenerBus,
         mapOutputTracker,
@@ -131,10 +132,13 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
         runLocallyWithinThread(job)
       }
     }
+    dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+      Props(classOf[DAGSchedulerEventProcessActor], scheduler))(system)
   }
 
-  after {
-    scheduler.stop()
+  override def afterAll() {
+    super.afterAll()
+    TestKit.shutdownActorSystem(system)
   }
 
   /**
@@ -178,8 +182,7 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
    * DAGScheduler event loop.
    */
   private def runEvent(event: DAGSchedulerEvent) {
-    assert(!scheduler.processEvent(event))
-    scheduler.submitWaitingStages()
+    dagEventProcessTestActor.receive(event)
   }
 
   /**
@@ -209,7 +212,7 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
       listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
     runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, 
listener))
-    return jobId
+    jobId
   }
 
   /** Sends TaskSetFailed to the scheduler. */
@@ -223,19 +226,17 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
   }
 
   test("zero split job") {
-    val rdd = makeRdd(0, Nil)
     var numResults = 0
     val fakeListener = new JobListener() {
       override def taskSucceeded(partition: Int, value: Any) = numResults += 1
       override def jobFailed(exception: Exception) = throw exception
     }
-    submit(rdd, Array(), listener = fakeListener)
+    submit(makeRdd(0, Nil), Array(), listener = fakeListener)
     assert(numResults === 0)
   }
 
   test("run trivial job") {
-    val rdd = makeRdd(1, Nil)
-    submit(rdd, Array(0))
+    submit(makeRdd(1, Nil), Array(0))
     complete(taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
@@ -529,6 +530,18 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }
 
+  test("DAGSchedulerActorSupervisor closes the SparkContext when 
EventProcessActor crashes") {
+    val actorSystem = ActorSystem("test")
+    val supervisor = actorSystem.actorOf(
+      Props(classOf[DAGSchedulerActorSupervisor], scheduler), "dagSupervisor")
+    supervisor ! Props[BuggyDAGEventProcessActor]
+    val child = expectMsgType[ActorRef]
+    watch(child)
+    child ! "hi"
+    expectMsgPF(){ case Terminated(child) => () }
+    assert(scheduler.sc.dagScheduler === null)
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its 
preferred locations.
    * Note that this checks only the host and not the executor ID.
@@ -561,3 +574,4 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
     assert(scheduler.waitingStages.isEmpty)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/5673c1ea/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 2fb750d..a8b605c 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -305,7 +305,7 @@ class TaskSchedulerImplSuite extends FunSuite with 
LocalSparkContext with Loggin
       override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
       override def executorAdded(execId: String, host: String) {}
     }
-
+    taskScheduler.setDAGScheduler(dagScheduler)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))

Reply via email to