Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184260597 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + + "successful tasks") { + // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC + val rddA = new MyRDD(sc, 2, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) + val shuffleIdA = shuffleDepA.shuffleId + + val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + + val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + + submit(rddC, Array(0, 1)) + + // Complete both tasks in rddA. + assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)))) + + // The first task success + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + + // The second task's speculative attempt fails first, but task self still running. + // This may caused by ExecutorLost. + runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) + // Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + // The second result task self success soon. 
+ runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) + // Missing partition number should not change, otherwise it will cause child stage + never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + + "successful tasks") { + val rddA = new MyRDD(sc, 2, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) + val shuffleIdA = shuffleDepA.shuffleId + val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) + submit(rddB, Array(0, 1)) + + // Complete both tasks in rddA. + assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)))) + + // The first task of rddB success + assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + + // The second task's speculative attempt fails first, but task self still running. + // This may caused by ExecutorLost. + runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) + // Make sure failedStage is not empty now + assert(scheduler.failedStages.nonEmpty) + // The second result task self success soon. + assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) + runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- The successful task will be ignored by `OutputCommitCoordinator.taskCompleted`: in the `taskCompleted` logic, `stageStates.getOrElse` returns early because the current stage is in the failed set. 
The detailed log is provided below: ``` 18/04/26 10:50:24.524 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:74) and ResultStage 1 () due to fetch failure 18/04/26 10:50:24.535 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 2 18/04/26 10:50:24.538 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Executor lost: exec-hostA (epoch 1) 18/04/26 10:50:24.540 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Shuffle files lost for executor: exec-hostA (epoch 1) 18/04/26 10:50:24.545 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 3 18/04/26 10:50:24.552 ScalaTest-run-running-DAGSchedulerSuite DEBUG OutputCommitCoordinator: Ignoring task completion for completed stage 18/04/26 10:50:24.554 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: ResultStage 1 () finished in 0.136 s 18/04/26 10:50:24.573 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGScheduler: Removing stage 1 from failed set. 18/04/26 10:50:24.575 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGScheduler: After removal of stage 1, remaining stages = 1 18/04/26 10:50:24.576 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGScheduler: Removing stage 0 from failed set. 18/04/26 10:50:24.576 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGScheduler: After removal of stage 0, remaining stages = 0 ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org