spark git commit: [SPARK-10192][CORE] simple test w/ failure involving a shared dependency
Repository: spark
Updated Branches:
  refs/heads/master c0e48dfa6 -> 33112f9c4

[SPARK-10192][CORE] simple test w/ failure involving a shared dependency

Just trying to increase test coverage in the scheduler; this already works.
It includes a regression test for SPARK-9809.

Copied some test utils from https://github.com/apache/spark/pull/5636; we can
wait till that is merged first.

Author: Imran Rashid

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33112f9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33112f9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33112f9c

Branch: refs/heads/master
Commit: 33112f9c48680c33d663978f76806ebf0ea39789
Parents: c0e48df
Author: Imran Rashid
Authored: Tue Nov 10 16:50:22 2015 -0800
Committer: Andrew Or
Committed: Tue Nov 10 16:50:22 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++-
 1 file changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33112f9c/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3816b8c..068b49b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }
 
   /**
@@ -1055,6 +1061,47 @@ class DAGSchedulerSuite
   }
 
   /**
+   * Run two jobs with a shared dependency.  We simulate a fetch failure in the second job,
+   * which requires regenerating some outputs of the shared dependency.  One key aspect of this
+   * test is that the second job actually uses a different stage for the shared dependency (a
+   * "skipped" stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new,
+    // skipped stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this
+    // time using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got
+    // internal accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
    * This test runs a three stage job, with a fetch failure in stage 1.  but during the retry, we
    * have completions from both the first & second attempt of stage 1.  So all the map output is
    * available before we finish any task set for stage 1.  We want to make sure that we don't
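For readers following the diff, the DAG shape this test builds with MyRDD and a raw
ShuffleDependency corresponds, at the user level, to one shuffled RDD reused by two jobs.
The sketch below is illustrative only and not part of the patch; the object name, the input
data, and the local master setting are assumptions made for the example.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch (not part of this patch): one shuffle dependency created by
// groupByKey and reused by two jobs, which is the situation the new
// DAGSchedulerSuite test simulates.
object SharedShuffleDepSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("shared-shuffle-dep-sketch").setMaster("local[2]"))

    // One shuffle dependency, introduced by groupByKey.
    val shuffled = sc.parallelize(1 to 100).map(i => (i % 4, i)).groupByKey()

    // Job 1: runs the shuffle map stage and then a result stage.
    val groupSizes = shuffled.mapValues(_.size).collect()

    // Job 2: reuses the same shuffle dependency. Its map stage shows up as a
    // "skipped" stage because the map output is already registered. If a fetch
    // failure happened here (say, an executor was lost), the scheduler would
    // resubmit that skipped stage with tasks only for the missing partitions,
    // which is the case the test and the SPARK-9809 accumulator check cover.
    val groupSums = shuffled.mapValues(_.sum).collect()

    println(groupSizes.toSeq)
    println(groupSums.toSeq)
    sc.stop()
  }
}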
spark git commit: [SPARK-10192][CORE] simple test w/ failure involving a shared dependency
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d8bfc025c -> 10272d5c9

[SPARK-10192][CORE] simple test w/ failure involving a shared dependency

Just trying to increase test coverage in the scheduler; this already works.
It includes a regression test for SPARK-9809.

Copied some test utils from https://github.com/apache/spark/pull/5636; we can
wait till that is merged first.

Author: Imran Rashid

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.

(cherry picked from commit 33112f9c48680c33d663978f76806ebf0ea39789)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10272d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10272d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10272d5c

Branch: refs/heads/branch-1.6
Commit: 10272d5c98694e5a0cfef2587a81be7ce609cbb7
Parents: d8bfc02
Author: Imran Rashid
Authored: Tue Nov 10 16:50:22 2015 -0800
Committer: Andrew Or
Committed: Tue Nov 10 16:50:34 2015 -0800

----------------------------------------------------------------------
 .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++-
 1 file changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

The diff for this cherry-pick (index 3816b8c..068b49b of
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala) is identical to the
diff shown in the master commit 33112f9c above.