yucai created SPARK-24556:
-----------------------------

             Summary: ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning
                 Key: SPARK-24556
                 URL: https://issues.apache.org/jira/browse/SPARK-24556
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.2
            Reporter: yucai
Currently, ReusedExchange rewrites its output partitioning when the child's partitioning is HashPartitioning, but it does not do the same when the child's partitioning is RangePartitioning. In some cases this introduces an extra shuffle, for example:

{code:scala}
import spark.implicits._ // already in scope in spark-shell

val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")
val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
t.cache.orderBy($"t2.j").explain
{code}

Before the fix:

{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [i#5, j#6, i#13, j#14]
         +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
:     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
:        +- LocalTableScan [i#5, j#6]
+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
,None)
               +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                  :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                  :        +- LocalTableScan [i#5, j#6]
                  +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                     +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}

A better plan would avoid the extra "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)", for example:

{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- InMemoryTableScan [i#5, j#6, i#13, j#14]
      +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
:     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
:        +- LocalTableScan [i#5, j#6]
+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
,None)
            +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
               :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
               :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
               :        +- LocalTableScan [i#5, j#6]
               +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                  +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}
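To make the difference between the two behaviours concrete, here is a small self-contained sketch. It does not use Spark's classes: {{Attr}}, {{HashPart}}, {{RangePart}} and {{ReusedExchangeSketch}} are hypothetical stand-ins for Spark's attributes, partitionings and ReusedExchange. It assumes the reused exchange's output maps positionally onto the original exchange's output (i#5 -> i#13, j#6 -> j#14) and simplifies the distribution check to an exact equality test. With only hash partitioning rewritten, the reported range partitioning still refers to j#6, so it does not match the ordering on j#14 that the final Sort needs, which is where the extra Exchange comes from.

{code:scala}
// Hypothetical toy model, not Spark's API: an attribute is a name plus an exprId (e.g. j#6).
final case class Attr(name: String, exprId: Long) {
  override def toString: String = s"$name#$exprId"
}

// Hypothetical stand-ins for HashPartitioning and RangePartitioning.
sealed trait Partitioning
final case class HashPart(keys: Seq[Attr], numPartitions: Int) extends Partitioning
final case class RangePart(ordering: Seq[Attr], numPartitions: Int) extends Partitioning

object ReusedExchangeSketch {
  // Positional mapping from the original exchange's output (i#5, j#6)
  // to the reused exchange's output (i#13, j#14); unknown attributes are kept as-is.
  private def mapping(childOutput: Seq[Attr], reusedOutput: Seq[Attr]): Attr => Attr = {
    val m = childOutput.zip(reusedOutput).toMap
    a => m.getOrElse(a, a)
  }

  // Behaviour described in this issue: only hash partitioning is rewritten.
  def rewriteHashOnly(p: Partitioning, childOutput: Seq[Attr], reusedOutput: Seq[Attr]): Partitioning = {
    val update = mapping(childOutput, reusedOutput)
    p match {
      case HashPart(keys, n) => HashPart(keys.map(update), n)
      case other             => other // range partitioning still refers to the child's attributes
    }
  }

  // Behaviour this issue asks for: range partitioning is rewritten too.
  def rewriteAll(p: Partitioning, childOutput: Seq[Attr], reusedOutput: Seq[Attr]): Partitioning = {
    val update = mapping(childOutput, reusedOutput)
    p match {
      case HashPart(keys, n)      => HashPart(keys.map(update), n)
      case RangePart(ordering, n) => RangePart(ordering.map(update), n)
    }
  }

  // Simplified check: an extra Exchange is needed unless the reported
  // partitioning is exactly the one the parent requires.
  def needsExtraShuffle(reported: Partitioning, required: Partitioning): Boolean =
    reported != required

  def main(args: Array[String]): Unit = {
    val childOutput  = Seq(Attr("i", 5), Attr("j", 6))
    val reusedOutput = Seq(Attr("i", 13), Attr("j", 14))
    val childPart    = RangePart(Seq(Attr("j", 6)), 200)
    val required     = RangePart(Seq(Attr("j", 14)), 200) // what orderBy($"t2.j") needs

    val before = rewriteHashOnly(childPart, childOutput, reusedOutput)
    val after  = rewriteAll(childPart, childOutput, reusedOutput)
    println(s"hash-only rewrite: $before, extra shuffle = ${needsExtraShuffle(before, required)}")
    println(s"full rewrite:      $after, extra shuffle = ${needsExtraShuffle(after, required)}")
  }
}
{code}

Running {{main}} reports an extra shuffle for the hash-only rewrite and none once range partitioning is rewritten as well, mirroring the two plans above under the simplifications stated.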