[ https://issues.apache.org/jira/browse/SPARK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-9563:
-----------------------------------

    Assignee:     (was: Apache Spark)

> Remove repartition operators when they are the child of Exchange and shuffle=True
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-9563
>                 URL: https://issues.apache.org/jira/browse/SPARK-9563
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Josh Rosen
>
> Consider the following query:
> {code}
> val df1 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> val df2 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> df1.repartition(1000).join(df2, "_1").explain(true)
> {code}
> Here's the plan for this query as of Spark 1.4.1:
> {code}
> == Parsed Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Analyzed Logical Plan ==
> _1: int, _2: int, _2: int
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Optimized Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Physical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  ShuffledHashJoin [_1#68991], [_1#68993], BuildRight
>   Exchange (HashPartitioning 200)
>    Repartition 1000, true
>     PhysicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   Exchange (HashPartitioning 200)
>    PhysicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> {code}
> In this plan, {{df1}} is first shuffled into 1000 partitions by the 
> Repartition, only to be shuffled again by the Exchange (into 200 hash 
> partitions) for the join, so the first shuffle is wasted work.
> To avoid this extra shuffle, I think we should remove the Repartition when 
> the following condition holds:
> - The Exchange's child is a Repartition operator with shuffle=True.
> We should not perform this collapsing when shuffle=False, since there might 
> be a legitimate reason to coalesce before shuffling (for instance, to reduce 
> the number of map outputs that need to be tracked).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
