Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20930
  
    The scenario can be reproduced by the test case below, added to `DAGSchedulerSuite`:
    ```scala
      /**
       * This tests the case where the original task succeeds after its
       * speculative attempt got a FetchFailed earlier.
       */
      test("[SPARK-23811] Fetch failed task should kill other attempt") {
        // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
        val rddA = new MyRDD(sc, 2, Nil)
        val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
        val shuffleIdA = shuffleDepA.shuffleId

        val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
        val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

        val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

        submit(rddC, Array(0, 1))

        // Complete both tasks in rddA.
        assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
        complete(taskSets(0), Seq(
          (Success, makeMapStatus("hostA", 2)),
          (Success, makeMapStatus("hostB", 2))))

        // The first task of rddB succeeds.
        runEvent(makeCompletionEvent(
          taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

        // The second task's speculative attempt fails first with a FetchFailed,
        // but the task itself is still running. This may be caused by an ExecutorLost.
        runEvent(makeCompletionEvent(
          taskSets(1).tasks(1),
          FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
          null))
        // Check the currently missing partition.
        assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
        val missingPartition = mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get(0)

        // The original second task itself then succeeds.
        runEvent(makeCompletionEvent(
          taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
        // No missing partitions now; this is what causes the child stage to never succeed.
        assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 0)
      }
    ```
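
    To make the race easier to see in isolation, here is a minimal standalone sketch of the same sequence of events, with no Spark dependencies (all names in it are illustrative, not Spark APIs): the speculative attempt's FetchFailed unregisters the map output and schedules a stage retry, then the late success of the original attempt re-registers the output, so the tracker reports zero missing partitions while a retry is still pending.
    ```scala
    // Standalone model of the race above (illustrative only, not Spark code).
    object SpeculativeFetchFailedRace {
      def main(args: Array[String]): Unit = {
        // partition -> whether its map output is currently registered
        val mapOutputs = scala.collection.mutable.Map(0 -> true, 1 -> true)
        var retryScheduled = false

        def missing: Int = mapOutputs.count { case (_, registered) => !registered }

        // The speculative attempt of task 1 hits a FetchFailed: the suspect
        // output is unregistered and a stage retry is scheduled.
        mapOutputs(1) = false
        retryScheduled = true
        assert(missing == 1)

        // The original attempt of task 1 succeeds afterwards and re-registers
        // its output: the tracker now reports no missing partitions even though
        // a retry is still pending, which is the inconsistency the test asserts.
        mapOutputs(1) = true
        assert(missing == 0 && retryScheduled)
        println(s"missing partitions: $missing, retry scheduled: $retryScheduled")
      }
    }
    ```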

