This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new e468576  [SPARK-28699][CORE][2.4] Fix a corner case for aborting indeterminate stage
e468576 is described below

commit e468576423b0ee235b74970a0b923191145563d9
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Thu Aug 22 00:47:31 2019 -0700

    [SPARK-28699][CORE][2.4] Fix a corner case for aborting indeterminate stage
    
    ### What changes were proposed in this pull request?
    Change the logic of collecting the indeterminate stages: we should collect them
    starting from `mapStage`, not `failedStage`, when handling a FetchFailed.
    
    ### Why are the changes needed?
    In the fetch-failure handling logic, the original code collects the indeterminate
    stages starting from the fetch-failed stage. When the fetch failure happens in the
    first task of that stage, this causes the indeterminate stage to be resubmitted only
    partially, which can eventually lead to a correctness bug.
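    
    To make the fix concrete, here is a rough, self-contained sketch of the rollback
    collection (using a simplified, hypothetical `Stage` stand-in, not the real
    DAGScheduler API); it mirrors the `collectStagesToRollback` helper shown in the diff
    below, seeded with the indeterminate map stage instead of the failed stage:
    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for the scheduler's stage type: a stage only knows its parents.
    case class Stage(id: Int, parents: List[Stage] = Nil)

    def stagesToRollback(seed: Stage, resultStagesOfActiveJobs: Seq[Stage]): Set[Stage] = {
      val toRollback = mutable.HashSet[Stage](seed)

      // Walk up from each leaf (result) stage, carrying the chain of stages visited so far.
      // If the head of the chain is already marked for rollback, every stage after it on
      // that chain (its succeeding stages along the path) must roll back too.
      def collect(chain: List[Stage]): Unit = {
        if (toRollback.contains(chain.head)) {
          toRollback ++= chain.tail
        } else {
          chain.head.parents.foreach(parent => collect(parent :: chain))
        }
      }

      resultStagesOfActiveJobs.foreach(leaf => collect(leaf :: Nil))
      toRollback.toSet
    }

    // Seeding with the indeterminate `mapStage` (rather than `failedStage`) also marks the
    // failed reduce stage, because it comes after `mapStage` on every chain that reaches it,
    // so the whole downstream chain is rolled back rather than only part of it.
    ```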
    
    ### Does this PR introduce any user-facing change?
    It makes the corner case of aborting an indeterminate stage behave as expected.
    
    ### How was this patch tested?
    New unit test in DAGSchedulerSuite.
    Ran the integration test below with `local-cluster[5, 2, 5120]` and
    `spark.sql.execution.sortBeforeRepartition=false`; it aborts the indeterminate stage
    as expected:
    ```scala
    import scala.sys.process._
    import org.apache.spark.TaskContext
    
    val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
    // kill an executor in the stage that performs repartition(239)
    val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
      if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
        throw new Exception("pkill -f -n java".!!)
      }
      x
    }
    val r2 = df.distinct.count()
    ```
    
    Closes #25509 from xuanyuanking/SPARK-28699-backport-2.4.
    
    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  6 +--
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 52 +++++++++++++---------
 2 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d314b73..0ba8343 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1505,13 +1505,13 @@ private[spark] class DAGScheduler(
               // guaranteed to be determinate, so the input data of the reducers will not change
               // even if the map tasks are re-tried.
               if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
-                // It's a little tricky to find all the succeeding stages of `failedStage`, because
+                // It's a little tricky to find all the succeeding stages of `mapStage`, because
                 // each stage only know its parents not children. Here we traverse the stages from
                 // the leaf nodes (the result stages of active jobs), and rollback all the stages
-                // in the stage chains that connect to the `failedStage`. To speed up the stage
+                // in the stage chains that connect to the `mapStage`. To speed up the stage
                 // traversing, we collect the stages to rollback first. If a stage needs to
                 // rollback, all its succeeding stages need to rollback to.
-                val stagesToRollback = scala.collection.mutable.HashSet(failedStage)
+                val stagesToRollback = HashSet[Stage](mapStage)
 
                 def collectStagesToRollback(stageChain: List[Stage]): Unit = {
                   if (stagesToRollback.contains(stageChain.head)) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e4bf0ab..fb68f1b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2704,27 +2704,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
       null))
 
-    val failedStages = scheduler.failedStages.toSeq
-    assert(failedStages.length == 2)
-    // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
-    assert(failedStages.collect {
-      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
-    }.head.findMissingPartitions() == Seq(0))
-    // The result stage is still waiting for its 2 tasks to complete
-    assert(failedStages.collect {
-      case stage: ResultStage => stage
-    }.head.findMissingPartitions() == Seq(0, 1))
-
-    scheduler.resubmitFailedStages()
-
-    // The first task of the `shuffleMapRdd2` failed with fetch failure
-    runEvent(makeCompletionEvent(
-      taskSets(3).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
-      null))
-
-    // The job should fail because Spark can't rollback the shuffle map stage.
-    assert(failure != null && failure.getMessage.contains("Spark cannot rollback"))
+    // The second shuffle map stage needs to rerun; the job will abort because of the
+    // indeterminate stage rerun.
+    assert(failure != null && failure.getMessage
+      .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }
 
   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
@@ -2819,6 +2802,33 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertResultStageFailToRollback(shuffleMapRdd)
   }
 
+  test("SPARK-28699: abort stage if parent stage is indeterminate stage") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil, indeterminate = true)
+
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    // Finish the first shuffle map stage.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty))
+
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
+      null))
+
+    // Shuffle blocks of "hostA" are lost, so first task of the `shuffleMapRdd` needs to retry.
+    // The result stage is still waiting for its 2 tasks to complete.
+    // Because shuffleMapRdd is indeterminate, this job will be aborted.
+    assert(failure != null && failure.getMessage
+      .contains("Spark cannot rollback the ShuffleMapStage 0"))
+  }
+
   /**
   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
