Kousuke Saruta created SPARK-32820:
--------------------------------------

Summary: Remove redundant shuffle exchanges inserted by EnsureRequirements
Key: SPARK-32820
URL: https://issues.apache.org/jira/browse/SPARK-32820
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.1.0
Reporter: Kousuke Saruta
Assignee: Kousuke Saruta
Redundant repartition operations are removed by the CollapseRepartition rule, but EnsureRequirements can still insert another HashPartitioning or RangePartitioning immediately on top of a user-specified repartition. As a result, the physical plan contains two adjacent ShuffleExchanges, and the inner one is redundant because the outer one redistributes every row again.

{code:java}
// spark.implicits._ (for $"..." and toDF) is imported automatically in spark-shell
import spark.implicits._

val ordered = spark.range(1, 100).repartitionByRange(10, $"id".desc).orderBy($"id")
ordered.explain(true)
...
== Physical Plan ==
*(2) Sort [id#0L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#0L ASC NULLS FIRST, 200), true, [id=#15]
   +- Exchange rangepartitioning(id#0L DESC NULLS LAST, 10), false, [id=#14]
      +- *(1) Range (1, 100, step=1, splits=12)
{code}

{code:java}
// Keep the small relations from being broadcast so a SortMergeJoin is planned
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
val left = Seq(1, 2, 3).toDF.repartition(10, $"value")
val right = Seq(1, 2, 3).toDF
val joined = left.join(right, left("value") + 1 === right("value"))
joined.explain(true)
...
== Physical Plan ==
*(3) SortMergeJoin [(value#7 + 1)], [value#12], Inner
:- *(1) Sort [(value#7 + 1) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning((value#7 + 1), 200), true, [id=#67]
:     +- Exchange hashpartitioning(value#7, 10), false, [id=#63]
:        +- LocalTableScan [value#7]
+- *(2) Sort [value#12 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(value#12, 200), true, [id=#68]
      +- LocalTableScan [value#12]
{code}
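To make the problem concrete, here is a toy sketch of the simplification the summary asks for: when a shuffle exchange sits directly on top of another shuffle exchange, the inner one can be dropped because the outer one redistributes every row anyway. The `Plan`, `Scan`, `Sort`, `Exchange`, and `CollapseExchanges` names are hypothetical stand-ins invented for illustration. They are not Spark's `SparkPlan` or `ShuffleExchangeExec` classes, and this is not necessarily how the eventual change to EnsureRequirements works.

```scala
// Hypothetical toy plan model (NOT Spark's physical plan classes).
sealed trait Plan
final case class Scan(name: String) extends Plan
final case class Sort(order: String, child: Plan) extends Plan
final case class Exchange(partitioning: String, child: Plan) extends Plan

object CollapseExchanges {
  // Bottom-up rewrite: children are simplified first, so a simplified child
  // never has two stacked Exchanges. If the simplified child of an Exchange is
  // itself an Exchange, that inner shuffle is redundant (the outer one
  // repartitions every row again), so it is replaced by its own child.
  def apply(plan: Plan): Plan = plan match {
    case Exchange(p, child) =>
      apply(child) match {
        case Exchange(_, grandChild) => Exchange(p, grandChild)
        case other                   => Exchange(p, other)
      }
    case Sort(order, child) => Sort(order, apply(child))
    case s: Scan            => s
  }
}

object CollapseExchangesDemo {
  def main(args: Array[String]): Unit = {
    // Shape of the first plan above: Sort <- Exchange <- Exchange <- Range
    val before = Sort("id ASC",
      Exchange("rangepartitioning(id ASC, 200)",
        Exchange("rangepartitioning(id DESC, 10)", Scan("Range"))))
    val after = CollapseExchanges(before)
    assert(after == Sort("id ASC",
      Exchange("rangepartitioning(id ASC, 200)", Scan("Range"))))
    println(after)
  }
}
```

The sketch only matches exchanges that are directly adjacent, which is the situation in both plans above; deciding when it is safe to look through intervening operators is outside its scope.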