GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/17175
[SPARK-19468][SQL][WIP] Rewrite physical Project operator's output partitioning and ordering to ensure no unnecessary shuffle/sort in Datasets

## What changes were proposed in this pull request?

Before running the following example, please set `spark.sql.autoBroadcastJoinThreshold` to `-1` to force Spark SQL to use `SortMergeJoin`.

When we join two `Dataset`s as follows, the already sorted and partitioned `Dataset`s are planned with another `Sort` and `ShuffleExchange`:

```scala
val ds1 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
val ds2 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)

val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
joined1.explain
```

```
== Physical Plan ==
*SortMergeJoin [_1#105._1], [_2#106._1], Inner
:- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_1#105._1, 4)
:     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
:        +- InMemoryTableScan [_1#83, _2#84]
:              +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
:                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:                       +- Exchange hashpartitioning(_1#83, 4)
:                          +- LocalTableScan [_1#83, _2#84]
+- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_2#106._1, 4)
      +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
         +- InMemoryTableScan [_1#100, _2#101]
               +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
                     +- *Sort [_1#83 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(_1#83, 4)
                           +- LocalTableScan [_1#83, _2#84]
```

There are two issues here:

1. `InMemoryTableScan` doesn't report a correct `outputPartitioning` and `outputOrdering`. `InMemoryTableScan` has different output attributes than its cached relation's child plan, so it needs to rewrite the child plan's `outputPartitioning` and `outputOrdering` in terms of its own output attributes.

2. A `Project` operator may wrap some of its input into a complex type (e.g., `named_struct(_1, _1#83, _2, _2#84) AS _1#105` above), and its parent operator then requires a field of that complex-typed attribute (e.g., `_1#105._1` above) in its `requiredChildDistribution` and `requiredChildOrdering`. Because `Project` does not rewrite the `outputPartitioning` and `outputOrdering` of its child plan, an extra shuffle and sort are always inserted on top of the `Project`.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 ensure-no-unnecessary-shuffle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17175.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17175

----

commit 6e4eba6c9c637f9af11dae17a4ee2f1b39ee00be
Author: Liang-Chi Hsieh <vii...@gmail.com>
Date: 2017-03-06T08:56:38Z

    Rewrite physical Project operator's output partitioning and ordering to ensure no unnecessary shuffle/sort in Datasets.

----
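Both issues boil down to the same mechanism: re-express the child plan's partitioning (and ordering) expressions in terms of the operator's own output attributes. The following is a minimal, Spark-free sketch of that substitution; the `Expr`, `Attr`, `Field`, `Struct`, and `HashPartitioning` types and the `rewritePartitioning` function are simplified hypothetical stand-ins, not Spark's actual classes:

```scala
// A minimal model of "rewrite partitioning through a Project". These case
// classes are hypothetical stand-ins, NOT Spark's Attribute/Alias classes.
sealed trait Expr
case class Attr(name: String, id: Int) extends Expr            // e.g. _1#83
case class Field(struct: Expr, name: String) extends Expr      // e.g. _1#105._1
case class Struct(fields: Seq[(String, Expr)]) extends Expr    // named_struct(...)

case class HashPartitioning(exprs: Seq[Expr], numPartitions: Int)

// A Project is modeled as (output attribute, projected child expression) pairs.
// Each partitioning expression of the child that also appears in the project
// list is replaced by the corresponding output attribute, or by a field of it
// when the child expression was wrapped into a struct.
def rewritePartitioning(
    childPart: HashPartitioning,
    projectList: Seq[(Attr, Expr)]): HashPartitioning = {
  val rewritten = childPart.exprs.map { e =>
    projectList.collectFirst {
      // the child expression survives under a new name
      case (out, child) if child == e => (out: Expr)
      // the child expression was packed into a struct: expose it as a field
      case (out, Struct(fields)) if fields.exists(_._2 == e) =>
        Field(out, fields.find(_._2 == e).get._1)
    }.getOrElse(e)
  }
  HashPartitioning(rewritten, childPart.numPartitions)
}

// Mirrors the plan above: Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
// over a child that is hash-partitioned by _1#83.
val a1  = Attr("_1", 83)
val a2  = Attr("_2", 84)
val out = Attr("_1", 105)
val projectList = Seq(out -> (Struct(Seq("_1" -> (a1: Expr), "_2" -> (a2: Expr))): Expr))
val rewritten = rewritePartitioning(HashPartitioning(Seq(a1), 4), projectList)
println(rewritten)  // HashPartitioning(List(Field(Attr(_1,105),_1)),4)
```

With a rewrite of this shape, the `Project` in the example would report a partitioning equivalent to `hashpartitioning(_1#105._1, 4)`, so the planner could recognize that the join's required distribution and ordering are already satisfied and skip the extra `Exchange` and `Sort`.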