Github user squito commented on the pull request:

https://github.com/apache/spark/pull/4055#issuecomment-114653145

@suyanNone I think the problem w/ the test case as-is is that you still have multiple completion events for the exact same task, which doesn't really happen. I think the situation you are looking to recreate is a little different: after a fetch failure, the other tasks in that task set will continue to run. (The task set is marked as a zombie, but that doesn't stop the tasks from running.) So you can end up with some tasks finishing from the zombie stage attempt at the same time as some tasks finish from the new attempt, and that's how you can trigger the situation you are describing. Here's the test case I came up with:

```scala
test("run with ShuffleMapStage retry") {
  val firstRDD = new MyRDD(sc, 3, Nil)
  val firstShuffleDep = new ShuffleDependency(firstRDD, null)
  val firstShuffleId = firstShuffleDep.shuffleId
  val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
  val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
  submit(reduceRdd, Array(0))

  // things start out smoothly, stage 0 completes with no issues
  complete(taskSets(0), Seq(
    (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
    (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
    (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.size))
  ))

  // then one executor dies, and a task fails in stage 1
  runEvent(ExecutorLost("exec-hostA"))
  runEvent(CompletionEvent(taskSets(1).tasks(0),
    FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
    null, null, createFakeTaskInfo(), null))

  // so we resubmit stage 0, which completes happily
  scheduler.resubmitFailedStages()
  val stage0Resubmit = taskSets(2)
  assert(stage0Resubmit.stageId == 0)
  assert(stage0Resubmit.attempt === 1)
  val task = stage0Resubmit.tasks(0)
  assert(task.partitionId === 2)
  runEvent(CompletionEvent(task, Success,
    makeMapStatus("hostC", shuffleMapRdd.partitions.size), null, createFakeTaskInfo(), null))

  // now here is where things get tricky : we will now have a task set representing
  // the second attempt for stage 1, but we *also* have some tasks for the first attempt for
  // stage 1 still going
  val stage1Resubmit = taskSets(3)
  assert(stage1Resubmit.stageId == 1)
  assert(stage1Resubmit.attempt === 1)
  assert(stage1Resubmit.tasks.length === 3)

  // we'll have some tasks finish from the first attempt, and some finish from the second attempt,
  // so that we actually have all stage outputs, though no attempt has completed all its
  // tasks
  runEvent(CompletionEvent(taskSets(3).tasks(0), Success,
    makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
  runEvent(CompletionEvent(taskSets(3).tasks(1), Success,
    makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
  // late task finish from the first attempt
  runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
    makeMapStatus("hostB", reduceRdd.partitions.size), null, createFakeTaskInfo(), null))

  // What should happen now is that we submit stage 2. However, we might not see an error
  // b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
  // we can check some conditions.
  // Note that the really important thing here is not so much that we submit stage 2 *immediately*
  // but that we don't end up with some error from these interleaved completions. It would also
  // be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
  // all its tasks complete

  // check that we have all the map output for stage 0 (it should have been there even before
  // the last round of completions from stage 1, but just to double check it hasn't been messed
  // up)
  (0 until 3).foreach { reduceIdx =>
    val arr = mapOutputTracker.getServerStatuses(0, reduceIdx)
    assert(arr != null)
    assert(arr.nonEmpty)
  }

  // and check we have all the map output for stage 1
  (0 until 1).foreach { reduceIdx =>
    val arr = mapOutputTracker.getServerStatuses(1, reduceIdx)
    assert(arr != null)
    assert(arr.nonEmpty)
  }

  // and check that stage 2 has been submitted
  assert(taskSets.size == 5)
  val stage2TaskSet = taskSets(4)
  assert(stage2TaskSet.stageId == 2)
  assert(stage2TaskSet.attempt == 0)
}
```

And sure enough, your changes to `Task` (adding `equals` and `hashCode`) make that test case pass. However, I do **not** think that is the right change. I don't like the idea of tasks being considered equal just because they are for the same stage & partition: you can have multiple tasks running concurrently for the same stage & partition from different attempts, and it seems weird to consider them equal.

A simple solution might be to just change `stage.pendingTasks` to key on stage & partition, without changing `Task` for all uses (e.g., make `stage.pendingTasks` a `Map[(Int,Int),Task]`).

I think a better option would be to change the logic of when we register the mapOutputs, to ignore `stage.pendingTasks` completely and just use `stage.isAvailable`. After all, the fundamental problem here is that `DAGScheduler` is using two different metrics to decide when a stage is complete, and in the scenario you've discovered, those two metrics differ. We should just use one metric in all places (as long as it's the right one).

Another general issue this points out is that the semantics of some of these variables really aren't that clear. E.g., should `stage.pendingTasks` keep track of tasks from "zombie" taskSets?
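
To make the `Map[(Int,Int),Task]` idea a bit more concrete, here is a rough, standalone sketch. It is only an illustration, not a patch against Spark: `SketchTask` and `PendingTasks` are hypothetical stand-ins rather than the real `Task` or `Stage` classes, and the `attempt` field is an assumption added for the example. The point is that pending work is tracked per (stage, partition), so a late completion from a zombie attempt clears the same entry as one from the new attempt, while tasks from different attempts remain distinct values.

```scala
// Hypothetical stand-ins; these are NOT Spark's real Task or Stage classes.
final case class SketchTask(stageId: Int, partitionId: Int, attempt: Int)

final class PendingTasks {
  // Keyed on (stageId, partitionId), so the stage attempt does not matter.
  private val pending = scala.collection.mutable.Map.empty[(Int, Int), SketchTask]

  def add(task: SketchTask): Unit = {
    val key = (task.stageId, task.partitionId)
    pending(key) = task
  }

  // A completion from any attempt clears the entry for that partition.
  def markCompleted(task: SketchTask): Unit = {
    val key = (task.stageId, task.partitionId)
    pending.remove(key)
  }

  def isEmpty: Boolean = pending.isEmpty
  def size: Int = pending.size
}

object PendingTasksSketch {
  def main(args: Array[String]): Unit = {
    val pending = new PendingTasks
    // The second attempt of stage 1 resubmits all three partitions.
    (0 until 3).foreach { p =>
      pending.add(SketchTask(stageId = 1, partitionId = p, attempt = 1))
    }

    // Two tasks finish from the new attempt ...
    pending.markCompleted(SketchTask(1, 0, attempt = 1))
    pending.markCompleted(SketchTask(1, 1, attempt = 1))
    // ... and a late task from the zombie first attempt covers partition 2.
    pending.markCompleted(SketchTask(1, 2, attempt = 0))

    assert(pending.isEmpty)
    // Tasks from different attempts are still not considered equal.
    assert(SketchTask(1, 2, 0) != SketchTask(1, 2, 1))
  }
}
```

This mirrors the interleaving in the test case above (two completions from the resubmitted attempt, one late completion from the first attempt). It does not address the separate question of whether `stage.isAvailable` should be the single completion metric.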