yucai created SPARK-24556:
-----------------------------

             Summary: ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning
                 Key: SPARK-24556
                 URL: https://issues.apache.org/jira/browse/SPARK-24556
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.2
            Reporter: yucai


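Currently, ReusedExchange rewrites its output partitioning in terms of its own output attributes when the child's partitioning is HashPartitioning, but it does not do the same when the child's partitioning is RangePartitioning. The reported range partitioning then still refers to the original exchange's attributes (j#6 instead of j#14 in the plan below), so it does not satisfy a later ordering requirement on the reused attributes and an extra shuffle can be introduced. For example: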

{code:scala}
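// Run in spark-shell, where `spark` is predefined; the import enables toDF and $"..." columns.
import spark.implicits._

val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")
// Both sides are range-partitioned on j, so the second Exchange is replaced by a ReusedExchange.
val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
// Sorting the cached result by t2.j could reuse the existing range partitioning on j#14.
t.cache.orderBy($"t2.j").explain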
{code}

Before fix:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [i#5, j#6, i#13, j#14]
         +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
:     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
:        +- LocalTableScan [i#5, j#6]
+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
,None)
               +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                  :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                  :        +- LocalTableScan [i#5, j#6]
                  +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                     +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}
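Why the top-level Exchange is planned can be sketched with a small, self-contained model. The types below (Attr, SortOrder, RangePartitioning, OrderedDistribution) and the satisfies rule are hypothetical simplifications, not Spark's actual classes or its exact satisfaction check. The point is only that the reused side still reports a range partitioning on j#6, while orderBy($"t2.j") requires an ordering on j#14, and attributes with different expression IDs never match.

{code:scala}
// Hypothetical, simplified stand-ins for Spark's planner types (not Spark's API).
case class Attr(name: String, exprId: Long) {
  override def toString: String = s"$name#$exprId"
}
case class SortOrder(child: Attr, ascending: Boolean = true)

trait Partitioning
case class HashPartitioning(exprs: Seq[Attr], numPartitions: Int) extends Partitioning
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning

// What orderBy($"t2.j") requires from its child: rows ordered by j#14.
case class OrderedDistribution(ordering: Seq[SortOrder])

// Assumed, simplified rule: a range partitioning satisfies an ordered
// distribution when both orderings agree on their shorter common prefix.
// Attributes compare by name and exprId, so j#6 never equals j#14.
def satisfies(p: Partitioning, required: OrderedDistribution): Boolean = p match {
  case RangePartitioning(ordering, _) =>
    val n = math.min(ordering.size, required.ordering.size)
    ordering.take(n) == required.ordering.take(n)
  case _ => false // hash partitioning gives no global order
}

val j6  = Attr("j", 6)  // j#6: output of the original Exchange (t1 side)
val j14 = Attr("j", 14) // j#14: output of the ReusedExchange (t2 side)

// Without a rewrite, the reused side still reports range partitioning on j#6.
val staleReused = RangePartitioning(Seq(SortOrder(j6)), 200)
val required    = OrderedDistribution(Seq(SortOrder(j14)))

// Not satisfied, which in this model forces an extra Exchange on j#14.
println(satisfies(staleReused, required))
{code}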

A better plan would avoid the extra "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)", for example:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- InMemoryTableScan [i#5, j#6, i#13, j#14]
      +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
:     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
:        +- LocalTableScan [i#5, j#6]
+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
,None)
            +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
               :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
               :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
               :        +- LocalTableScan [i#5, j#6]
               +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                  +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}
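The rewrite that would produce the better plan can be sketched with a similar hypothetical model: build a positional map from the original exchange's output to the reused output, and apply it to range orderings as well as hash expressions. ReusedExchangeModel and its fields are illustrative names, not Spark's ReusedExchangeExec, and the actual fix may differ in detail.

{code:scala}
// Hypothetical, simplified stand-ins for Spark's planner types (not Spark's API).
case class Attr(name: String, exprId: Long) {
  override def toString: String = s"$name#$exprId"
}
case class SortOrder(child: Attr, ascending: Boolean = true)

trait Partitioning
case class HashPartitioning(exprs: Seq[Attr], numPartitions: Int) extends Partitioning
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning

// Models a reused exchange: `output` (e.g. i#13, j#14) stands in positionally
// for the original exchange's `childOutput` (e.g. i#5, j#6).
case class ReusedExchangeModel(
    output: Seq[Attr],
    childOutput: Seq[Attr],
    childPartitioning: Partitioning) {

  // Positional map from original attributes to the reused ones.
  private val originalToNew: Map[Attr, Attr] = childOutput.zip(output).toMap
  private def updateAttr(a: Attr): Attr = originalToNew.getOrElse(a, a)

  // The Hash case mirrors the existing behaviour described in this issue;
  // the Range case is the extension the issue asks for.
  def outputPartitioning: Partitioning = childPartitioning match {
    case HashPartitioning(exprs, n) =>
      HashPartitioning(exprs.map(updateAttr), n)
    case RangePartitioning(ordering, n) =>
      RangePartitioning(ordering.map(o => o.copy(child = updateAttr(o.child))), n)
    case other => other
  }
}

val i5  = Attr("i", 5)
val j6  = Attr("j", 6)
val i13 = Attr("i", 13)
val j14 = Attr("j", 14)

val reused = ReusedExchangeModel(
  output = Seq(i13, j14),
  childOutput = Seq(i5, j6),
  childPartitioning = RangePartitioning(Seq(SortOrder(j6)), 200))

// Range partitioning is now reported on j#14 instead of j#6.
println(reused.outputPartitioning)
{code}

Mapping by position mirrors how the ReusedExchange line in the plan pairs [i#13, j#14] with the original exchange's [i#5, j#6]; once the ordering refers to j#14, it can match the ordering required by orderBy($"t2.j").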



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
