GitHub user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    The way I am thinking about this right now is that @kayousterhout is
on the right track with the early return at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1141
, but that her proposed `stage...attemptId != task.stageAttemptId` check is
broader than it needs to be.  My idea is that we want to throw away task
results from earlier attempts that ran on executors that failed (on the
presumption that one fetch failure means that other fetches from there are
also going to fail); but if the executor didn't fail, then the outputs from
earlier attempts of tasks that complete late but successfully on still-good
executors should still be valid and available, so we should accept them as
though they were successful task completions for the current attempt.
    
    What you end up with is that if-statement now looking like this:
    ```scala
        // The stage has been cancelled if the scheduler is no longer tracking it.
        val stageHasBeenCancelled = !stageIdToStage.contains(task.stageId)
        // A ShuffleMapTask result is only suspect if its executor has since been
        // marked as failed (i.e. the task's epoch predates the executor's failure).
        val shuffleMapTaskIsFromFailedExecutor = task match {
          case smt: ShuffleMapTask =>
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)
          case _ => false
        }
        if (stageHasBeenCancelled || shuffleMapTaskIsFromFailedExecutor) {
          return
        }
    ```
    ...and then the `failedEpoch.contains(execId) && smt.epoch <=
failedEpoch(execId)` check can be removed from the `case smt: ShuffleMapTask =>`
branch.
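    
    For concreteness, the branch would then collapse to something like the
following. This is a rough sketch only: I'm reproducing the surrounding
branch from memory, so treat the exact helper calls (`updateAccumulators`,
`addOutputLoc`) as approximate.
    ```scala
        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          updateAccumulators(event)
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          // The failed-executor check has already been handled by the early
          // return above, so the map output can be registered unconditionally.
          shuffleStage.addOutputLoc(smt.partitionId, status)
    ```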
    
    If we can do it cleanly, I think we should avoid re-running tasks that
completed successfully and whose outputs should still be available.  This is
a bit different from the intent of SPARK-14649, which I read as an effort
not to ignore the results of long-running tasks that start, and eventually
complete, on an executor where some other tasks actually run into fetch
failures.  I'm really only trying to preserve the results of successful
tasks run on executors that haven't failed.
    
    Unfortunately, the DAGSchedulerSuite doesn't agree with my intentions, 
because the above change actually leads to multiple test failures.

