GitHub user heary-cao opened a pull request:

    https://github.com/apache/spark/pull/22945

    [SPARK-24066][SQL]Add new optimization rule to eliminate unnecessary sort 
by exchanged adjacent Window expressions

    ## What changes were proposed in this pull request?
    
    Currently, when two adjacent window functions have the same partition
    spec and overlapping order specs (one ordering is a prefix of the other,
    as in the example below), two sorts are performed after the shuffle, and
    one of them is unnecessary. This PR adds a new optimization rule that
    eliminates the unnecessary sort by exchanging adjacent Window expressions.
    
    For example:
    
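    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{avg, max, sum}
    // In spark-shell, spark.implicits._ (toDF, $"...") is already in scope.

    val df = Seq(("a", "p1", 10.0, 20.0, 30.0), ("a", "p2", 20.0, 10.0, 40.0))
      .toDF("key", "value", "value1", "value2", "value3")
      .select($"key",
        sum("value1").over(Window.partitionBy("key").orderBy("value")),
        max("value2").over(Window.partitionBy("key").orderBy("value", "value1")),
        avg("value3").over(Window.partitionBy("key").orderBy("value", "value1", "value2")))
    // Print the physical plan to inspect the Sort operators.
    println(df.queryExecution.executedPlan)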
     
    Before this PR:
    
    *(5) Project key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31
    +- Window max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST
       +- *(4) Project key#16, value1#18, value#17, value2#19, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31
          +- Window avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31, key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST
             +- *(3) Sort key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, false, 0
                +- Window sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, key#16, value#17 ASC NULLS FIRST
                   +- *(2) Sort key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, false, 0
                      +- Exchange hashpartitioning(key#16, 5)
                         +- *(1) Project _1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20
                            +- LocalTableScan _1#5, _2#6, _3#7, _4#8, _5#9
    
    After this PR:
    
    *(5) Project key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31
    +- Window sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, key#16, value#17 ASC NULLS FIRST
       +- *(4) Project key#16, value1#18, value#17, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30
          +- Window max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST
             +- *(3) Project key#16, value1#18, value#17, value2#19, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31
                +- Window avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31, key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST
                   +- *(2) Sort key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, false, 0
                      +- Exchange hashpartitioning(key#16, 5)
                         +- *(1) Project _1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20
                            +- LocalTableScan _1#5, _2#6, _3#7, _4#8, _5#9
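
The difference between the two plans can be illustrated with a toy model. This is only a sketch and not the rule added by this PR: it does not use Catalyst, and the names `WindowSortSketch`, `SortOrder`, `satisfies`, `sortsNeeded` and `reorder` are hypothetical. It assumes all windows share the same PARTITION BY, describes each window only by its ORDER BY columns, and assumes an existing sort can be reused whenever the required ordering is a prefix of it.

```scala
// Toy model only; not Spark code and not the rule implemented in this PR.
object WindowSortSketch {
  // A window is represented solely by its ORDER BY column names.
  type SortOrder = Seq[String]

  // Assumption: data sorted by `existing` is also sorted by `required`
  // when `required` is a prefix of `existing`.
  def satisfies(existing: SortOrder, required: SortOrder): Boolean =
    existing.startsWith(required)

  // Count the sorts needed when the windows are evaluated in the given
  // sequence (first element = lowest Window operator in the plan).
  def sortsNeeded(windows: Seq[SortOrder]): Int =
    windows.foldLeft((Option.empty[SortOrder], 0)) {
      case ((Some(current), n), required) if satisfies(current, required) =>
        (Some(current), n)      // current ordering is good enough, no new sort
      case ((_, n), required) =>
        (Some(required), n + 1) // a new sort has to be inserted
    }._2

  // Evaluate the window with the longest ordering first. This is only
  // adequate when the orderings are prefixes of one another, as in the
  // example above.
  def reorder(windows: Seq[SortOrder]): Seq[SortOrder] =
    windows.sortBy(w => -w.length)
}
```

Feeding in the three orderings in the bottom-up sequence of the "Before" plan, `Seq(Seq("value"), Seq("value", "value1", "value2"), Seq("value", "value1"))`, `sortsNeeded` returns 2, matching the two Sort operators in that plan. After `reorder` it returns 1, matching the single Sort in the "After" plan.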
    
    ## How was this patch tested?
    
    Added new unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/heary-cao/spark UnnecessarySort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22945
    
----
commit d36dc0e698eddb67dd36d067e8a6e21dfbcf1f50
Author: caoxuewen <cao.xuewen@...>
Date:   2018-11-05T09:27:53Z

    Add new optimization rule to eliminate unnecessary sort by exchanged 
adjacent Window expressions

----


