GitHub user heary-cao opened a pull request: https://github.com/apache/spark/pull/21139
[SPARK-24066][SQL] Add a window exchange rule to eliminate redundant physical plan SortExec

## What changes were proposed in this pull request?

Currently, when the ORDER BY keys of one window function are a subset of another window function's keys, Spark SQL nondeterministically generates different physical plans for the same query. For example:

```
case class DistinctAgg(a: Int, b: Float, c: Double, d: Int, e: String)

val df = spark.sparkContext.parallelize(
  DistinctAgg(8, 2, 3, 4, "a") ::
  DistinctAgg(9, 3, 4, 5, "b") ::
  DistinctAgg(3, 4, 5, 6, "c") ::
  DistinctAgg(3, 4, 5, 7, "c") ::
  DistinctAgg(3, 4, 5, 8, "c") ::
  DistinctAgg(3, 6, 6, 9, "d") ::
  DistinctAgg(30, 40, 50, 60, "e") ::
  DistinctAgg(41, 51, 61, 71, null) ::
  DistinctAgg(42, 52, 62, 72, null) ::
  DistinctAgg(43, 53, 63, 73, "k") :: Nil).toDF()
df.createOrReplaceTempView("distinctAgg")
```

```
select a, b, c,
       avg(b) over(partition by a order by b) as sumIb,
       sum(d) over(partition by a order by b, c) as sumId,
       d
from distinctAgg
```

The planner randomly produces one of two different physical plans.

**One**: the physical plan contains only one Sort:

```
== Physical Plan ==
*(3) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
+- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
   +- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
      +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#181, 5)
            +- *(1) Project [a#181, b#182, c#183, d#184]
               +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                  +- Scan ExternalRDDScan[obj#180]
```

**Another one**: the physical plan contains two Sorts:

```
== Physical Plan ==
*(4) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
+- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
   +- *(3) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
      +- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
         +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(a#181, 5)
               +- *(1) Project [a#181, b#182, c#183, d#184]
                  +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                     +- Scan ExternalRDDScan[obj#180]
```

This PR adds an exchange rule to ensure that no redundant physical plan SortExec is generated.
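To illustrate the intuition behind such a rule (a standalone sketch only, not the actual Spark implementation; the object and method names below are hypothetical): when one window's required sort-key list is a prefix of another's, a single sort by the longer list satisfies both windows, so planning the longer-ordered Window first avoids a second SortExec.

```scala
// Hypothetical sketch: deciding how many sorts a sequence of window
// operators needs, given each window's required sort keys.
object WindowSortSketch {
  // A window's required sort keys (partition keys followed by order keys),
  // e.g. Seq("a", "b") for "partition by a order by b".
  type Keys = Seq[String]

  // Data already sorted by `current` also satisfies `required` when
  // `required` is a prefix of `current`.
  def orderingSatisfied(required: Keys, current: Keys): Boolean =
    current.startsWith(required)

  // Plan windows with longer key lists first, so later windows with
  // prefix key lists can reuse the ordering produced earlier.
  def planOrder(specs: Seq[Keys]): Seq[Keys] =
    specs.sortBy(-_.length)

  // Count the Sort nodes needed when windows are applied in the given
  // order: a new sort is required whenever the current ordering no
  // longer satisfies the next window's requirement.
  def sortsNeeded(specs: Seq[Keys]): Int =
    specs.foldLeft((0, Seq.empty[String])) { case ((n, current), next) =>
      if (orderingSatisfied(next, current)) (n, current)
      else (n + 1, next)
    }._1
}
```

In the query above, the window requiring `(a, b, c)` placed below the one requiring `(a, b)` needs only one sort, which corresponds to the first physical plan; the reverse order needs two, corresponding to the second plan.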
## How was this patch tested?

Added new unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/heary-cao/spark ExchangeWindow

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21139.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21139

----

commit 4011b08aa7228195779a80eb66c4fe6dbff3352f
Author: caoxuewen <cao.xuewen@...>
Date: 2018-04-24T08:44:36Z

    Add a window exchange rule to eliminate redundant physical plan SortExec

----

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org