Repository: spark
Updated Branches:
  refs/heads/master d52f63622 -> ac7fc3075


[SPARK-20288] Avoid generating the MapStatus by stageId in BasicSchedulerIntegrationSuite

## What changes were proposed in this pull request?

The shuffleId is determined before the job is submitted, but it is hard to predict the stageId from the shuffleId.
Stages are created in DAGScheduler (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L381), but their creation order is not deterministic, because a `HashSet` has no guaranteed iteration order.
I added a log statement, `println(s"Creating ShufflMapStage-$id on shuffle-${shuffleDep.shuffleId}")`, after https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L331 and ran the "multi-stage job" test in BasicSchedulerIntegrationSuite. It prints either:
Creating ShufflMapStage-0 on shuffle-0
Creating ShufflMapStage-1 on shuffle-2
Creating ShufflMapStage-2 on shuffle-1
Creating ShufflMapStage-3 on shuffle-3
or
Creating ShufflMapStage-0 on shuffle-1
Creating ShufflMapStage-1 on shuffle-3
Creating ShufflMapStage-2 on shuffle-0
Creating ShufflMapStage-3 on shuffle-2
Because the stageId assigned to a given shuffle varies between runs, it is better to generate the MapStatus from the shuffleId instead of the stageId.
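
The reasoning can be illustrated with a minimal, Spark-free sketch. `StageSketch`, `assignStageIds`, `outputPartsForStage`, and `partsByShuffle` are hypothetical names made up for this illustration and are not part of Spark or the test suite; only `shuffleIdToOutputParts` mirrors the helper in this change. The two visit orders are taken from the two log outputs above.

```scala
// Hypothetical stand-in for a shuffle map stage: only the two ids matter here.
final case class StageSketch(stageId: Int, shuffleId: Int)

object ShuffleIdKeyedSketch {
  // Same mapping as the test helper in this change: output partitions keyed by shuffleId.
  def shuffleIdToOutputParts(shuffleId: Int): Int = shuffleId match {
    case 0 => 10
    case 1 => 20
    case _ => 30
  }

  // Assumption: stage ids are handed out in the order the shuffle ids are visited,
  // which imitates a creation order that depends on set iteration order.
  def assignStageIds(visitOrder: Seq[Int]): Map[Int, StageSketch] =
    visitOrder.zipWithIndex.map { case (shuffleId, stageId) =>
      stageId -> StageSketch(stageId, shuffleId)
    }.toMap

  // Resolve stageId -> shuffleId first, then look up the partition count.
  def outputPartsForStage(stages: Map[Int, StageSketch], stageId: Int): Int =
    shuffleIdToOutputParts(stages(stageId).shuffleId)

  // Partition count seen for each shuffleId in one run.
  def partsByShuffle(stages: Map[Int, StageSketch]): Map[Int, Int] =
    stages.values.map(s => s.shuffleId -> outputPartsForStage(stages, s.stageId)).toMap
}

object ShuffleIdKeyedSketchDemo {
  def main(args: Array[String]): Unit = {
    // Visit orders corresponding to the two log outputs above.
    val runA = ShuffleIdKeyedSketch.assignStageIds(Seq(0, 2, 1, 3))
    val runB = ShuffleIdKeyedSketch.assignStageIds(Seq(1, 3, 0, 2))

    // Stage 0 belongs to a different shuffle in each run ...
    println(runA(0).shuffleId != runB(0).shuffleId)
    // ... but every shuffle still gets the same partition count in both runs.
    println(ShuffleIdKeyedSketch.partsByShuffle(runA) == ShuffleIdKeyedSketch.partsByShuffle(runB))
  }
}
```

In this sketch, keying the lookup by stageId would give stage 0 a different partition count in each run, while keying it by shuffleId gives every shuffle the same count regardless of the order in which stages were created.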

Author: jinxing <jinxing6...@126.com>

Closes #17603 from jinxing64/SPARK-20288.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac7fc307
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac7fc307
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac7fc307

Branch: refs/heads/master
Commit: ac7fc3075b7323261346fb4cd38c26f3b8f08bc2
Parents: d52f636
Author: jinxing <jinxing6...@126.com>
Authored: Wed May 31 10:46:23 2017 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed May 31 10:46:23 2017 -0500

----------------------------------------------------------------------
 .../spark/scheduler/SchedulerIntegrationSuite.scala      | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac7fc307/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 37b0898..a8249e1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -553,10 +553,10 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
    */
   testScheduler("multi-stage job") {
 
-    def stageToOutputParts(stageId: Int): Int = {
-      stageId match {
+    def shuffleIdToOutputParts(shuffleId: Int): Int = {
+      shuffleId match {
         case 0 => 10
-        case 2 => 20
+        case 1 => 20
         case _ => 30
       }
     }
@@ -577,11 +577,12 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         // b/c the stage numbering is non-deterministic, so stage number alone doesn't tell
         // us what to check
       }
-
       (task.stageId, task.stageAttemptId, task.partitionId) match {
         case (stage, 0, _) if stage < 4 =>
+          val shuffleId =
+            scheduler.stageIdToStage(stage).asInstanceOf[ShuffleMapStage].shuffleDep.shuffleId
           backend.taskSuccess(taskDescription,
-            DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage)))
+            DAGSchedulerSuite.makeMapStatus("hostA", shuffleIdToOutputParts(shuffleId)))
         case (4, 0, partition) =>
           backend.taskSuccess(taskDescription, 4321 + partition)
       }

