[ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-24556.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 2.4.0

Issue resolved by pull request 21564
[https://github.com/apache/spark/pull/21564]

> ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24556
>                 URL: https://issues.apache.org/jira/browse/SPARK-24556
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2
>            Reporter: yucai
>            Assignee: yucai
>            Priority: Major
>             Fix For: 2.4.0
>
>
> Currently, ReusedExchange rewrites its output partitioning if the child's
> partitioning is HashPartitioning, but it does not do the same when the
> child's partitioning is RangePartitioning. This can sometimes introduce an
> extra shuffle, for example:
> {code:java}
> val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
> val df1 = df.as("t1")
> val df2 = df.as("t2")
> val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
> t.cache.orderBy($"t2.j").explain
> {code}
> Before fix:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
>    +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>          +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>                +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>                   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
>                   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>                   :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>                   :        +- LocalTableScan [i#5, j#6]
>                   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>                      +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}
> A better plan should avoid the top-level
> "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)", like:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>       +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>             +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>                :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
>                :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>                :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>                :        +- LocalTableScan [i#5, j#6]
>                +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>                   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}
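
To illustrate the idea behind the issue, here is a minimal, self-contained sketch of rewriting a reused exchange's output partitioning for both the hash and the range case. It is a simplified model and not Spark's actual code: `Attr`, `SortOrder`, the `Partitioning` hierarchy, and `rewritePartitioning` are hypothetical stand-ins defined only for this example. The assumption is that the rewrite amounts to mapping each attribute of the child's output to the attribute at the same position in the reused node's output.

```scala
// Simplified stand-ins for Catalyst concepts. These are hypothetical types
// for illustration only, NOT Spark's real classes or signatures.
final case class Attr(name: String, exprId: Int)
final case class SortOrder(child: Attr, ascending: Boolean = true)

sealed trait Partitioning
final case class HashPartitioning(exprs: Seq[Attr], numPartitions: Int) extends Partitioning
final case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning
final case class UnknownPartitioning(numPartitions: Int) extends Partitioning

object ReusedExchangeSketch {
  // Rewrite the child's partitioning so that it refers to the reused node's
  // output attributes (e.g. j#6 becomes j#14) instead of the child's own.
  def rewritePartitioning(
      childOutput: Seq[Attr],
      reusedOutput: Seq[Attr],
      partitioning: Partitioning): Partitioning = {
    // Positional mapping: child attribute id -> reused attribute.
    val mapping: Map[Int, Attr] = childOutput.map(_.exprId).zip(reusedOutput).toMap
    def remap(a: Attr): Attr = mapping.getOrElse(a.exprId, a)

    partitioning match {
      case HashPartitioning(exprs, n) =>
        HashPartitioning(exprs.map(remap), n)
      // The case the issue asks for: range partitioning is rewritten too.
      case RangePartitioning(ordering, n) =>
        RangePartitioning(ordering.map(o => o.copy(child = remap(o.child))), n)
      case other =>
        other
    }
  }
}
```

With the attribute ids from the plans above, the child's output is `[i#5, j#6]` and the reused output is `[i#13, j#14]`, so in this model `RangePartitioning(j#6)` is rewritten to `RangePartitioning(j#14)`. A partitioning expressed on `j#14` is what would let a later sort on `t2.j` be satisfied without a second exchange.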