[ 
https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761262#comment-16761262
 ] 

Mitesh commented on SPARK-19468:
--------------------------------

Also curious why in the fix for SPARK-19931, it was only fixed for 
HashPartitioning instead of any kind of partitioning that extends Expression.

> Dataset slow because of unnecessary shuffles
> --------------------------------------------
>
>                 Key: SPARK-19468
>                 URL: https://issues.apache.org/jira/browse/SPARK-19468
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: koert kuipers
>            Priority: Major
>
> we noticed that some algos we ported from rdd to dataset are significantly 
> slower, and the main reason seems to be more shuffles that we successfully 
> avoid for rdds by careful partitioning. this seems to be dataset specific as 
> it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get 
> used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   
> .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   
> .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> :     +- InMemoryRelation [key#5, value#6], true, 10000, StorageLevel(disk, 1 
> replicas)
> :           +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(key#5, 4)
> :                 +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>       +- InMemoryRelation [key2#27, value2#28], true, 10000, 
> StorageLevel(disk, 1 replicas)
>             +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(key2#27, 4)
>                   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before 
> being used in the join. however if i try to do the same with dataset i have 
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> :     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :        +- InMemoryTableScan [_1#83, _2#84]
> :              +- InMemoryRelation [_1#83, _2#84], true, 10000, 
> StorageLevel(disk, 1 replicas)
> :                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :                       +- Exchange hashpartitioning(_1#83, 4)
> :                          +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_2#106._1, 4)
>       +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>          +- InMemoryTableScan [_1#100, _2#101]
>                +- InMemoryRelation [_1#100, _2#101], true, 10000, 
> StorageLevel(disk, 1 replicas)
>                      +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                         +- Exchange hashpartitioning(_1#83, 4)
>                            +- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the 
> issue seems to be in joinWith, which does some preprocessing that seems to 
> confuse the planner. if i change the joinWith to join (which returns a 
> dataframe) it looks a little better in that only one side gets shuffled 
> again, but still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   
> .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#83], [_1#100], Inner
> :- InMemoryTableScan [_1#83, _2#84]
> :     +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 
> replicas)
> :           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(_1#83, 4)
> :                 +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_1#100 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_1#100, 4)
>       +- InMemoryTableScan [_1#100, _2#101]
>             +- InMemoryRelation [_1#100, _2#101], true, 10000, 
> StorageLevel(disk, 1 replicas)
>                   +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                      +- Exchange hashpartitioning(_1#83, 4)
>                         +- LocalTableScan [_1#83, _2#84]
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to