Repository: spark
Updated Branches:
  refs/heads/master ed4101d29 -> 3990daaf3


[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

## What changes were proposed in this pull request?

When `SparkContext` submits a map stage via `submitMapStage` to the `DAGScheduler`,
`markMapStageJobAsFinished` is called in only two places:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933
and
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314.

But consider the following scenario:
1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
2. We submit stage1 via `submitMapStage`;
3. While stage1 is running, a `FetchFailed` occurs, so stage0 and stage1 are resubmitted as stage0_1 and stage1_1;
4. While stage0_1 is running, speculative tasks from the old stage1 come back as succeeded, but stage1 is no longer in `runningStages`. So even though all partitions of stage1 (including those finished by speculative tasks) have succeeded, stage1's job listener is never called;
5. stage0_1 finishes and stage1_1 starts running. `submitMissingTasks` finds no missing tasks, but in the current code the job listener is still not triggered.

We should call the map-stage job listener in step 5. A driver-side sketch of the symptom is shown below.
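
For context only (this snippet is not part of the patch): a minimal sketch of how a map-stage job and its listener arise on the driver side. It assumes the code is compiled inside the `org.apache.spark` package, since `submitMapStage` is `private[spark]`; the object and value names are purely illustrative. The future returned by `submitMapStage` completes only when the map-stage job listener fires, so in the scenario above it would hang forever.

```scala
package org.apache.spark

import org.apache.spark.rdd.RDD

// Illustrative sketch only: exercises the map-stage job listener path described above.
object MapStageListenerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("map-stage-listener-sketch").setMaster("local[2]"))

    val pairs: RDD[(Int, Int)] = sc.parallelize(1 to 100).map(i => (i % 10, i))
    val dep = new ShuffleDependency(pairs, new HashPartitioner(2))

    // submitMapStage registers a job listener on the ShuffleMapStage; the returned
    // future is completed when DAGScheduler calls markMapStageJobAsFinished.
    val mapStageFuture = sc.submitMapStage(dep)

    // In the faulty scenario above, this call would block forever because the
    // resubmitted stage's listener is never triggered.
    val stats = mapStageFuture.get()
    println(s"map output bytes per reduce partition: ${stats.bytesByPartitionId.mkString(", ")}")

    sc.stop()
  }
}
```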

## How was this patch tested?

A unit test, "Trigger mapstage's job listener in submitMissingTasks", is added in `DAGSchedulerSuite` (see the diff below).

Author: jinxing <jinxing6...@126.com>

Closes #21019 from jinxing64/SPARK-23948.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3990daaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3990daaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3990daaf

Branch: refs/heads/master
Commit: 3990daaf3b6ca2c5a9f7790030096262efb12cb2
Parents: ed4101d
Author: jinxing <jinxing6...@126.com>
Authored: Tue Apr 17 08:55:01 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Apr 17 08:55:01 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 33 +++++++------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 52 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3990daaf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84..78b6b34 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1092,17 +1092,16 @@ class DAGScheduler(
       // the stage as completed here in case there are no tasks to run
       markStageAsFinished(stage, None)
 
-      val debugString = stage match {
+      stage match {
         case stage: ShuffleMapStage =>
-          s"Stage ${stage} is actually done; " +
-            s"(available: ${stage.isAvailable}," +
-            s"available outputs: ${stage.numAvailableOutputs}," +
-            s"partitions: ${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; " +
+              s"(available: ${stage.isAvailable}," +
+              s"available outputs: ${stage.numAvailableOutputs}," +
+              s"partitions: ${stage.numPartitions})")
+          markMapStageJobsAsFinished(stage)
         case stage : ResultStage =>
-          s"Stage ${stage} is actually done; (partitions: 
${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; (partitions: 
${stage.numPartitions})")
       }
-      logDebug(debugString)
-
       submitWaitingChildStages(stage)
     }
   }
@@ -1307,13 +1306,7 @@ class DAGScheduler(
                   shuffleStage.findMissingPartitions().mkString(", "))
                 submitStage(shuffleStage)
               } else {
-                // Mark any map-stage jobs waiting on this stage as finished
-                if (shuffleStage.mapStageJobs.nonEmpty) {
-                  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
-                  for (job <- shuffleStage.mapStageJobs) {
-                    markMapStageJobAsFinished(job, stats)
-                  }
-                }
+                markMapStageJobsAsFinished(shuffleStage)
                 submitWaitingChildStages(shuffleStage)
               }
             }
@@ -1433,6 +1426,16 @@ class DAGScheduler(
     }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
+    // Mark any map-stage jobs waiting on this stage as finished
+    if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
+      val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+      for (job <- shuffleStage.mapStageJobs) {
+        markMapStageJobAsFinished(job, stats)
+      }
+    }
+  }
+
   /**
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.

http://git-wip-us.apache.org/repos/asf/spark/blob/3990daaf/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d812b5b..8b6ec37 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2146,6 +2146,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("Trigger mapstage's job listener in submitMissingTasks") {
+    val rdd1 = new MyRDD(sc, 2, Nil)
+    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+    val listener1 = new SimpleListener
+    val listener2 = new SimpleListener
+
+    submitMapStage(dep1, listener1)
+    submitMapStage(dep2, listener2)
+
+    // Complete the stage0.
+    assert(taskSets(0).stageId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+        HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+    assert(listener1.results.size === 1)
+
+    // When attempting stage1, trigger a fetch failure.
+    assert(taskSets(1).stageId === 1)
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+    scheduler.resubmitFailedStages()
+    // Stage1 listener should not have a result yet
+    assert(listener2.results.size === 0)
+
+    // Speculative task succeeded in stage1.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostD", rdd2.partitions.length)))
+    // stage1 listener still should not have a result, though there's no missing partitions
+    // in it. Because stage1 has been failed and is not inside `runningStages` at this moment.
+    assert(listener2.results.size === 0)
+
+    // Stage0 should now be running as task set 2; make its task succeed
+    assert(taskSets(2).stageId === 0)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+        Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+    // After stage0 is finished, stage1 will be submitted and found there is no missing
+    // partitions in it. Then listener got triggered.
+    assert(listener2.results.size === 1)
+    assertDataStructuresEmpty()
+  }
+
   /**
   * In this test, we run a map stage where one of the executors fails but we still receive a
   * "zombie" complete message from that executor. We want to make sure the stage is not reported

