GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21758#discussion_r203618106
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1349,6 +1339,48 @@ class DAGScheduler(
                   s"longer running")
               }
     
    +          if (mapStage.rdd.isBarrier()) {
    +            // Mark all the map as broken in the map stage, to ensure retry all the tasks on
    +            // resubmitted stage attempt.
    +            mapOutputTracker.unregisterAllMapOutput(shuffleId)
    +          } else if (mapId != -1) {
    +            // Mark the map whose fetch failed as broken in the map stage
    +            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    +          }
    +
    +          if (failedStage.rdd.isBarrier()) {
    +            failedStage match {
    +              case mapStage: ShuffleMapStage =>
    +                // Mark all the map as broken in the map stage, to ensure retry all the tasks on
    +                // resubmitted stage attempt.
    +                mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
    +
    +              case resultStage: ResultStage =>
    +                // Mark all the partitions of the result stage to be not finished, to ensure retry
    +                // all the tasks on resubmitted stage attempt.
    +                resultStage.activeJob.map(_.markAllPartitionsAsUnfinished())
    +            }
    +          }
    +
    +          // TODO: mark the executor as failed only if there were lots of fetch failures on it
    +          if (bmAddress != null) {
    --- End diff --
    
    Why move this before the `if (shouldAbortStage) { ...` block?
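    For readers following the diff, here is a minimal Scala sketch of the two invalidation branches it adds, under heavily simplified assumptions. `SimpleMapOutputTracker`, `SimpleActiveJob`, `FailedStage`, `ShuffleMapStageInfo`, `ResultStageInfo` and `FetchFailureSketch` are hypothetical stand-ins, not Spark's `MapOutputTrackerMaster`, `ActiveJob` or stage classes. The sketch shows only what gets invalidated. It does not model where this code sits relative to `shouldAbortStage`, which is what the question above is about.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark internals; NOT Spark's real classes.
final case class BlockManagerAddress(host: String, port: Int)

final class SimpleMapOutputTracker {
  // shuffleId -> (mapId -> address of the executor holding that map output)
  private val outputs = mutable.Map.empty[Int, mutable.Map[Int, BlockManagerAddress]]

  def register(shuffleId: Int, mapId: Int, loc: BlockManagerAddress): Unit = {
    val perShuffle = outputs.getOrElseUpdate(shuffleId, mutable.Map.empty[Int, BlockManagerAddress])
    perShuffle(mapId) = loc
  }

  // Drop one map output, but only if it is still registered at the failed address.
  def unregisterMapOutput(shuffleId: Int, mapId: Int, loc: BlockManagerAddress): Unit =
    outputs.get(shuffleId).foreach { m =>
      if (m.get(mapId).contains(loc)) m.remove(mapId)
    }

  // Drop every map output of the shuffle, so all of its map tasks must rerun.
  def unregisterAllMapOutput(shuffleId: Int): Unit =
    outputs.get(shuffleId).foreach(_.clear())

  def availableMaps(shuffleId: Int): Set[Int] =
    outputs.get(shuffleId).map(_.keySet.toSet).getOrElse(Set.empty[Int])
}

// Hypothetical job bookkeeping: which result partitions have finished.
final class SimpleActiveJob(numPartitions: Int) {
  val finished: Array[Boolean] = Array.fill(numPartitions)(false)
  var numFinished: Int = 0

  def markFinished(p: Int): Unit =
    if (!finished(p)) { finished(p) = true; numFinished += 1 }

  def markAllPartitionsAsUnfinished(): Unit = {
    for (i <- finished.indices) finished(i) = false
    numFinished = 0
  }
}

sealed trait FailedStage
final case class ShuffleMapStageInfo(shuffleId: Int) extends FailedStage
final class ResultStageInfo(val activeJob: Option[SimpleActiveJob]) extends FailedStage

object FetchFailureSketch {
  // First `if` in the diff: invalidate the map stage whose output could not be fetched.
  // mapId == -1 means no single map output is identified, so nothing is unregistered
  // for a non-barrier map stage.
  def invalidateMapStage(tracker: SimpleMapOutputTracker, shuffleId: Int, mapIsBarrier: Boolean,
                         mapId: Int, bmAddress: BlockManagerAddress): Unit = {
    if (mapIsBarrier) {
      tracker.unregisterAllMapOutput(shuffleId)
    } else if (mapId != -1) {
      tracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    }
  }

  // Second `if` in the diff, called only when the failed stage is a barrier stage:
  // every one of its tasks must rerun on the resubmitted attempt.
  def invalidateBarrierFailedStage(tracker: SimpleMapOutputTracker, failedStage: FailedStage): Unit =
    failedStage match {
      case ShuffleMapStageInfo(shuffleId) => tracker.unregisterAllMapOutput(shuffleId)
      case r: ResultStageInfo             => r.activeJob.foreach(_.markAllPartitionsAsUnfinished())
    }
}
```

    The sketch calls `Option.foreach` for the side-effecting reset. `Option.map`, as used in the diff, also runs the call but builds an `Option[Unit]` that is then discarded.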

