Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d8bfc025c -> 10272d5c9


[SPARK-10192][CORE] simple test w/ failure involving a shared dependency

Just trying to increase test coverage in the scheduler; this already works. It includes a regression test for SPARK-9809.

Copied some test utils from https://github.com/apache/spark/pull/5636; we can wait until that is merged first.

Author: Imran Rashid <iras...@cloudera.com>

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.

(cherry picked from commit 33112f9c48680c33d663978f76806ebf0ea39789)
Signed-off-by: Andrew Or <and...@databricks.com>
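
At the user-facing RDD level, the scenario exercised by the new test looks roughly like the sketch below. This is a minimal, hypothetical illustration (the app setup, object name, and reduceByKey-based job structure are assumptions made for clarity, not part of the patch; the actual regression test drives the DAGScheduler directly with mock RDDs):

import org.apache.spark.{SparkConf, SparkContext}

object SharedShuffleDepSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical standalone job; not part of the patch.
    val sc = new SparkContext(
      new SparkConf().setAppName("shared-shuffle-dep-sketch").setMaster("local[2]"))

    val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i))
    val shuffled = pairs.reduceByKey(_ + _)   // creates a single ShuffleDependency

    shuffled.count()                          // job 1: runs the shuffle map stage

    // Job 2 reuses the same shuffle dependency; its map stage is normally
    // "skipped".  If some map output has been lost in the meantime (e.g. an
    // executor died), this job hits a fetch failure and the scheduler must
    // regenerate only the missing partitions -- the case the new test covers.
    shuffled.mapValues(_ * 2).collect()

    sc.stop()
  }
}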


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10272d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10272d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10272d5c

Branch: refs/heads/branch-1.6
Commit: 10272d5c98694e5a0cfef2587a81be7ce609cbb7
Parents: d8bfc02
Author: Imran Rashid <iras...@cloudera.com>
Authored: Tue Nov 10 16:50:22 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Nov 10 16:50:34 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 51 +++++++++++++++++++-
 1 file changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10272d5c/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3816b8c..068b49b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }
 
   /**
@@ -1055,6 +1061,47 @@ class DAGSchedulerSuite
   }
 
   /**
+   * Run two jobs, with a shared dependency.  We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency.  One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this time
+    // using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got internal
+    // accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
    * This test runs a three stage job, with a fetch failure in stage 1.  but during the retry, we
    * have completions from both the first & second attempt of stage 1.  So all the map output is
    * available before we finish any task set for stage 1.  We want to make sure that we don't
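
A side note on the helper change in the first hunk: it replaces the hard-coded result value 42 with a per-partition function that defaults to _ => 42. Below is a self-contained, hypothetical sketch of that default-parameter pattern (the object and method names are made up for illustration), mirroring the assertions in the new test:

object PartitionToResultSketch {
  // Mirrors the shape of `partitionToResult: Int => Int = _ => 42` in the diff:
  // callers can omit the function and keep the old constant-42 behaviour.
  def resultsFor(numPartitions: Int, partitionToResult: Int => Int = _ => 42): Map[Int, Int] =
    (0 until numPartitions).map(idx => idx -> partitionToResult(idx)).toMap

  def main(args: Array[String]): Unit = {
    // Default: every partition finishes with 42, as in the first job of the test.
    assert(resultsFor(2) == Map(0 -> 42, 1 -> 42))
    // Per-partition results, matching completeNextResultStageWithSuccess(3, 1, idx => idx + 1234).
    assert(resultsFor(2, idx => idx + 1234) == Map(0 -> 1234, 1 -> 1235))
  }
}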

