Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17297#discussion_r107017201
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1265,64 +1280,11 @@ class DAGScheduler(
             val failedStage = stageIdToStage(task.stageId)
             val mapStage = shuffleIdToMapStage(shuffleId)
     
    -        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
    -          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
    -            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
    -            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
    -        } else {
    -          // It is likely that we receive multiple FetchFailed for a single stage (because we have
    -          // multiple tasks running concurrently on different executors). In that case, it is
    -          // possible the fetch failure has already been handled by the scheduler.
    -          if (runningStages.contains(failedStage)) {
    -            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
    -              s"due to a fetch failure from $mapStage (${mapStage.name})")
    -            markStageAsFinished(failedStage, Some(failureMessage))
    -          } else {
    -            logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
    -              s"longer running")
    -          }
    -
    -          val shouldAbortStage =
    -            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    -            disallowStageRetryForTest
    -
    -          if (shouldAbortStage) {
    -            val abortMessage = if (disallowStageRetryForTest) {
    -              "Fetch failure will not retry stage due to testing config"
    -            } else {
    -              s"""$failedStage (${failedStage.name})
    -                 |has failed the maximum allowable number of
    -                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
    -                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
    -            }
    -            abortStage(failedStage, abortMessage, None)
    -          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
    -            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
    -            val noResubmitEnqueued = !failedStages.contains(failedStage)
    -            failedStages += failedStage
    -            failedStages += mapStage
    -            if (noResubmitEnqueued) {
    -              // We expect one executor failure to trigger many FetchFailures in rapid succession,
    -              // but all of those task failures can typically be handled by a single resubmission of
    -              // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
    -              // messages by checking whether a resubmit is already in the event queue for the
    -              // failed stage.  If there is already a resubmit enqueued for a different failed
    -              // stage, that event would also be sufficient to handle the current failed stage, but
    -              // producing a resubmit for each failed stage makes debugging and logging a little
    -              // simpler while not producing an overwhelming number of scheduler events.
    -              logInfo(
    -                s"Resubmitting $mapStage (${mapStage.name}) and " +
    -                s"$failedStage (${failedStage.name}) due to fetch failure"
    -              )
    -              messageScheduler.schedule(
    -                new Runnable {
    -                  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    -                },
    -                DAGScheduler.RESUBMIT_TIMEOUT,
    -                TimeUnit.MILLISECONDS
    -              )
    -            }
    -          }
    +        val epochForMapOutput = mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
    +        // It is possible that the map output was regenerated by rerun of the stage and the
    +        // fetch failure is being reported for stale map output. In that case, we should just
    +        // ignore the fetch failure and relaunch the task with latest map output info.
    +        if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= task.epoch) {
    --- End diff --
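    For anyone skimming the diff, here is a tiny, self-contained sketch of what the new epoch check is doing. The names (`registeredEpochs`, `shouldHandleFetchFailure`) are made up for illustration and are not Spark APIs: the failure is acted on only when the currently registered map output is no newer than the epoch the failing task launched with; otherwise the output has already been regenerated and the report is stale.
    ```scala
    // Illustrative sketch only (hypothetical names, not Spark's DAGScheduler code):
    // decide whether a reported fetch failure refers to the currently registered
    // map output or to one that has since been regenerated.
    object EpochCheckSketch {
      // Epoch at which each map output was (re)registered, keyed by mapId.
      val registeredEpochs: Map[Int, Long] = Map(0 -> 5L, 1 -> 9L)

      // Handle the failure only if the registered output is no newer than the
      // epoch the failing task was launched with; otherwise the report is stale.
      def shouldHandleFetchFailure(mapId: Int, taskEpoch: Long): Boolean =
        registeredEpochs.get(mapId).exists(_ <= taskEpoch)

      def main(args: Array[String]): Unit = {
        println(shouldHandleFetchFailure(0, taskEpoch = 6L)) // true: output predates the task
        println(shouldHandleFetchFailure(1, taskEpoch = 6L)) // false: output was regenerated later
      }
    }
    ```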
    
    I'd be inclined to do this without the extra binding and `get`:
    ```scala
            for (epochForMapOutput <- mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
                if epochForMapOutput <= task.epoch) {
              // Mark the map whose fetch failed as broken in the map stage
              if (mapId != -1) {
                mapStage.removeOutputLoc(mapId, bmAddress)
                mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
              }

              // TODO: mark the executor as failed only if there were lots of fetch failures on it
              if (bmAddress != null) {
                handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
              }
            }
    ```
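    If it helps, here is a toy, standalone comparison of the two styles in plain Scala, not tied to the Spark classes; `lookupEpoch` is just a stand-in for an Option-returning lookup. The for-comprehension fuses the Option binding and the guard, so the body only runs when the value is present and the condition holds, and there is no `get` call that depends on the guard.
    ```scala
    // Toy example only, not Spark code; lookupEpoch stands in for an Option-returning lookup.
    object OptionStyles {
      def lookupEpoch(mapId: Int): Option[Long] = if (mapId >= 0) Some(mapId.toLong) else None

      def main(args: Array[String]): Unit = {
        val taskEpoch = 3L

        // Style 1: explicit binding plus nonEmpty/get -- works, but `get` relies on the guard.
        val maybeEpoch = lookupEpoch(2)
        if (maybeEpoch.nonEmpty && maybeEpoch.get <= taskEpoch) {
          println(s"handling failure reported against epoch ${maybeEpoch.get}")
        }

        // Style 2: for-comprehension over the Option with a guard -- the body runs only
        // when the Option is defined and the condition holds; no `get` needed.
        for (epoch <- lookupEpoch(2) if epoch <= taskEpoch) {
          println(s"handling failure reported against epoch $epoch")
        }
      }
    }
    ```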

