Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/4055#issuecomment-114653145
  
    @suyanNone  I think the problem with the test case as-is is that you still
    have multiple completion events for the exact same task, which doesn't
    really happen.  The situation you are trying to recreate is a little
    different -- after a fetch failure, the other tasks in that task set keep
    running.  (The task set is marked as a zombie, but that doesn't stop its
    tasks from running.)  So you can end up with some tasks finishing from the
    zombie stage attempt at the same time as some tasks finish from the new
    attempt, and that's how you can trigger the situation you are describing.
    
    Here's the test case I came up with:
    
    ```scala
      test("run with ShuffleMapStage retry") {
        val firstRDD = new MyRDD(sc, 3, Nil)
        val firstShuffleDep = new ShuffleDependency(firstRDD, null)
        val firstShuffleId = firstShuffleDep.shuffleId
        val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
        val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
        val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
        submit(reduceRdd, Array(0))
    
        // things start out smoothly, stage 0 completes with no issues
        complete(taskSets(0), Seq(
          (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
          (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
          (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.size))
        ))
    
        // then one executor dies, and a task fails in stage 1
        runEvent(ExecutorLost("exec-hostA"))
        runEvent(CompletionEvent(taskSets(1).tasks(0),
          FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
          null, null, createFakeTaskInfo(), null))
        // so we resubmit stage 0, which completes happily
        scheduler.resubmitFailedStages()
        val stage0Resubmit = taskSets(2)
        assert(stage0Resubmit.stageId == 0)
        assert(stage0Resubmit.attempt === 1)
        val task = stage0Resubmit.tasks(0)
        assert(task.partitionId === 2)
        runEvent(CompletionEvent(task, Success,
          makeMapStatus("hostC", shuffleMapRdd.partitions.size), null, 
createFakeTaskInfo(), null))
    
        // now here is where things get tricky: we will now have a task set representing
        // the second attempt for stage 1, but we *also* have some tasks for the first
        // attempt for stage 1 still going
        val stage1Resubmit = taskSets(3)
        assert(stage1Resubmit.stageId == 1)
        assert(stage1Resubmit.attempt === 1)
        assert(stage1Resubmit.tasks.length === 3)
    
        // we'll have some tasks finish from the first attempt, and some finish from
        // the second attempt, so that we actually have all stage outputs, though no
        // attempt has completed all its tasks
        runEvent(CompletionEvent(taskSets(3).tasks(0), Success,
          makeMapStatus("hostC", reduceRdd.partitions.size), null, 
createFakeTaskInfo(), null))
        runEvent(CompletionEvent(taskSets(3).tasks(1), Success,
          makeMapStatus("hostC", reduceRdd.partitions.size), null, 
createFakeTaskInfo(), null))
        // late task finish from the first attempt
        runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
          makeMapStatus("hostB", reduceRdd.partitions.size), null, 
createFakeTaskInfo(), null))
    
        // What should happen now is that we submit stage 2.  However, we might not
        // see an error b/c of DAGScheduler's error handling (it tends to swallow
        // errors and just log them).  But we can check some conditions.
        // Note that the really important thing here is not so much that we submit
        // stage 2 *immediately*, but that we don't end up with some error from these
        // interleaved completions.  It would also be OK (though sub-optimal) if
        // stage 2 simply waited until the resubmission of stage 1 had all its tasks
        // complete.
    
        // check that we have all the map output for stage 0 (it should have been
        // there even before the last round of completions from stage 1, but just
        // to double check it hasn't been messed up)
        (0 until 3).foreach { reduceIdx =>
          val arr = mapOutputTracker.getServerStatuses(0, reduceIdx)
          assert(arr != null)
          assert(arr.nonEmpty)
        }
    
        // and check we have all the map output for stage 1
        (0 until 1).foreach { reduceIdx =>
          val arr = mapOutputTracker.getServerStatuses(1, reduceIdx)
          assert(arr != null)
          assert(arr.nonEmpty)
        }
    
        // and check that stage 2 has been submitted
        assert(taskSets.size == 5)
        val stage2TaskSet = taskSets(4)
        assert(stage2TaskSet.stageId == 2)
        assert(stage2TaskSet.attempt == 0)
      }
    ```
    
    And sure enough, your changes to `Task` -- adding `equals` and `hashCode` --
    make that test case pass.

    However, I do **not** think that is the right change.  I don't like the idea
    of tasks being considered equal just because they are for the same stage &
    partition -- you can have multiple tasks running concurrently for the same
    stage & partition from different attempts, and it seems weird to consider
    them equal.
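
    To make that concern concrete, here is a minimal sketch (a hypothetical
    stand-in class, not Spark's real `Task` and not your exact patch) of what
    equality keyed only on stage & partition implies:

    ```scala
    // Hypothetical stand-in, *not* Spark's Task: equality/hashCode keyed only on
    // (stageId, partitionId), roughly the semantics the patch introduces.
    class DemoTask(val stageId: Int, val partitionId: Int, val attemptId: Int) {
      override def equals(other: Any): Boolean = other match {
        case t: DemoTask => t.stageId == stageId && t.partitionId == partitionId
        case _ => false
      }
      override def hashCode: Int = 31 * stageId + partitionId
    }

    // Two tasks running *concurrently* -- one from the zombie attempt, one from
    // the new attempt -- now compare equal and collapse to one entry in a set.
    val fromZombieAttempt = new DemoTask(stageId = 1, partitionId = 2, attemptId = 0)
    val fromNewAttempt    = new DemoTask(stageId = 1, partitionId = 2, attemptId = 1)
    assert(fromZombieAttempt == fromNewAttempt)
    assert(Set[DemoTask](fromZombieAttempt, fromNewAttempt).size == 1)
    ```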
    
    A simple solution might be to just change `stage.pendingTasks` to key on
    stage & partition, without changing `Task` for all uses (e.g., make
    `stage.pendingTasks` a `Map[(Int, Int), Task]`).
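
    Roughly something like this (just a sketch with assumed names -- the real
    `pendingTasks` lives on `Stage` and holds `Task` objects; here I use a
    placeholder value type):

    ```scala
    import scala.collection.mutable

    // Sketch only: pendingTasks keyed on (stageId, partitionId), so a completion
    // from *any* attempt of a partition clears the pending entry, even if it
    // arrives via a zombie task set's Task object rather than the latest attempt's.
    class PendingTasksSketch[T] {
      val pendingTasks = mutable.Map.empty[(Int, Int), T]

      def addPending(stageId: Int, partitionId: Int, task: T): Unit =
        pendingTasks((stageId, partitionId)) = task

      def markCompleted(stageId: Int, partitionId: Int): Unit =
        pendingTasks.remove((stageId, partitionId))

      def isEmpty: Boolean = pendingTasks.isEmpty
    }
    ```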
    
    I think a better option would be to change the logic for when we register
    the map outputs: ignore `stage.pendingTasks` completely and just use
    `stage.isAvailable`.  After all, the fundamental problem here is that
    `DAGScheduler` uses two different metrics to decide when a stage is
    complete, and in the scenario you've discovered, those two metrics differ.
    We should just use one metric everywhere (as long as it's the right one).
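
    Sketching that shape (simplified stand-ins, not the real `DAGScheduler` or
    `Stage` code -- `isAvailable` here just mirrors the idea of "every partition
    has at least one map output"):

    ```scala
    // Simplified stand-in: track map outputs per partition and use a single
    // metric -- "is output available for every partition?" -- to decide that the
    // shuffle map stage is complete, instead of also consulting pendingTasks.
    class ShuffleStageSketch(val numPartitions: Int) {
      private val outputLocs = new Array[AnyRef](numPartitions)

      def addOutputLoc(partition: Int, mapStatus: AnyRef): Unit =
        outputLocs(partition) = mapStatus

      // analogous in spirit to Stage.isAvailable
      def isAvailable: Boolean = outputLocs.forall(_ != null)
    }

    // On a successful ShuffleMapTask completion (from any attempt, zombie or not):
    //   stage.addOutputLoc(task.partitionId, mapStatus)
    //   if (stage.isAvailable) {
    //     // register the map outputs with MapOutputTracker and submit child stages
    //   }
    ```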
    
    Another general issue this points out is that the semantics of some of
    these variables really aren't that clear ... e.g., should
    `stage.pendingTasks` keep track of tasks from "zombie" taskSets?

