[ https://issues.apache.org/jira/browse/SPARK-13902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Takuya Ueshin updated SPARK-13902:
----------------------------------
    Description: 
{{DAGScheduler}} sometimes generates an incorrect stage graph.

Suppose you have the following DAG (please see this in monospaced font):

{noformat}
[A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
            \                /
              <-------------
{noformat}

Note: [] means an RDD, () means a shuffle dependency.

Here, RDD {{B}} has a shuffle dependency on RDD {{A}}, and RDD {{C}} has a 
shuffle dependency on both {{B}} and {{A}}. Inside the {{DAGScheduler}} the 
shuffle dependency IDs are numbers, but to make the example easier to 
understand, let's call the shuffled data from {{A}} shuffle dependency ID 
{{s_A}} and the shuffled data from {{B}} shuffle dependency ID {{s_B}}.
The {{getAncestorShuffleDependencies}} method in {{DAGScheduler}} (incorrectly) 
does not check for duplicates when it adds ShuffleDependencies to the 
parents data structure. So for this DAG, when 
{{getAncestorShuffleDependencies}} is called on {{C}} (the parent of the final 
RDD {{D}}), it returns {{s_A}}, {{s_A}}, {{s_B}} ({{s_A}} is added twice: once 
when the method visits RDD {{C}}, and once when it visits RDD {{B}}). This is 
problematic because this line of code: 
https://github.com/apache/spark/blob/8ef3399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289
then generates a new shuffle stage for each dependency returned by 
{{getAncestorShuffleDependencies}}, resulting in duplicate map stages that 
compute the map output from RDD {{A}}.
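For illustration, the traversal described above can be modeled in a few lines. This is a minimal sketch in Python, not Spark's Scala code: {{RDD}}, {{ShuffleDep}} and {{ancestor_shuffle_deps}} are hypothetical stand-ins for RDDs, {{ShuffleDependency}} and {{getAncestorShuffleDependencies}}, and only shuffle dependencies are modeled. Each RDD is visited once, but a shuffle dependency is recorded every time it is found on a visited RDD, so {{s_A}} (reachable from both {{C}} and {{B}}) is recorded twice. The {{dedup=True}} flag is an assumed guard that skips shuffle IDs already recorded; it is not necessarily how this issue was fixed.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False keeps identity-based hashing, so RDDs can go in a set
class RDD:
    name: str
    shuffle_deps: list = field(default_factory=list)


@dataclass(eq=False)
class ShuffleDep:
    shuffle_id: str
    rdd: RDD  # the map-side (parent) RDD whose output is shuffled


def ancestor_shuffle_deps(rdd, dedup=False):
    """Collect the shuffle dependencies reachable from `rdd` (hypothetical model).

    Every RDD is visited once, but a dependency is recorded whenever it is
    found on a visited RDD. With dedup=False a dependency reachable along
    two paths is recorded twice; with dedup=True repeated IDs are skipped.
    """
    parents = []
    recorded_ids = set()
    visited = set()
    waiting_for_visit = [rdd]
    while waiting_for_visit:
        r = waiting_for_visit.pop()
        if r in visited:
            continue
        visited.add(r)
        for dep in r.shuffle_deps:
            if not dedup or dep.shuffle_id not in recorded_ids:
                recorded_ids.add(dep.shuffle_id)
                parents.append(dep)
            waiting_for_visit.append(dep.rdd)
    return parents


# The example DAG: B --s_A--> A, C --s_A--> A, and C --s_B--> B.
a = RDD("A")
s_a = ShuffleDep("s_A", a)
b = RDD("B", [s_a])
s_b = ShuffleDep("s_B", b)
c = RDD("C", [s_a, s_b])

print(sorted(d.shuffle_id for d in ancestor_shuffle_deps(c)))              # ['s_A', 's_A', 's_B']
print(sorted(d.shuffle_id for d in ancestor_shuffle_deps(c, dedup=True)))  # ['s_A', 's_B']
```

The order in which entries come back depends on the order of {{C}}'s dependencies, so the sketch compares sorted IDs; the point is the repeated {{s_A}}.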

As a result, {{DAGScheduler}} generates the following stages and their parents 
for each shuffle:

|| shuffle || stage || parents ||
| s_A | ShuffleMapStage 2 | List() |
| s_B | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
| s_C | ShuffleMapStage 3 | List(ShuffleMapStage 1, ShuffleMapStage 2) |
| \- | ResultStage 4 | List(ShuffleMapStage 3) |

The stage for {{s_A}} should be {{ShuffleMapStage 0}}, but the stage for 
{{s_A}} is generated twice: {{ShuffleMapStage 0}} is overwritten by 
{{ShuffleMapStage 2}}, while {{ShuffleMapStage 1}} keeps referring to the 
_old_ stage {{ShuffleMapStage 0}}.
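The overwrite can be reproduced with a similar toy model. In this hypothetical sketch (again Python, not Spark's {{DAGScheduler}}), {{ToyScheduler.get_shuffle_map_stage}} creates a new stage for every dependency returned by the ancestor traversal, overwriting the registry entry for that shuffle ID, and a new stage resolves its parent stages before taking the next stage ID. Assuming {{C}} lists its dependencies as {{s_A}} then {{s_B}}, the buggy run yields the stage numbers in the table above: {{s_B}}'s stage 1 still points at stage 0, while the registry now maps {{s_A}} to stage 2. The traversal is repeated so the block runs on its own.

```python
from dataclasses import dataclass, field
from itertools import count


@dataclass(eq=False)
class RDD:
    name: str
    shuffle_deps: list = field(default_factory=list)


@dataclass(eq=False)
class ShuffleDep:
    shuffle_id: str
    rdd: RDD  # map-side RDD


@dataclass(eq=False)
class Stage:
    stage_id: int
    parents: list


def ancestor_shuffle_deps(rdd, dedup):
    """Hypothetical traversal: records a dependency once per path unless dedup."""
    parents, recorded, visited, waiting = [], set(), set(), [rdd]
    while waiting:
        r = waiting.pop()
        if r in visited:
            continue
        visited.add(r)
        for dep in r.shuffle_deps:
            if not dedup or dep.shuffle_id not in recorded:
                recorded.add(dep.shuffle_id)
                parents.append(dep)
            waiting.append(dep.rdd)
    return parents


class ToyScheduler:
    """Hypothetical stage registry; not Spark's DAGScheduler."""

    def __init__(self, dedup=False):
        self.dedup = dedup
        self.ids = count()
        self.shuffle_to_stage = {}  # shuffle id -> Stage

    def new_stage(self, rdd):
        # Resolve parent stages first, then take the next stage id.
        parents = [self.get_shuffle_map_stage(d) for d in rdd.shuffle_deps]
        return Stage(next(self.ids), parents)

    def get_shuffle_map_stage(self, dep):
        if dep.shuffle_id in self.shuffle_to_stage:
            return self.shuffle_to_stage[dep.shuffle_id]
        # Create a stage for every returned ancestor, overwriting any
        # registry entry that already exists for the same shuffle id.
        for anc in ancestor_shuffle_deps(dep.rdd, self.dedup):
            self.shuffle_to_stage[anc.shuffle_id] = self.new_stage(anc.rdd)
        stage = self.new_stage(dep.rdd)
        self.shuffle_to_stage[dep.shuffle_id] = stage
        return stage


def build_stages(dedup):
    """Return {shuffle id: (stage id, sorted parent stage ids)} plus the result stage."""
    a = RDD("A")
    s_a = ShuffleDep("s_A", a)
    b = RDD("B", [s_a])
    s_b = ShuffleDep("s_B", b)
    c = RDD("C", [s_a, s_b])
    s_c = ShuffleDep("s_C", c)
    d = RDD("D", [s_c])
    sched = ToyScheduler(dedup)
    result = sched.new_stage(d)  # the final (result) stage
    table = {sid: (st.stage_id, sorted(p.stage_id for p in st.parents))
             for sid, st in sched.shuffle_to_stage.items()}
    table["result"] = (result.stage_id, [p.stage_id for p in result.parents])
    return table


print(build_stages(dedup=False))
# {'s_A': (2, []), 's_B': (1, [0]), 's_C': (3, [1, 2]), 'result': (4, [3])}
print(build_stages(dedup=True))
# {'s_A': (0, []), 's_B': (1, [0]), 's_C': (2, [0, 1]), 'result': (3, [2])}
```

With {{dedup=True}} the same model produces one stage per shuffle, and every child stage points at the registered stage for its parent shuffle.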


  was:
{{DAGScheduler}} sometimes generates an incorrect stage graph.
Some stages are generated two or more times for the same shuffleId, and the 
child stages end up referencing them, because the graph is not built in the 
correct order.

Here, we submit an RDD\[F\] with the following lineage of RDDs (please see this 
in {{monospaced}} font):

{noformat}
                              <--------------------
                            /                       \
[A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F]
               \                       /
                 <--------------------
{noformat}

Note: \[\] means an RDD, () means a shuffle dependency.

{{DAGScheduler}} generates the following stages and their parents for each 
shuffle:

|| shuffle || stage || parents ||
| (1) | ShuffleMapStage 2 | List() |
| (2) | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
| (3) | ShuffleMapStage 3 | List(ShuffleMapStage 1) |
| (4) | ShuffleMapStage 4 | List(ShuffleMapStage 2, ShuffleMapStage 3) |
| (5) | ShuffleMapStage 5 | List(ShuffleMapStage 1, ShuffleMapStage 4) |
| \- | ResultStage 6 | List(ShuffleMapStage 5) |

The stage for shuffle id {{0}} should be {{ShuffleMapStage 0}}, but the stage 
for shuffle id {{0}} is generated twice: {{ShuffleMapStage 0}} is overwritten by 
{{ShuffleMapStage 2}}, while {{ShuffleMapStage 1}} keeps referring to the _old_ 
stage {{ShuffleMapStage 0}}.


        Summary: Make DAGScheduler not to create duplicate stage.  (was: Make 
DAGScheduler.getAncestorShuffleDependencies() return in topological order to 
ensure building ancestor stages first.)

> Make DAGScheduler not to create duplicate stage.
> ------------------------------------------------
>
>                 Key: SPARK-13902
>                 URL: https://issues.apache.org/jira/browse/SPARK-13902
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>            Reporter: Takuya Ueshin
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
