Repository: spark
Updated Branches:
  refs/heads/master d52f63622 -> ac7fc3075
[SPARK-20288] Avoid generating the MapStatus by stageId in BasicSchedulerIntegrationSuite

## What changes were proposed in this pull request?

The shuffleId is determined before the job is submitted, but it is hard to predict the stageId from the shuffleId. Stages are created in DAGScheduler (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L381), but the order in which they are created is not deterministic because of the `HashSet`.

I added a log statement (`println(s"Creating ShufflMapStage-$id on shuffle-${shuffleDep.shuffleId}")`) after https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L331 and ran BasicSchedulerIntegrationSuite: "multi-stage job". It prints either:

Creating ShufflMapStage-0 on shuffle-0
Creating ShufflMapStage-1 on shuffle-2
Creating ShufflMapStage-2 on shuffle-1
Creating ShufflMapStage-3 on shuffle-3

or

Creating ShufflMapStage-0 on shuffle-1
Creating ShufflMapStage-1 on shuffle-3
Creating ShufflMapStage-2 on shuffle-0
Creating ShufflMapStage-3 on shuffle-2

Because the same stageId can map to different shuffles from run to run, it is better to avoid generating the MapStatus by stageId.

Author: jinxing <jinxing6...@126.com>

Closes #17603 from jinxing64/SPARK-20288.
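The effect of the two orderings above can be illustrated with a small, self-contained sketch. Everything in it (`StageNumberingSketch`, `assignStageIds`, `partsByStageId`, `partsByShuffleId`) is a hypothetical stand-in written for illustration and is not Spark's scheduler code; only the two creation orders and the 10/20/30 partition counts are taken from this commit.

```scala
// Hypothetical model for illustration only; not Spark's DAGScheduler.
object StageNumberingSketch {
  // Partition-count table from the test: key 0 -> 10, key 1 -> 20, anything else -> 30.
  def outputParts(key: Int): Int = key match {
    case 0 => 10
    case 1 => 20
    case _ => 30
  }

  // Assign stage ids 0, 1, 2, ... in the order the shuffles happen to be visited.
  // Returns a map from stage id to shuffle id.
  def assignStageIds(visitOrder: Seq[Int]): Map[Int, Int] =
    visitOrder.zipWithIndex.map { case (shuffleId, stageId) => stageId -> shuffleId }.toMap

  // The two creation orders reported in the log output above.
  val runA: Map[Int, Int] = assignStageIds(Seq(0, 2, 1, 3))
  val runB: Map[Int, Int] = assignStageIds(Seq(1, 3, 0, 2))

  // Partitions reported for each shuffle when the table is indexed by stage id.
  def partsByStageId(stageToShuffle: Map[Int, Int]): Map[Int, Int] =
    stageToShuffle.map { case (stageId, shuffleId) => shuffleId -> outputParts(stageId) }

  // Partitions reported for each shuffle when the stage is first resolved to its shuffle id.
  def partsByShuffleId(stageToShuffle: Map[Int, Int]): Map[Int, Int] =
    stageToShuffle.map { case (_, shuffleId) => shuffleId -> outputParts(shuffleId) }
}
```

In this model, `partsByStageId(runA)` and `partsByStageId(runB)` disagree about how many partitions each shuffle has, while `partsByShuffleId` gives the same answer for both runs. That is the property the test relies on after the change.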
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac7fc307
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac7fc307
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac7fc307

Branch: refs/heads/master
Commit: ac7fc3075b7323261346fb4cd38c26f3b8f08bc2
Parents: d52f636
Author: jinxing <jinxing6...@126.com>
Authored: Wed May 31 10:46:23 2017 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed May 31 10:46:23 2017 -0500

----------------------------------------------------------------------
 .../spark/scheduler/SchedulerIntegrationSuite.scala      | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ac7fc307/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 37b0898..a8249e1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -553,10 +553,10 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
    */
   testScheduler("multi-stage job") {
 
-    def stageToOutputParts(stageId: Int): Int = {
-      stageId match {
+    def shuffleIdToOutputParts(shuffleId: Int): Int = {
+      shuffleId match {
         case 0 => 10
-        case 2 => 20
+        case 1 => 20
         case _ => 30
       }
     }
@@ -577,11 +577,12 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         // b/c the stage numbering is non-deterministic, so stage number alone doesn't tell
         // us what to check
       }
-
       (task.stageId, task.stageAttemptId, task.partitionId) match {
         case (stage, 0, _) if stage < 4 =>
+          val shuffleId =
+            scheduler.stageIdToStage(stage).asInstanceOf[ShuffleMapStage].shuffleDep.shuffleId
           backend.taskSuccess(taskDescription,
-            DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage)))
+            DAGSchedulerSuite.makeMapStatus("hostA", shuffleIdToOutputParts(shuffleId)))
         case (4, 0, partition) =>
           backend.taskSuccess(taskDescription, 4321 + partition)
       }