Repository: spark
Updated Branches:
  refs/heads/master 44948a2e9 -> 4bd85d06e


[SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException

The ```Stage``` class now tracks whether a stage has had enough consecutive 
failures to trigger an abort.

To avoid an infinite loop of stage retries, we abort the job completely after 4 
consecutive failed attempts of a single stage. A stage may still fail more than 
4 times in total if there are intervening successful attempts, so that in very 
long-lived applications, where a stage may get reused many times, we don't 
abort the job because of failures that have already been recovered from.
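
To make the mechanism concrete, here is a minimal, self-contained sketch of the 
tracking logic this change introduces (the class name ```StageSketch``` is 
hypothetical; the real change lives in ```Stage``` and appears in the diff 
below):

```scala
import scala.collection.mutable.HashSet

object StageSketch {
  // Mirrors Stage.MAX_CONSECUTIVE_FETCH_FAILURES in the diff below.
  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
}

class StageSketch(val id: Int) {
  // One entry per stage attempt that saw a FetchFailure; using a HashSet means
  // several failed tasks from the same attempt are only counted once.
  private val fetchFailedAttemptIds = new HashSet[Int]

  // Record this attempt's fetch failure and report whether the stage has now
  // failed enough consecutive attempts to abort the job.
  def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= StageSketch.MAX_CONSECUTIVE_FETCH_FAILURES
  }

  // Called when an attempt of this stage succeeds, so only *consecutive*
  // failures count toward the abort threshold.
  def clearFailures(): Unit = fetchFailedAttemptIds.clear()
}
```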

I've added test cases to exercise the most obvious scenarios.

Author: Ilya Ganelin <ilya.gane...@capitalone.com>

Closes #5636 from ilganeli/SPARK-5945.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bd85d06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bd85d06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bd85d06

Branch: refs/heads/master
Commit: 4bd85d06e0334c49be18c4612b04d013b37f189c
Parents: 44948a2
Author: Ilya Ganelin <ilya.gane...@capitalone.com>
Authored: Wed Sep 2 22:07:50 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 2 22:08:24 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  13 +-
 .../org/apache/spark/scheduler/Stage.scala      |  30 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 282 ++++++++++++++++++-
 3 files changed, 320 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4bd85d06/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index daf9b0f..d673cb0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,7 +1101,6 @@ class DAGScheduler(
             s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
             s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
         } else {
-
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is
           // possible the fetch failure has already been handled by the scheduler.
@@ -1117,6 +1116,11 @@ class DAGScheduler(
           if (disallowStageRetryForTest) {
             abortStage(failedStage, "Fetch failure will not retry stage due to 
testing config",
               None)
+          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
+            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
+              s"has failed the maximum allowable number of " +
+              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
+              s"Most recent failure reason: ${failureMessage}", None)
           } else if (failedStages.isEmpty) {
             // Don't schedule an event to resubmit failed stages if failed isn't empty, because
             // in that case the event will already have been scheduled.
@@ -1240,10 +1244,17 @@ class DAGScheduler(
     if (errorMessage.isEmpty) {
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, 
serviceTime))
       stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+
+      // Clear failure count for this stage, now that it's succeeded.
+      // We only limit consecutive failures of stage attempts, so that if a stage is
+      // re-used many times in a long-running job, unrelated failures don't eventually cause the
+      // stage to be aborted.
+      stage.clearFailures()
     } else {
       stage.latestInfo.stageFailed(errorMessage.get)
       logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
     }
+
     outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     runningStages -= stage
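
Condensed, the fetch-failure branch touched above now makes a three-way 
decision. A hedged sketch of the control flow (not the committed code; the 
final branch's body is elided in the hunk, so a hypothetical helper name stands 
in for it):

```scala
if (disallowStageRetryForTest) {
  // Test-only config: never retry a stage after a fetch failure.
  abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None)
} else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
  // New in this patch: too many consecutive attempts of this stage have hit a
  // FetchFailure, so fail the whole job rather than retrying forever.
  abortStage(failedStage, s"$failedStage (${failedStage.name}) has failed the maximum " +
    s"allowable number of times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.", None)
} else if (failedStages.isEmpty) {
  // Pre-existing behavior (body not shown in the hunk): schedule a delayed
  // ResubmitFailedStages event, but only if one isn't already pending.
  scheduleResubmitFailedStages() // hypothetical stand-in for the elided body
}
```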

http://git-wip-us.apache.org/repos/asf/spark/blob/4bd85d06/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 1cf0685..c086535 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.CallSite
  * be updated for each attempt.
  *
  */
-private[spark] abstract class Stage(
+private[scheduler] abstract class Stage(
     val id: Int,
     val rdd: RDD[_],
     val numTasks: Int,
@@ -92,6 +92,29 @@ private[spark] abstract class Stage(
    */
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 
+  /**
+   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
+   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
+   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
+   * multiple tasks from the same stage attempt fail (SPARK-5945).
+   */
+  private val fetchFailedAttemptIds = new HashSet[Int]
+
+  private[scheduler] def clearFailures() : Unit = {
+    fetchFailedAttemptIds.clear()
+  }
+
+  /**
+   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
+   *
+   * This method updates the running set of failed stage attempts and returns
+   * true if the number of failures exceeds the allowable number of failures.
+   */
+  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
+    fetchFailedAttemptIds.add(stageAttemptId)
+    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
+  }
+
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
   def makeNewStageAttempt(
       numPartitionsToCompute: Int,
@@ -110,3 +133,8 @@ private[spark] abstract class Stage(
     case _ => false
   }
 }
+
+private[scheduler] object Stage {
+  // The number of consecutive failures allowed before a stage is aborted
+  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
+}
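
One subtlety worth calling out: because the set is keyed by attempt ID, many 
FetchFailed tasks from a single attempt count as only one failure, and a 
successful attempt resets the streak. A small, self-contained illustration 
(plain Scala script, not part of the patch; the literal 4 stands in for 
Stage.MAX_CONSECUTIVE_FETCH_FAILURES):

```scala
import scala.collection.mutable.HashSet

val fetchFailedAttemptIds = new HashSet[Int]
def shouldAbort(attemptId: Int): Boolean = {
  fetchFailedAttemptIds.add(attemptId)
  fetchFailedAttemptIds.size >= 4 // Stage.MAX_CONSECUTIVE_FETCH_FAILURES
}

// Three FetchFailed tasks from the same attempt (attempt 0) register only once:
assert(!shouldAbort(0)); assert(!shouldAbort(0)); assert(!shouldAbort(0))
// Distinct failed attempts 1, 2, 3 bring the count of failed attempts to 4:
assert(!shouldAbort(1)); assert(!shouldAbort(2)); assert(shouldAbort(3))
// A successful attempt clears the set, so only consecutive failures matter:
fetchFailedAttemptIds.clear()
assert(!shouldAbort(4))
```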

http://git-wip-us.apache.org/repos/asf/spark/blob/4bd85d06/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2e8688c..62957c6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -26,11 +26,11 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.CallSite
-import org.apache.spark.executor.TaskMetrics
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -473,6 +473,282 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+
+  // Helper function to validate state when creating tests for task failures
+  private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+    assert(stageAttempt.stageId === stageId)
+    assert(stageAttempt.stageAttemptId == attempt)
+  }
+
+
+  // Helper functions to extract commonly used code in Fetch Failure test cases
+  private def setupStageAbortTest(sc: SparkContext) {
+    sc.listenerBus.addListener(new EndListener())
+    ended = false
+    jobResult = null
+  }
+
+  // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+  // when we abort the stage. This message will also be consumed by the EventLoggingListener
+  // so this will propagate up to the user.
+  var ended = false
+  var jobResult : JobResult = null
+
+  class EndListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      jobResult = jobEnd.jobResult
+      ended = true
+    }
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * successfully.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param numShufflePartitions - The number of partitions in the next stage
+   */
+  private def completeShuffleMapStageSuccessfully(
+      stageId: Int,
+      attemptIdx: Int,
+      numShufflePartitions: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
+      case (task, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * with all FetchFailure.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param shuffleDep - The shuffle dependency of the stage with a fetch failure
+   */
+  private def completeNextStageWithFetchFailure(
+      stageId: Int,
+      attemptIdx: Int,
+      shuffleDep: ShuffleDependency[_, _, _]): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next result stage attempt, confirm it's the one we expect, and
+   * complete it with a success where we return 42.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   */
+  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+  }
+
+  /**
+   * In this test, we simulate a job where many tasks in the same stage fail. We want to show
+   * that many fetch failures inside a single stage attempt do not trigger an abort
+   * on their own, but only when there are enough failing stage attempts.
+   */
+  test("Single stage fetch failure should not abort the stage.") {
+    setupStageAbortTest(sc)
+
+    val parts = 8
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
+    submit(reduceRdd, (0 until parts).toArray)
+
+    completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
+
+    completeNextStageWithFetchFailure(1, 0, shuffleDep)
+
+    // Resubmit and confirm that now all is well
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Complete stage 0 and then stage 1 with a "42"
+    completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
+    completeNextResultStageWithSuccess(1, 1)
+
+    // Confirm job finished successfully
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we simulate a job failure where the first stage completes successfully and
+   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
+   * trigger an overall job abort to avoid endless retries.
+   */
+  test("Multiple consecutive stage fetch failures should lead to job being 
aborted.") {
+    setupStageAbortTest(sc)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail all these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDep)
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+        assert(scheduler.runningStages.nonEmpty)
+        assert(!ended)
+      } else {
+        // Stage should have been aborted and removed from running stages
+        assertDataStructuresEmpty()
+        sc.listenerBus.waitUntilEmpty(1000)
+        assert(ended)
+        jobResult match {
+          case JobFailed(reason) =>
+            assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
+          case other => fail(s"expected JobFailed, not $other")
+        }
+      }
+    }
+  }
+
+  /**
+   * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
+   * shuffle fetch. In total, the job has had four failures overall but not four failures
+   * for a particular stage, and as such should not be aborted.
+   */
+  test("Failures in different stages should not trigger an overall abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations,
+    // stage 2 fails.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
+        // Now we should have a new taskSet, for a new attempt of stage 1.
+        // Fail all these tasks with FetchFailure
+        completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+      } else {
+        completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
+
+        // Fail stage 2
+        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
+          shuffleDepTwo)
+      }
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+    }
+
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
+
+    // Succeed stage2 with a "42"
+    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES/2)
+
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
+   * fail multiple times, succeed, then fail a few more times (because it's run again by downstream
+   * dependencies). The total number of failed attempts for one stage will go over the limit,
+   * but that doesn't matter, since there were successes in the middle.
+   */
+  test("Non-consecutive stage failures don't trigger abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+      // Complete each task in stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+
+      scheduler.resubmitFailedStages()
+
+      // Confirm we have not yet aborted
+      assert(scheduler.runningStages.nonEmpty)
+      assert(!ended)
+    }
+
+    // Rerun stage 0 and 1 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)
+
+    // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
+
+    scheduler.resubmitFailedStages()
+
+    // Rerun stage 0 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+
+    // Now again, fail stage 1 (up to MAX_FAILURES) but confirm that this doesn't trigger an abort
+    // since we succeeded in between.
+    completeNextStageWithFetchFailure(1, 4, shuffleDepOne)
+
+    scheduler.resubmitFailedStages()
+
+    // Confirm we have not yet aborted
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Next, succeed all and confirm output
+    // Rerun stage 0 + 1
+    completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)
+
+    // Succeed stage 2 and verify results
+    completeNextResultStageWithSuccess(2, 1)
+
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === Map(0 -> 42))
+  }
+
   test("trivial shuffle with multiple fetch failures") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -810,7 +1086,7 @@ class DAGSchedulerSuite
     submit(finalRdd, Array(0))
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
-    // complete stage 2
+    // complete stage 0
     complete(taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 2)),
         (Success, makeMapStatus("hostB", 2))))
@@ -818,7 +1094,7 @@ class DAGSchedulerSuite
     complete(taskSets(1), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
-    // pretend stage 0 failed because hostA went down
+    // pretend stage 2 failed because hostA went down
     complete(taskSets(2), Seq(
        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:

