GitHub user yucai opened a pull request:

    https://github.com/apache/spark/pull/21564

    [SPARK-24556][SQL] ReusedExchange should rewrite output partitioning also 
when child's partitioning is RangePartitioning

    ## What changes were proposed in this pull request?
    
    Currently, ReusedExchange would rewrite output partitioning if child's 
partitioning is HashPartitioning, but it does not do the same when child's 
partitioning is RangePartitioning, sometimes, it could introduce extra shuffle, 
see:
    
    ```
    val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
    val df1 = df.as("t1")
    val df2 = df.as("t2")
    val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", 
"right")
    t.cache.orderBy($"t2.j").explain
    ```
    Before:
    ```
    == Physical Plan ==
    *(1) Sort [j#14 ASC NULLS FIRST], true, 0
    +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
       +- InMemoryTableScan [i#5, j#6, i#13, j#14]
             +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                   +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, 
BuildLeft
                      :- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
                      :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                      :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 
200)
                      :        +- LocalTableScan [i#5, j#6]
                      +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                         +- ReusedExchange [i#13, j#14], Exchange 
rangepartitioning(j#6 ASC NULLS FIRST, 200)
    ```
    Better plan should avoid ```Exchange rangepartitioning(j#14 ASC NULLS 
FIRST, 200)```, like:
    ```
    == Physical Plan ==
    *(1) Sort [j#14 ASC NULLS FIRST], true, 0
    +- InMemoryTableScan [i#5, j#6, i#13, j#14]
          +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                   :- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                   :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                   :        +- LocalTableScan [i#5, j#6]
                   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                      +- ReusedExchange [i#13, j#14], Exchange 
rangepartitioning(j#6 ASC NULLS FIRST, 200)
    ```
    
    ## How was this patch tested?
    
    Add new tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark SPARK-24556

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21564
    
----

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to