Repository: spark
Updated Branches:
  refs/heads/master 219003775 -> 46d1203bf


[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure

## What changes were proposed in this pull request?
| Time | Thread 1, Job 1                | Thread 2, Job 2                                                       |
|:----:|:-------------------------------|:----------------------------------------------------------------------|
| 1    | Abort stage due to FetchFailed |                                                                        |
| 2    | `failedStages += failedStage`  |                                                                        |
| 3    |                                | Task fails due to FetchFailed                                          |
| 4    |                                | Cannot post ResubmitFailedStages because `failedStages` is not empty   |

As a result, Job 2 on Thread 2 never resubmits its failed stage and hangs forever.

The fix: do not add stages to `failedStages` when `abortStage` is called for a fetch failure. The aborted job never schedules a `ResubmitFailedStages` event, so a stale entry there blocks the fetch-failure handling of every subsequent job, as the sketch below illustrates.
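To make the race concrete, here is a condensed sketch of the pre-patch logic in `DAGScheduler.handleTaskCompletion` (simplified: the abort condition is paraphrased here as `shouldAbortStage` and logging is omitted; see the diff below for the actual code):

```scala
// Condensed sketch of the PRE-patch flow, not the verbatim method body.
if (shouldAbortStage) {
  // Thread 1 / Job 1: too many consecutive fetch failures, abort the stage.
  abortStage(failedStage, abortMessage, None)
} else if (failedStages.isEmpty) {
  // Schedule a resubmit only when none is pending yet.
  messageScheduler.schedule(new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
// BUG: this bookkeeping also ran on the abort path, leaving failedStages
// non-empty even though no ResubmitFailedStages event was ever scheduled.
// Thread 2 / Job 2 then sees failedStages.isEmpty == false, skips scheduling
// its own resubmit, and waits forever.
failedStages += failedStage
failedStages += mapStage
```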

## How was this patch tested?

Added a unit test.

Author: w00228970 <wangf...@huawei.com>
Author: wangfei <wangfei_he...@126.com>

Closes #15213 from scwf/dag-resubmit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46d1203b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46d1203b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46d1203b

Branch: refs/heads/master
Commit: 46d1203bf2d01b219c4efc7e0e77a844c0c664da
Parents: 2190037
Author: w00228970 <wangf...@huawei.com>
Authored: Wed Sep 28 12:02:59 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Sep 28 12:02:59 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 ++++----
 .../spark/scheduler/DAGSchedulerSuite.scala     | 59 ++++++++++++++++++++-
 2 files changed, 71 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ea0b48..f251740 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1263,18 +1263,20 @@ class DAGScheduler(
               s"has failed the maximum allowable number of " +
               s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
               s"Most recent failure reason: ${failureMessage}", None)
-          } else if (failedStages.isEmpty) {
-            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-            // in that case the event will already have been scheduled.
-            // TODO: Cancel running tasks in the stage
-            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-              s"$failedStage (${failedStage.name}) due to fetch failure")
-            messageScheduler.schedule(new Runnable {
-              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          } else {
+            if (failedStages.isEmpty) {
+              // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+              // in that case the event will already have been scheduled.
+              // TODO: Cancel running tasks in the stage
+              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+                s"$failedStage (${failedStage.name}) due to fetch failure")
+              messageScheduler.schedule(new Runnable {
+                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+            }
+            failedStages += failedStage
+            failedStages += mapStage
           }
-          failedStages += failedStage
-          failedStages += mapStage
           // Mark the map whose fetch failed as broken in the map stage
           if (mapId != -1) {
             mapStage.removeOutputLoc(mapId, bmAddress)
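Condensed the same way as the earlier sketch, the patched flow confines the bookkeeping to the non-abort branch (again a simplified sketch under the same naming assumptions, not the verbatim method):

```scala
if (shouldAbortStage) {
  // Abort without touching failedStages, so a concurrent job's fetch failure
  // still sees failedStages.isEmpty and can schedule its own resubmit.
  abortStage(failedStage, abortMessage, None)
} else {
  if (failedStages.isEmpty) {
    // First fetch failure since the last resubmit: schedule exactly one
    // ResubmitFailedStages event; subsequent failures piggyback on it.
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
  }
  failedStages += failedStage
  failedStages += mapStage
}
```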

http://git-wip-us.apache.org/repos/asf/spark/blob/46d1203b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6787b30..bec95d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.shuffle.MetadataFetchFailedException
+import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
 
@@ -2105,6 +2106,62 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
   }
 
+  test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages " +
+    "still behave correctly on fetch failures") {
+    // Runs a job that always encounters a fetch failure, so it should eventually be aborted
+    def runJobWithPersistentFetchFailure: Unit = {
+      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
+      val shuffleHandle =
+        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
+      rdd1.map {
+        case (x, _) if (x == 1) =>
+          throw new FetchFailedException(
+            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+        case (x, _) => x
+      }.count()
+    }
+
+    // Runs a job that encounters a single fetch failure but succeeds on the second attempt
+    def runJobWithTemporaryFetchFailure: Unit = {
+      object FailThisAttempt {
+        val _fail = new AtomicBoolean(true)
+      }
+      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
+      val shuffleHandle =
+        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
+      rdd1.map {
+        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
+          throw new FetchFailedException(
+            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+        case (x, _) => x
+      }.count()
+    }
+
+    failAfter(10.seconds) {
+      val e = intercept[SparkException] {
+        runJobWithPersistentFetchFailure
+      }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+    }
+
+    // Run a second job that will fail due to a fetch failure.
+    // This job will hang without the fix for SPARK-17644.
+    failAfter(10.seconds) {
+      val e = intercept[SparkException] {
+        runJobWithPersistentFetchFailure
+      }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+    }
+
+    failAfter(10.seconds) {
+      try {
+        runJobWithTemporaryFetchFailure
+      } catch {
+        case e: Throwable => fail("A job with one fetch failure should eventually succeed")
+      }
+    }
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
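A side note on the test's one-shot failure trick: an `AtomicBoolean` read with `getAndSet(false)` returns true exactly once, so only the first attempt takes the failure path. A minimal standalone sketch of the pattern (`FailOnce` and `fetchOnce` are hypothetical names, for illustration only):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// One-shot failure flag, modeled on the test's FailThisAttempt object.
object FailOnce {
  private val firstAttempt = new AtomicBoolean(true)

  // Hypothetical helper: throws on the first call, succeeds on every call
  // after that, because getAndSet(false) yields true only once.
  def fetchOnce(): String = {
    if (firstAttempt.getAndSet(false)) {
      throw new RuntimeException("simulated one-time fetch failure")
    }
    "data"
  }
}
```

Shared mutable state like this is only visible across task attempts when they run in the same JVM, as they do in this local-mode suite; a distributed test would need a different mechanism.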

