GitHub user heary-cao opened a pull request:

    https://github.com/apache/spark/pull/21139

    [SPARK-24066][SQL]Add a window exchange rule to eliminate redundant physical plan SortExec

    ## What changes were proposed in this pull request?
    Currently, when the ORDER BY fields of one window function are a subset of those of another (for example, `order by b` versus `order by b, c`), Spark SQL nondeterministically generates different physical plans.
    For example:
    
    ```
    case class DistinctAgg(a: Int, b: Float, c: Double, d: Int, e: String)
    val df = spark.sparkContext.parallelize(
          DistinctAgg(8, 2, 3, 4, "a") ::
          DistinctAgg(9, 3, 4, 5, "b") ::
          DistinctAgg(3, 4, 5, 6, "c") ::
          DistinctAgg(3, 4, 5, 7, "c") ::
          DistinctAgg(3, 4, 5, 8, "c") ::
          DistinctAgg(3, 6, 6, 9, "d") ::
          DistinctAgg(30, 40, 50, 60, "e") ::
          DistinctAgg(41, 51, 61, 71, null) ::
          DistinctAgg(42, 52, 62, 72, null) ::
          DistinctAgg(43, 53, 63, 73, "k") ::Nil).toDF()
    df.createOrReplaceTempView("distinctAgg")
    
    spark.sql(
      """select a, b, c,
        |sum(b) over(partition by a order by b) as sumIb,
        |sum(d) over(partition by a order by b, c) as sumId, d
        |from distinctAgg""".stripMargin).explain()
    
    ```
    The generated physical plan varies randomly between runs.
    **One**: the physical plan contains only one Sort
    ```
    == Physical Plan ==
    *(3) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
    +- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
       +- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
          +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(a#181, 5)
                +- *(1) Project [a#181, b#182, c#183, d#184]
                   +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                      +- Scan ExternalRDDScan[obj#180]
    
    ```
    **Another**: the physical plan contains two Sorts
    ```
    == Physical Plan ==
    *(4) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
    +- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
       +- *(3) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
          +- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
             +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST], false, 0
                +- Exchange hashpartitioning(a#181, 5)
                   +- *(1) Project [a#181, b#182, c#183, d#184]
                      +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                         +- Scan ExternalRDDScan[obj#180]
    
    ```
    This PR adds a window exchange rule to ensure that no redundant SortExec is generated in the physical plan.
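    
    To illustrate why the evaluation order of the two windows matters, here is a minimal, Spark-free sketch. It is not the code in this patch: `SortKey`, `WindowSpec`, and `WindowSortSketch` are hypothetical names, and the model assumes that a window operator preserves its child's ordering and that a required ordering is satisfied when it is a prefix of the ordering already provided.
    
    ```scala
    // Hypothetical, simplified stand-ins for a sort key and a window operator.
    final case class SortKey(column: String, ascending: Boolean = true)

    final case class WindowSpec(name: String, partitionBy: Seq[String], orderBy: Seq[SortKey]) {
      // Ordering the operator needs from its child:
      // partition columns first, then the ORDER BY columns.
      def requiredOrdering: Seq[SortKey] = partitionBy.map(SortKey(_)) ++ orderBy
    }

    object WindowSortSketch {
      // Assumption: a requirement is met when it is a prefix of the provided ordering.
      def satisfies(provided: Seq[SortKey], required: Seq[SortKey]): Boolean =
        provided.startsWith(required)

      // Count the sorts needed when the windows are evaluated bottom-up in the
      // given sequence, assuming each window keeps its child's ordering.
      def countSorts(bottomUp: Seq[WindowSpec]): Int = {
        var current = Seq.empty[SortKey]
        var sorts = 0
        for (w <- bottomUp) {
          if (!satisfies(current, w.requiredOrdering)) {
            sorts += 1
            current = w.requiredOrdering
          }
        }
        sorts
      }

      // Evaluate the window with the longest required ordering first.
      // Only meaningful when all windows share the same partition columns and
      // the shorter orderings are prefixes of the longer ones, as in the query above.
      def reorder(windows: Seq[WindowSpec]): Seq[WindowSpec] =
        windows.sortBy(w => -w.requiredOrdering.length)
    }
    ```
    
    In this model, evaluating `sumId` (ordered by `b, c`) below `sumIb` (ordered by `b`) needs a single sort, while the opposite order needs two, which matches the two plans shown above.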
    
    ## How was this patch tested?
    
    Added new unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/heary-cao/spark ExchangeWindow

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21139
    
----
commit 4011b08aa7228195779a80eb66c4fe6dbff3352f
Author: caoxuewen <cao.xuewen@...>
Date:   2018-04-24T08:44:36Z

    Add a window exchange rule to eliminate redundant physical plan SortExec

----

