[ https://issues.apache.org/jira/browse/SPARK-19468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761262#comment-16761262 ]
Mitesh commented on SPARK-19468:
--------------------------------

Also curious why the fix for SPARK-19931 only covered HashPartitioning instead of any kind of partitioning that extends Expression.

> Dataset slow because of unnecessary shuffles
> --------------------------------------------
>
>                 Key: SPARK-19468
>                 URL: https://issues.apache.org/jira/browse/SPARK-19468
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: koert kuipers
>            Priority: Major
>
> we noticed that some algos we ported from rdd to dataset are significantly
> slower, and the main reason seems to be more shuffles that we successfully
> avoid for rdds by careful partitioning. this seems to be dataset specific as
> it works ok for dataframe.
> see also here:
> http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/
> it kind of boils down to this... if i partition and sort dataframes that get
> used for joins repeatedly i can avoid shuffles:
> {noformat}
> System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")
> val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
>   .repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
> val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
>   .repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)
> val joined = df1.join(df2, col("key") === col("key2"))
> joined.explain
> == Physical Plan ==
> *SortMergeJoin [key#5], [key2#27], Inner
> :- InMemoryTableScan [key#5, value#6]
> :     +- InMemoryRelation [key#5, value#6], true, 10000, StorageLevel(disk, 1 replicas)
> :           +- *Sort [key#5 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(key#5, 4)
> :                 +- LocalTableScan [key#5, value#6]
> +- InMemoryTableScan [key2#27, value2#28]
>       +- InMemoryRelation [key2#27, value2#28], true, 10000, StorageLevel(disk, 1 replicas)
>             +- *Sort [key2#27 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(key2#27, 4)
>                   +- LocalTableScan [key2#27, value2#28]
> {noformat}
> notice how the persisted dataframes are not shuffled or sorted anymore before
> being used in the join. however if i try to do the same with dataset i have
> no luck:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#105._1], [_2#106._1], Inner
> :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(_1#105._1, 4)
> :     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
> :        +- InMemoryTableScan [_1#83, _2#84]
> :              +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
> :                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :                       +- Exchange hashpartitioning(_1#83, 4)
> :                          +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_2#106._1, 4)
>       +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
>          +- InMemoryTableScan [_1#100, _2#101]
>                +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
>                      +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                         +- Exchange hashpartitioning(_1#83, 4)
>                            +- LocalTableScan [_1#83, _2#84]
> {noformat}
> notice how my persisted Datasets are shuffled and sorted again. part of the
> issue seems to be in joinWith, which does some preprocessing that seems to
> confuse the planner. if i change the joinWith to join (which returns a
> dataframe) it looks a little better in that only one side gets shuffled
> again, but still not optimal:
> {noformat}
> val ds1 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val ds2 = Seq((0, 0), (1, 1)).toDS
>   .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
> val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
> joined1.explain
> == Physical Plan ==
> *SortMergeJoin [_1#83], [_1#100], Inner
> :- InMemoryTableScan [_1#83, _2#84]
> :     +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
> :           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(_1#83, 4)
> :                 +- LocalTableScan [_1#83, _2#84]
> +- *Sort [_1#100 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(_1#100, 4)
>       +- InMemoryTableScan [_1#100, _2#101]
>             +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
>                   +- *Sort [_1#83 ASC NULLS FIRST], false, 0
>                      +- Exchange hashpartitioning(_1#83, 4)
>                         +- LocalTableScan [_1#83, _2#84]
> {noformat}
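
For anyone who wants to put a number on "shuffled again" in the plans quoted above, here is a rough sketch that counts the Exchange nodes sitting above the cached relation in the text printed by explain. It is plain Scala and only parses text. The name exchangesAboveCache is a hypothetical helper, not a Spark API, and it assumes the tree layout that Spark 2.1 prints in this issue (node names introduced by "+- " or ":- ", cached subtrees indented under InMemoryTableScan, and any "> " mail quoting already stripped).

```scala
object ExplainCheck {
  // Hypothetical helper, not part of Spark: counts Exchange nodes that are NOT
  // nested under an InMemoryTableScan, i.e. shuffles that run on top of the cache.
  def exchangesAboveCache(plan: String): Int = {
    var cachedDepth: Option[Int] = None // column of the enclosing InMemoryTableScan, if any
    var count = 0
    for (line <- plan.split("\n") if line.trim.nonEmpty && !line.startsWith("==")) {
      // Column where the node name starts, after the tree-drawing characters.
      val depth = line.indexWhere(c => " :+-".indexOf(c) < 0)
      // Drop the leading '*' that marks whole-stage codegen.
      val name = line.drop(depth).stripPrefix("*")
      val insideCache = cachedDepth.exists(depth > _)
      if (!insideCache) {
        cachedDepth = if (name.startsWith("InMemoryTableScan")) Some(depth) else None
        if (name.startsWith("Exchange")) count += 1
      }
    }
    count
  }

  def main(args: Array[String]): Unit = {
    // The plan from the ds1.join(ds2, ...) case, copied from the issue text.
    val joinPlan =
      """*SortMergeJoin [_1#83], [_1#100], Inner
        |:- InMemoryTableScan [_1#83, _2#84]
        |:     +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
        |:           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
        |:              +- Exchange hashpartitioning(_1#83, 4)
        |:                 +- LocalTableScan [_1#83, _2#84]
        |+- *Sort [_1#100 ASC NULLS FIRST], false, 0
        |   +- Exchange hashpartitioning(_1#100, 4)
        |      +- InMemoryTableScan [_1#100, _2#101]
        |            +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
        |                  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
        |                     +- Exchange hashpartitioning(_1#83, 4)
        |                        +- LocalTableScan [_1#83, _2#84]""".stripMargin
    println(exchangesAboveCache(joinPlan)) // prints 1: only the right side is reshuffled
  }
}
```

Run against the three plans in the issue, this should give 0 for the DataFrame join, 2 for joinWith and 1 for join on the Datasets. The Exchange lines under InMemoryRelation are skipped on purpose, since they describe how the cached data was built rather than work done at join time.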