Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20930#discussion_r184276403
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         }
       }
     
    +  /**
    +   * This tests the case where the original task succeeds after its speculative
    +   * attempt got a FetchFailed earlier.
    +   */
    +  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore 
following " +
    +    "successful tasks") {
    +    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    // Complete both tasks in rddA.
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // The first task succeeds.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // The second task's speculative attempt fails first, but the task itself is still running.
    +    // This may be caused by ExecutorLost.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
    +      null))
    +    // Check the currently missing partitions.
    +    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
    +    // The second task itself then succeeds.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
    +    // The number of missing partitions should not change; otherwise the child stage
    +    // will never succeed.
    +    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
    +  }
    +
    +  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore 
following " +
    +    "successful tasks") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    submit(rddB, Array(0, 1))
    +
    +    // Complete both tasks in rddA.
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // The first task of rddB succeeds.
    +    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // The second task's speculative attempt fails first, but the task itself is still running.
    +    // This may be caused by ExecutorLost.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
    +      null))
    +    // Make sure failedStages is not empty now.
    +    assert(scheduler.failedStages.nonEmpty)
    +    // The second result task itself then succeeds.
    +    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
    --- End diff --
    
    Yep, you're right. The success completion event in the UT was treated as a
    normal successful task. I fixed this by ignoring the event at the beginning
    of handleTaskCompletion.
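    
    For reference, here is a minimal standalone sketch of that guard idea. It is a
    toy model, not the actual DAGScheduler change; all names below
    (`LateSuccessSketch`, `TinyScheduler`, `failedAttempt`, `missingPartitions`)
    are invented for illustration. It shows the same property the tests assert: a
    late Success coming from a stage attempt that already hit a FetchFailed is
    dropped, so the missing-partition set for the retried attempt does not shrink.
    
        import scala.collection.mutable
        
        // Toy model only: invented names, not the real Spark patch.
        object LateSuccessSketch {
          case class Completion(stageId: Int, attemptId: Int, partition: Int, success: Boolean)
        
          class TinyScheduler {
            // stageId -> attempt that was aborted by a FetchFailed
            private val failedAttempt = mutable.Map.empty[Int, Int]
            // stageId -> partitions still missing
            val missingPartitions = mutable.Map(0 -> mutable.Set(0, 1))
        
            def onFetchFailed(stageId: Int, attemptId: Int): Unit =
              failedAttempt(stageId) = attemptId
        
            def handleTaskCompletion(c: Completion): Unit = {
              // Guard at the very beginning: a success belonging to an attempt we
              // already failed must not mark its partition as finished.
              if (c.success && failedAttempt.get(c.stageId).contains(c.attemptId)) return
              if (c.success) missingPartitions(c.stageId) -= c.partition
            }
          }
        
          def main(args: Array[String]): Unit = {
            val s = new TinyScheduler
            s.handleTaskCompletion(Completion(stageId = 0, attemptId = 0, partition = 0, success = true))
            s.onFetchFailed(stageId = 0, attemptId = 0)                              // FetchFailed arrives first
            s.handleTaskCompletion(Completion(0, 0, partition = 1, success = true))  // late success, ignored
            assert(s.missingPartitions(0) == mutable.Set(1))                         // partition 1 is still missing
          }
        }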


---
