spark git commit: [SPARK-10192][CORE] simple test w/ failure involving a shared dependency

2015-11-10 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master c0e48dfa6 -> 33112f9c4


[SPARK-10192][CORE] simple test w/ failure involving a shared dependency

This just tries to increase test coverage in the scheduler; the behavior already works. It
includes a regression test for SPARK-9809.

Some test utils were copied from https://github.com/apache/spark/pull/5636; we can wait until
that is merged first.

Author: Imran Rashid 

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.
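
For context, the scenario exercised here arises at the user API level whenever two jobs run
against the same shuffled RDD: both share one ShuffleDependency, so the second job normally
reuses the existing map output through a "skipped" stage, and a fetch failure in that second
job forces part of the shared map output to be regenerated. A minimal sketch of that setup
(illustrative only; the object name, master setting, and data are not part of this patch):

import org.apache.spark.{SparkConf, SparkContext}

object SharedShuffleDepSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shared-dep-sketch").setMaster("local[2]"))

    // reduceByKey introduces a single ShuffleDependency shared by every job that uses byKey.
    val byKey = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _)

    byKey.collect()                  // job 1: runs the shuffle map stage and a result stage
    byKey.map(_._2).reduce(_ + _)    // job 2: reuses the shuffle output, so the map stage is "skipped"

    sc.stop()
  }
}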


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33112f9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33112f9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33112f9c

Branch: refs/heads/master
Commit: 33112f9c48680c33d663978f76806ebf0ea39789
Parents: c0e48df
Author: Imran Rashid 
Authored: Tue Nov 10 16:50:22 2015 -0800
Committer: Andrew Or 
Committed: Tue Nov 10 16:50:22 2015 -0800

--
 .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++-
 1 file changed, 49 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/33112f9c/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3816b8c..068b49b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }
 
   /**
@@ -1055,6 +1061,47 @@ class DAGSchedulerSuite
   }
 
   /**
+   * Run two jobs, with a shared dependency.  We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency.  One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this time
+    // using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got internal
+    // accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
    * This test runs a three stage job, with a fetch failure in stage 1.  but during the retry, we
    * have completions from both the first & second attempt of stage 1.  So all the map output is
    * available before we finish any task set for stage 1.  We want to make sure that we don't
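
The signature change in the first hunk is the usual Scala pattern of a function-typed parameter
with a default value: existing calls such as completeNextResultStageWithSuccess(1, 0) keep
compiling and still produce the constant result 42 per partition, while the new test can pass
idx => idx + 1234 to tell the two result partitions apart after the retry. A standalone sketch
of the pattern (the object and method names here are illustrative, not part of the suite):

object DefaultFnParamSketch {
  // A function-typed parameter with a default, mirroring the helper's new signature:
  // callers may omit partitionToResult and get 42 for every partition, or pass a custom mapping.
  def resultsFor(numPartitions: Int, partitionToResult: Int => Int = _ => 42): Map[Int, Int] =
    (0 until numPartitions).map(idx => idx -> partitionToResult(idx)).toMap

  def main(args: Array[String]): Unit = {
    println(resultsFor(2))                     // Map(0 -> 42, 1 -> 42), matching the first job's assertion
    println(resultsFor(2, idx => idx + 1234))  // Map(0 -> 1234, 1 -> 1235), matching the post-retry assertion
  }
}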



spark git commit: [SPARK-10192][CORE] simple test w/ failure involving a shared dependency

2015-11-10 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d8bfc025c -> 10272d5c9


[SPARK-10192][CORE] simple test w/ failure involving a shared dependency

This just tries to increase test coverage in the scheduler; the behavior already works. It
includes a regression test for SPARK-9809.

Some test utils were copied from https://github.com/apache/spark/pull/5636; we can wait until
that is merged first.

Author: Imran Rashid 

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.

(cherry picked from commit 33112f9c48680c33d663978f76806ebf0ea39789)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10272d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10272d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10272d5c

Branch: refs/heads/branch-1.6
Commit: 10272d5c98694e5a0cfef2587a81be7ce609cbb7
Parents: d8bfc02
Author: Imran Rashid 
Authored: Tue Nov 10 16:50:22 2015 -0800
Committer: Andrew Or 
Committed: Tue Nov 10 16:50:34 2015 -0800

--
 .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++-
 1 file changed, 49 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/10272d5c/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3816b8c..068b49b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }
 
   /**
@@ -1055,6 +1061,47 @@ class DAGSchedulerSuite
   }
 
   /**
+   * Run two jobs, with a shared dependency.  We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency.  One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this time
+    // using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got internal
+    // accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
    * This test runs a three stage job, with a fetch failure in stage 1.  but during the retry, we
    * have completions from both the first & second attempt of stage 1.  So all the map output is