Repository: spark
Updated Branches:
  refs/heads/master c875d81a3 -> 698ef762f


[SPARK-14269][SCHEDULER] Eliminate unnecessary submitStage() call.

## What changes were proposed in this pull request?

Currently a method `submitStage()` for waiting stages is called on every 
iteration of the event loop in `DAGScheduler` to submit all waiting stages, but 
most of them are not necessary because they are not related to Stage status.
The case we should try to submit waiting stages is only when their parent 
stages are successfully completed.

This elimination can improve `DAGScheduler` performance.

## How was this patch tested?

Added some checks and other existing tests, and our projects.

We have a project bottle-necked by `DAGScheduler`, having about 2000 stages.

Before this patch the almost all execution time in `Driver` process was spent 
to process `submitStage()` of `dag-scheduler-event-loop` thread but after this 
patch the performance was improved as follows:

|        | total execution time | `dag-scheduler-event-loop` thread time | 
`submitStage()` |
|--------|---------------------:|---------------------------------------:|----------------:|
| Before |              760 sec |                                710 sec |      
   667 sec |
| After  |              440 sec |                                 14 sec |      
    10 sec |

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #12060 from ueshin/issues/SPARK-14269.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/698ef762
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/698ef762
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/698ef762

Branch: refs/heads/master
Commit: 698ef762f80cf4c84bc7b7cf083aa97d44b87170
Parents: c875d81
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Wed May 25 13:57:25 2016 -0700
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Wed May 25 13:57:25 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 35 ++++++--------------
 .../spark/scheduler/DAGSchedulerSuite.scala     |  7 ++++
 2 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/698ef762/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5291b66..766e979 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -726,7 +726,6 @@ class DAGScheduler(
       reason = "as part of cancellation of all jobs"))
     activeJobs.clear() // These should already be empty by this point,
     jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
-    submitWaitingStages()
   }
 
   /**
@@ -752,23 +751,21 @@ class DAGScheduler(
         submitStage(stage)
       }
     }
-    submitWaitingStages()
   }
 
   /**
    * Check for waiting stages which are now eligible for resubmission.
-   * Ordinarily run on every iteration of the event loop.
+   * Submits stages that depend on the given parent stage. Called when the 
parent stage completes
+   * successfully.
    */
-  private def submitWaitingStages() {
-    // TODO: We might want to run this less often, when we are sure that 
something has become
-    // runnable that wasn't before.
-    logTrace("Checking for newly runnable parent stages")
+  private def submitWaitingChildStages(parent: Stage) {
+    logTrace(s"Checking if any dependencies of $parent are now runnable")
     logTrace("running: " + runningStages)
     logTrace("waiting: " + waitingStages)
     logTrace("failed: " + failedStages)
-    val waitingStagesCopy = waitingStages.toArray
-    waitingStages.clear()
-    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
+    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
+    waitingStages --= childStages
+    for (stage <- childStages.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -793,7 +790,6 @@ class DAGScheduler(
     }
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group 
%s".format(groupId)))
-    submitWaitingStages()
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
@@ -801,7 +797,6 @@ class DAGScheduler(
     // In that case, we wouldn't have the stage anymore in stageIdToStage.
     val stageAttemptId = 
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, 
taskInfo))
-    submitWaitingStages()
   }
 
   private[scheduler] def handleTaskSetFailed(
@@ -809,7 +804,6 @@ class DAGScheduler(
       reason: String,
       exception: Option[Throwable]): Unit = {
     stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, 
exception) }
-    submitWaitingStages()
   }
 
   private[scheduler] def cleanUpAfterSchedulerStop() {
@@ -832,7 +826,6 @@ class DAGScheduler(
 
   private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
     listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-    submitWaitingStages()
   }
 
   private[scheduler] def handleJobSubmitted(jobId: Int,
@@ -871,8 +864,6 @@ class DAGScheduler(
     listenerBus.post(
       SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, 
properties))
     submitStage(finalStage)
-
-    submitWaitingStages()
   }
 
   private[scheduler] def handleMapStageSubmitted(jobId: Int,
@@ -916,8 +907,6 @@ class DAGScheduler(
     if (finalStage.isAvailable) {
       markMapStageJobAsFinished(job, 
mapOutputTracker.getStatistics(dependency))
     }
-
-    submitWaitingStages()
   }
 
   /** Submits stage, but first recursively submits any missing parents. */
@@ -1073,6 +1062,8 @@ class DAGScheduler(
           s"Stage ${stage} is actually done; (partitions: 
${stage.numPartitions})"
       }
       logDebug(debugString)
+
+      submitWaitingChildStages(stage)
     }
   }
 
@@ -1238,9 +1229,8 @@ class DAGScheduler(
                     markMapStageJobAsFinished(job, stats)
                   }
                 }
+                submitWaitingChildStages(shuffleStage)
               }
-
-              // Note: newly runnable stages will be submitted below when we 
submit waiting stages
             }
         }
 
@@ -1315,7 +1305,6 @@ class DAGScheduler(
         // Unrecognized failure - also do nothing. If the task fails 
repeatedly, the TaskScheduler
         // will abort the job.
     }
-    submitWaitingStages()
   }
 
   /**
@@ -1357,7 +1346,6 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
                "(epoch " + currentEpoch + ")")
     }
-    submitWaitingStages()
   }
 
   private[scheduler] def handleExecutorAdded(execId: String, host: String) {
@@ -1366,7 +1354,6 @@ class DAGScheduler(
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
-    submitWaitingStages()
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int) {
@@ -1379,7 +1366,6 @@ class DAGScheduler(
       case None =>
         logInfo("No active jobs to kill for Stage " + stageId)
     }
-    submitWaitingStages()
   }
 
   private[scheduler] def handleJobCancellation(jobId: Int, reason: String = 
"") {
@@ -1389,7 +1375,6 @@ class DAGScheduler(
       failJobAndIndependentStages(
         jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
     }
-    submitWaitingStages()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/698ef762/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 088a476..fc75c9e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -370,6 +370,13 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
     assert(mapStageB.parents === List(mapStageA))
     assert(mapStageC.parents === List(mapStageA, mapStageB))
     assert(finalStage.parents === List(mapStageC))
+
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(3), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
   }
 
   test("zero split job") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to