This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ffc0b71aa3e [SPARK-45182][CORE] Ignore task completion from old stage 
after retrying indeterminate stages
7ffc0b71aa3e is described below

commit 7ffc0b71aa3e416a9b21e0975a169b2a8a8403a8
Author: mayurb <may...@uber.com>
AuthorDate: Tue Sep 26 11:04:07 2023 +0800

    [SPARK-45182][CORE] Ignore task completion from old stage after retrying 
indeterminate stages
    
    ### What changes were proposed in this pull request?
    [SPARK-25342](https://issues.apache.org/jira/browse/SPARK-25342) added 
support for rolling back a shuffle map stage so that all tasks of the stage can 
be retried when the stage output is indeterminate. This is done by clearing all 
map outputs at the time of stage submission. This approach works well except 
for this case:
    
    Assume both Shuffle 1 and 2 are indeterminate
    
    ShuffleMapStage1 ----> Shuffle 1 ----> ShuffleMapStage2 ----> Shuffle 2 ----> 
ResultStage
    
    - ShuffleMapStage1 is complete
    - A task from ShuffleMapStage2 fails with FetchFailed. Other tasks are 
still running
    - Both ShuffleMapStage1 and ShuffleMapStage2 are retried
    - ShuffleMapStage1 is retried and completes
    - ShuffleMapStage2 reattempt is scheduled for execution
    - Before all tasks of ShuffleMapStage2 reattempt could finish, one or more 
laggard tasks from the original attempt of ShuffleMapStage2 finish and 
ShuffleMapStage2 also gets marked as complete
    - Result Stage gets scheduled and finishes
    
    After this change, such laggard tasks from the old attempt of the 
indeterminate stage will be ignored
    
    ### Why are the changes needed?
    This can give wrong results when indeterminate stages need to be retried 
under the circumstances mentioned above
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    A new test case
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42950 from mayurdb/rollbackFix.
    
    Authored-by: mayurb <may...@uber.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  29 +++---
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 104 +++++++++++++++++++++
 2 files changed, 122 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8a1480fd2100..a456f91d4c96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1903,19 +1903,26 @@ private[spark] class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
-            shuffleStage.pendingPartitions -= task.partitionId
-            val status = event.result.asInstanceOf[MapStatus]
-            val execId = status.location.executorId
-            logDebug("ShuffleMapTask finished on " + execId)
-            if (executorFailureEpoch.contains(execId) &&
+            // Ignore task completion for old attempt of indeterminate stage
+            val ignoreIndeterminate = stage.isIndeterminate &&
+              task.stageAttemptId < stage.latestInfo.attemptNumber()
+            if (!ignoreIndeterminate) {
+              shuffleStage.pendingPartitions -= task.partitionId
+              val status = event.result.asInstanceOf[MapStatus]
+              val execId = status.location.executorId
+              logDebug("ShuffleMapTask finished on " + execId)
+              if (executorFailureEpoch.contains(execId) &&
                 smt.epoch <= executorFailureEpoch(execId)) {
-              logInfo(s"Ignoring possibly bogus $smt completion from executor 
$execId")
+                logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+              } else {
+                // The epoch of the task is acceptable (i.e., the task was 
launched after the most
+                // recent failure we're aware of for the executor), so mark 
the task's output as
+                // available.
+                mapOutputTracker.registerMapOutput(
+                  shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
+              }
             } else {
-              // The epoch of the task is acceptable (i.e., the task was 
launched after the most
-              // recent failure we're aware of for the executor), so mark the 
task's output as
-              // available.
-              mapOutputTracker.registerMapOutput(
-                shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
+              logInfo(s"Ignoring $smt completion from an older attempt of 
indeterminate stage")
             }
 
             if (runningStages.contains(shuffleStage) && 
shuffleStage.pendingPartitions.isEmpty) {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c7e4994e328f..e351f8b95bbb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3041,6 +3041,27 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     (shuffleId1, shuffleId2)
   }
 
+  private def constructTwoIndeterminateStage(): (Int, Int) = {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true)
+
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new 
HashPartitioner(2))
+    val shuffleId1 = shuffleDep1.shuffleId
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = 
mapOutputTracker,
+      indeterminate = true)
+
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new 
HashPartitioner(2))
+    val shuffleId2 = shuffleDep2.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep2), tracker = 
mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    // Finish the first shuffle map stage.
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+
+    (shuffleId1, shuffleId2)
+  }
+
   test("SPARK-25341: abort stage while using old fetch protocol") {
     conf.set(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL.key, "true")
     // Construct the scenario of indeterminate stage fetch failed.
@@ -3099,6 +3120,89 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  test("SPARK-45182: Ignore task completion from old stage after retrying 
indeterminate stages") {
+    val (shuffleId1, shuffleId2) = constructTwoIndeterminateStage()
+
+    // shuffleMapStage0 -> shuffleId1 -> shuffleMapStage1 -> shuffleId2 -> 
resultStage
+    val shuffleMapStage1 = 
scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    val resultStage = scheduler.stageIdToStage(2).asInstanceOf[ResultStage]
+
+    // Shuffle map stage 0 is done
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) == 
Some(Seq.empty))
+    // Shuffle map stage 1 is still waiting for its 2 tasks to complete
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) == Some(Seq(0, 
1)))
+    // The result stage is still waiting for its 2 tasks to complete
+    assert(resultStage.findMissingPartitions() == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // The first task of the shuffle map stage 1 fails with fetch failure
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, 
"ignored"),
+      null))
+
+    // Both the stages should have been resubmitted
+    val newFailedStages = scheduler.failedStages.toSeq
+    assert(newFailedStages.map(_.id) == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // Since shuffleId1 is indeterminate, all tasks of shuffle map stage 0 
should be run
+    assert(taskSets(2).stageId == 0)
+    assert(taskSets(2).stageAttemptId == 1)
+    assert(taskSets(2).tasks.length == 2)
+
+    // Complete the re-attempt of shuffle map stage 0
+    completeShuffleMapStageSuccessfully(0, 1, 2)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+
+    // Since shuffleId2 is indeterminate, all tasks of shuffle map stage 1 
should be run
+    assert(taskSets(3).stageId == 1)
+    assert(taskSets(3).stageAttemptId == 1)
+    assert(taskSets(3).tasks.length == 2)
+
+    // The first task of the shuffle map stage 1 from 2nd attempt succeeds
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      Success,
+      makeMapStatus("hostB",
+        2)))
+
+    // The second task of the  shuffle map stage 1 from 1st attempt succeeds
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostC",
+        2)))
+
+    // This task completion should get ignored and partition 1 should be 
missing
+    // for shuffle map stage 1
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) == Some(Seq(1)))
+
+    // The second task of the shuffle map stage 1 from 2nd attempt succeeds
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(1),
+      Success,
+      makeMapStatus("hostD",
+        2)))
+
+    // The shuffle map stage 1 should be done
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === 
Some(Seq.empty))
+
+    // The shuffle map outputs for shuffleId2 should be from latest attempt of 
shuffle map stage 1
+    assert(mapOutputTracker.getMapLocation(shuffleMapStage1.shuffleDep, 0, 2)
+      === Seq("hostB", "hostD"))
+
+    // Complete result stage
+    complete(taskSets(4), Seq((Success, 11), (Success, 12)))
+
+    // Job successfully ended
+    assert(results === Map(0 -> 11, 1 -> 12))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
   test("SPARK-25341: continuous indeterminate stage roll back") {
     // shuffleMapRdd1/2/3 are all indeterminate.
     val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to