Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21758#discussion_r204235813
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1411,6 +1420,77 @@ class DAGScheduler(
               }
             }
     
    +      case failure: TaskFailedReason if task.isBarrier =>
    +        // Also handle the task failed reasons here.
    +        failure match {
    +          case Resubmitted =>
    +            handleResubmittedFailure(task, stage)
    +
    +          case _ => // Do nothing.
    +        }
    +
    +        // Always fail the current stage and retry all the tasks when a barrier task fails.
    +        val failedStage = stageIdToStage(task.stageId)
    +        logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
    +          "failed.")
    +        val message = s"Stage failed because barrier task $task finished unsuccessfully. " +
    +          s"${failure.toErrorString}"
    +        try {
    +          // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +          taskScheduler.cancelTasks(stageId, interruptThread = false)
    +        } catch {
    +          case e: UnsupportedOperationException =>
    +            // Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
    +            // TODO SPARK-24877 leave the zombie tasks and ignore their completion events.
    +            logWarning(s"Could not cancel tasks for stage $stageId", e)
    +            abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " +
    +              s"$failedStage (${failedStage.name})", Some(e))
    +        }
    +        markStageAsFinished(failedStage, Some(message))
    +
    +        failedStage.failedAttemptIds.add(task.stageAttemptId)
    +        // TODO Refactor the failure handling logic to combine similar code with that of
    +        // FetchFailed.
    +        val shouldAbortStage =
    +          failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
    +            disallowStageRetryForTest
    +
    +        if (shouldAbortStage) {
    +          val abortMessage = if (disallowStageRetryForTest) {
    +            "Barrier stage will not retry stage due to testing config"
    +          } else {
    +            s"""$failedStage (${failedStage.name})
    +               |has failed the maximum allowable number of
    +               |times: $maxConsecutiveStageAttempts.
    +               |Most recent failure reason: $message
    +          """.stripMargin.replaceAll("\n", " ")
    --- End diff --
    
    nit: need a few more spaces before the closing """ here, so it lines up with the string body above?
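    
    For context, here is a minimal standalone sketch (not from the PR; the stage
    name, attempt count and failure reason are made up) of why the indentation in
    front of the closing triple quote matters: stripMargin only strips the
    whitespace in front of a leading '|', so on the closing line (which has no
    '|') the spaces before the quotes survive into the final message once the
    newlines are replaced with spaces.
    
        object StripMarginIndentDemo extends App {
          // Hypothetical values, for illustration only.
          val failedStageName = "stage 3 (map at Demo.scala:10)"
          val maxConsecutiveStageAttempts = 4
          val message = "ExecutorLostFailure (executor 1 exited)"
    
          val abortMessage =
            s"""$failedStageName
               |has failed the maximum allowable number of
               |times: $maxConsecutiveStageAttempts.
               |Most recent failure reason: $message
          """.stripMargin.replaceAll("\n", " ")
    
          // stripMargin leaves the closing line untouched (it has no leading '|'),
          // so abortMessage ends with the spaces that precede """ above.
          // Printing it in brackets makes the trailing blanks visible.
          println(s"[$abortMessage]")
        }
    
    Running the sketch prints the whole message on one line, ending in a run of
    blanks that mirrors however the closing quotes are indented.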


---
