Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21758#discussion_r203738500
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1386,29 +1418,90 @@ class DAGScheduler(
                   )
                 }
               }
    -          // Mark the map whose fetch failed as broken in the map stage
    -          if (mapId != -1) {
    -            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -          }
    +        }
     
    -          // TODO: mark the executor as failed only if there were lots of fetch failures on it
    -          if (bmAddress != null) {
    -            val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
    -              unRegisterOutputOnHostOnFetchFailure) {
    -              // We had a fetch failure with the external shuffle service, so we
    -              // assume all shuffle data on the node is bad.
    -              Some(bmAddress.host)
    -            } else {
    -              // Unregister shuffle data just for one executor (we don't have any
    -              // reason to believe shuffle data has been lost for the entire host).
    -              None
    +      case failure: TaskFailedReason if task.isBarrier =>
    +        // Also handle the task failed reasons here.
    +        failure match {
    +          case Resubmitted =>
    +            logInfo("Resubmitted " + task + ", so marking it as still running")
    +            stage match {
    +              case sms: ShuffleMapStage =>
    +                sms.pendingPartitions += task.partitionId
    +
    +              case _ =>
    +                assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
    +                  "tasks in ShuffleMapStages.")
                 }
    -            removeExecutorAndUnregisterOutputs(
    -              execId = bmAddress.executorId,
    -              fileLost = true,
    -              hostToUnregisterOutputs = hostToUnregisterOutputs,
    -              maybeEpoch = Some(task.epoch))
    +
    +          case _ => // Do nothing.
    +        }
    +
    +        // Always fail the current stage and retry all the tasks when a barrier task fail.
    +        val failedStage = stageIdToStage(task.stageId)
    +        logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
    +          "failed.")
    +        val message = s"Stage failed because barrier task $task finished unsuccessfully. " +
    +          s"${failure.toErrorString}"
    +        try {
    +          // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +          taskScheduler.cancelTasks(stageId, interruptThread = false)
    +        } catch {
    +          case e: UnsupportedOperationException =>
    +            // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
    --- End diff ---
    
So far I don't see an easy way to mark a running task as not needed and prevent it from writing shuffle files/committing. Maybe we should leave a TODO here?
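
For reference, here is a rough sketch of what I have in mind for the catch arm: keep the abort-the-stage behavior and just record the TODO. This assumes the existing `abortStage` and `logWarning` helpers in DAGScheduler plus the `failedStage` / `message` vals from the diff above; the exact wording and log message are up for discussion:

    try {
      // cancelTasks will fail if a SchedulerBackend does not implement killTask.
      taskScheduler.cancelTasks(stageId, interruptThread = false)
    } catch {
      case e: UnsupportedOperationException =>
        // Cannot continue with the barrier stage if we failed to cancel the zombie
        // barrier tasks.
        // TODO: there is currently no way to mark the still-running (zombie) barrier
        // tasks as not needed, so they may still write shuffle files or try to commit
        // output after the stage is aborted; revisit once task-level invalidation is
        // possible.
        logWarning(s"Could not kill all tasks for stage $stageId", e)
        abortStage(failedStage, message, Some(e))
    }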

