[ https://issues.apache.org/jira/browse/SPARK-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Takuya Ueshin updated SPARK-13902: ---------------------------------- Description: {{DAGScheduler}} sometimes generate incorrect stage graph. Suppose you have the following DAG (please see this in monospaced font): {noformat} [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D] \ / <------------- {noformat} Note: [] means an RDD, () means a shuffle dependency. Here, RDD {{B}} has a shuffle dependency on RDD {{A}}, and RDD {{C}} has shuffle dependency on both {{B}} and {{A}}. The shuffle dependency IDs are numbers in the {{DAGScheduler}}, but to make the example easier to understand, let's call the shuffled data from {{A}} shuffle dependency ID {{s_A}} and the shuffled data from {{B}} shuffle dependency ID {{s_B}}. The {{getAncestorShuffleDependencies}} method in {{DAGScheduler}} (incorrectly) does not check for duplicates when it's adding ShuffleDependencies to the parents data structure, so for this DAG, when {{getAncestorShuffleDependencies}} gets called on {{C}} (previous of the final RDD), {{getAncestorShuffleDependencies}} will return {{s_A}}, {{s_A}}, {{s_B}} ({{s_A}} gets added twice: once when the method "visit"s RDD {{C}}, and once when the method "visit"s RDD {{B}}). This is problematic because this line of code: https://github.com/apache/spark/blob/8ef3399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289 then generates a new shuffle stage for each dependency returned by {{getAncestorShuffleDependencies}}, resulting in duplicate map stages that compute the map output from RDD A. As a result, {{DAGScheduler}} generates the following stages and their parents for each shuffle: | | stage | parents | | s_A | ShuffleMapStage 2 | List() | | s_B | ShuffleMapStage 1 | List(ShuffleMapStage 0) | | s_C | ShuffleMapStage 3 | List(ShuffleMapStage 1, ShuffleMapStage 2) | | \- | ResultStage 4 | List(ShuffleMapStage 3) | The stage for {{s_A}} should be {{ShuffleMapStage 0}}, but the stage for {{s_A}} is generated twice as {{ShuffleMapStage 2}} and {{ShuffleMapStage 0}} is overwritten by {{ShuffleMapStage 2}}, and the stage {{ShuffleMap Stage1}} keeps referring the _old_ stage {{ShuffleMapStage 0}}. was: {{DAGScheduler}} sometimes generate incorrect stage graph. Some stages are generated for the same shuffleId twice or more and they are referenced by the child stages because the building order of the graph is not correct. Here, we submit an RDD\[F\] having a linage of RDDs as follows (please see this in {{monospaced}} font): {noformat} <-------------------- / \ [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F] \ / <-------------------- {noformat} Note: \[\] means an RDD, () means a shuffle dependency. {{DAGScheduler}} generates the following stages and their parents for each shuffle: | | stage | parents | | (1) | ShuffleMapStage 2 | List() | | (2) | ShuffleMapStage 1 | List(ShuffleMapStage 0) | | (3) | ShuffleMapStage 3 | List(ShuffleMapStage 1) | | (4) | ShuffleMapStage 4 | List(ShuffleMapStage 2, ShuffleMapStage 3) | | (5) | ShuffleMapStage 5 | List(ShuffleMapStage 1, ShuffleMapStage 4) | | \- | ResultStage 6 | List(ShuffleMapStage 5) | The stage for shuffle id {{0}} should be {{ShuffleMapStage 0}}, but the stage for shuffle id {{0}} is generated twice as {{ShuffleMapStage 2}} and {{ShuffleMapStage 0}} is overwritten by {{ShuffleMapStage 2}}, and the stage {{ShuffleMap Stage1}} keeps referring the _old_ stage {{ShuffleMapStage 0}}. Summary: Make DAGScheduler not to create duplicate stage. (was: Make DAGScheduler.getAncestorShuffleDependencies() return in topological order to ensure building ancestor stages first.) > Make DAGScheduler not to create duplicate stage. > ------------------------------------------------ > > Key: SPARK-13902 > URL: https://issues.apache.org/jira/browse/SPARK-13902 > Project: Spark > Issue Type: Bug > Components: Scheduler > Reporter: Takuya Ueshin > > {{DAGScheduler}} sometimes generate incorrect stage graph. > Suppose you have the following DAG (please see this in monospaced font): > {noformat} > [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D] > \ / > <------------- > {noformat} > Note: [] means an RDD, () means a shuffle dependency. > Here, RDD {{B}} has a shuffle dependency on RDD {{A}}, and RDD {{C}} has > shuffle dependency on both {{B}} and {{A}}. The shuffle dependency IDs are > numbers in the {{DAGScheduler}}, but to make the example easier to > understand, let's call the shuffled data from {{A}} shuffle dependency ID > {{s_A}} and the shuffled data from {{B}} shuffle dependency ID {{s_B}}. > The {{getAncestorShuffleDependencies}} method in {{DAGScheduler}} > (incorrectly) does not check for duplicates when it's adding > ShuffleDependencies to the parents data structure, so for this DAG, when > {{getAncestorShuffleDependencies}} gets called on {{C}} (previous of the > final RDD), {{getAncestorShuffleDependencies}} will return {{s_A}}, {{s_A}}, > {{s_B}} ({{s_A}} gets added twice: once when the method "visit"s RDD {{C}}, > and once when the method "visit"s RDD {{B}}). This is problematic because > this line of code: > https://github.com/apache/spark/blob/8ef3399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289 > then generates a new shuffle stage for each dependency returned by > {{getAncestorShuffleDependencies}}, resulting in duplicate map stages that > compute the map output from RDD A. > As a result, {{DAGScheduler}} generates the following stages and their > parents for each shuffle: > | | stage | parents | > | s_A | ShuffleMapStage 2 | List() | > | s_B | ShuffleMapStage 1 | List(ShuffleMapStage 0) | > | s_C | ShuffleMapStage 3 | List(ShuffleMapStage 1, ShuffleMapStage 2) | > | \- | ResultStage 4 | List(ShuffleMapStage 3) | > The stage for {{s_A}} should be {{ShuffleMapStage 0}}, but the stage for > {{s_A}} is generated twice as {{ShuffleMapStage 2}} and {{ShuffleMapStage 0}} > is overwritten by {{ShuffleMapStage 2}}, and the stage {{ShuffleMap Stage1}} > keeps referring the _old_ stage {{ShuffleMapStage 0}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org