Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20930#discussion_r184260210
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * This tests the case where the original task succeeds after its speculative
    +   * attempt got a FetchFailed earlier.
    +   */
    +  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore 
following " +
    +    "successful tasks") {
    +    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    // Complete both tasks in rddA.
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // The first task succeeds.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // The second task's speculative attempt fails first, but the task itself is
    +    // still running. This may be caused by ExecutorLost.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
    +      null))
    +    // Check the currently missing partitions.
    +    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
    +    // The second task itself succeeds soon afterwards.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
    +    // The number of missing partitions should not change; otherwise the child
    +    // stage will never succeed.
    +    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
    +  }
    +
    +  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore 
following " +
    +    "successful tasks") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    submit(rddB, Array(0, 1))
    +
    +    // Complete both tasks in rddA.
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // The first task of rddB succeeds.
    +    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // The second task's speculative attempt fails first, but the task itself is
    +    // still running. This may be caused by ExecutorLost.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
    +      null))
    +    // Make sure failedStages is not empty now.
    +    assert(scheduler.failedStages.nonEmpty)
    +    // The second result task itself succeeds soon afterwards.
    +    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
    +    assertDataStructuresEmpty()
    --- End diff --
    
    I added this test to answer your previous question, "Can you simulate what
    happens to a result task if FetchFailed comes before the task success?".
    This test passes without my code change in DAGScheduler.
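    
    To make the invariant in the first test concrete, here is a minimal,
    self-contained Scala sketch (paste into the Scala REPL to run). SimpleTracker
    and its attempt-based filtering are a hypothetical illustration of the idea
    only, not Spark's actual DAGScheduler or MapOutputTracker logic: once a stage
    has failed with FetchFailed, a late Success from a stale attempt must not
    shrink the set of missing partitions.
    
        // Hypothetical model: which map partitions have registered output,
        // and which stage attempt is currently valid.
        final case class SimpleTracker(
            registered: Map[Int, Int] = Map.empty,
            currentAttempt: Int = 0) {
    
          def missingPartitions(total: Int): Set[Int] =
            (0 until total).toSet -- registered.keySet
    
          // A FetchFailed bumps the attempt: outputs produced by older
          // attempts must no longer be accepted.
          def onFetchFailed: SimpleTracker = copy(currentAttempt = currentAttempt + 1)
    
          // A Success only registers output if it comes from the current
          // attempt; a late success from a stale attempt is ignored.
          def onSuccess(partition: Int, attempt: Int): SimpleTracker =
            if (attempt == currentAttempt) copy(registered = registered + (partition -> attempt))
            else this
        }
    
        var t = SimpleTracker()
        t = t.onSuccess(partition = 0, attempt = 0) // first task succeeds
        t = t.onFetchFailed                         // speculative attempt hits FetchFailed
        t = t.onSuccess(partition = 1, attempt = 0) // the original task's late success
        assert(t.missingPartitions(total = 2) == Set(1)) // partition 1 stays missing
    
    If the stale success were registered, missingPartitions would report an empty
    set while the stage is being resubmitted, which is exactly the child-stage
    hang the first test's final assertion guards against.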

