Cheng Su created SPARK-32383:
--------------------------------

             Summary: Preserve hash join (BHJ and SHJ) stream side ordering
                 Key: SPARK-32383
                 URL: https://issues.apache.org/jira/browse/SPARK-32383
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Cheng Su

Currently `BroadcastHashJoinExec` and `ShuffledHashJoinExec` do not preserve the output ordering of their children: both inherit `SparkPlan.outputOrdering`, which is Nil. This can add an unnecessary sort in complex queries involving multiple joins.

Example:

{code:java}
withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
  val df1 = spark.range(100).select($"id".as("k1"))
  val df2 = spark.range(100).select($"id".as("k2"))
  val df3 = spark.range(3).select($"id".as("k3"))
  val df4 = spark.range(100).select($"id".as("k4"))
  val plan = df1.join(df2, $"k1" === $"k2")
    .join(df3, $"k1" === $"k3")
    .join(df4, $"k1" === $"k4")
    .queryExecution
    .executedPlan
}
{code}

Current physical plan (extra sort on `k1` before the top sort merge join):

{code:java}
*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) Sort [k1#220L ASC NULLS FIRST], false, 0
:  +- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
:     :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
:     :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#128]
:     :  :     +- *(1) Project [id#218L AS k1#220L]
:     :  :        +- *(1) Range (0, 100, step=1, splits=2)
:     :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#134]
:     :        +- *(3) Project [id#222L AS k2#224L]
:     :           +- *(3) Range (0, 100, step=1, splits=2)
:     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#141]
:        +- *(5) Project [id#226L AS k3#228L]
:           +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k4#232L, 5), true, [id=#148]
      +- *(7) Project [id#230L AS k4#232L]
         +- *(7) Range (0, 100, step=1, splits=2)
{code}

Ideal physical plan (no extra sort on `k1` before the top sort merge join):

{code:java}
*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
:  :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
:  :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
:  :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#127]
:  :  :     +- *(1) Project [id#218L AS k1#220L]
:  :  :        +- *(1) Range (0, 100, step=1, splits=2)
:  :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
:  :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#133]
:  :        +- *(3) Project [id#222L AS k2#224L]
:  :           +- *(3) Range (0, 100, step=1, splits=2)
:  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#140]
:     +- *(5) Project [id#226L AS k3#228L]
:        +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k4#232L, 5), true, [id=#146]
      +- *(7) Project [id#230L AS k4#232L]
         +- *(7) Range (0, 100, step=1, splits=2)
{code}
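
A hash join probes the hash table once per stream-side row, in the order those rows arrive, so the stream side's ordering carries through to the join output. The toy model below sketches how an `outputOrdering` override could express that. It is not Spark's code: `Plan`, `Sorted`, `Unsorted`, `HashJoin`, and the simplified `SortOrder`, `JoinType`, and `BuildSide` are hypothetical stand-ins that only borrow names from Spark. Returning Nil for a full outer join is a conservative assumption made here, not something this issue states.

```scala
// Toy model only: none of these types are Spark's real classes.
object OrderingSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object FullOuter extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  final case class SortOrder(column: String, ascending: Boolean = true)

  trait Plan {
    // Default mirrors the behaviour described above: no ordering is reported.
    def outputOrdering: Seq[SortOrder] = Nil
  }

  // A child whose output is sorted on one column (e.g. a sort merge join on k1).
  final case class Sorted(column: String) extends Plan {
    override def outputOrdering: Seq[SortOrder] = Seq(SortOrder(column))
  }

  // A child with no known ordering (e.g. the broadcast side).
  final case class Unsorted(name: String) extends Plan

  final case class HashJoin(
      left: Plan,
      right: Plan,
      joinType: JoinType,
      buildSide: BuildSide) extends Plan {

    // The streamed side is the one that is NOT built into the hash table.
    private def streamed: Plan = buildSide match {
      case BuildLeft  => right
      case BuildRight => left
    }

    override def outputOrdering: Seq[SortOrder] = joinType match {
      // Assumption: be conservative for full outer joins, where rows that
      // come only from the build side may be emitted out of stream order.
      case FullOuter => Nil
      // Otherwise output follows the order of the streamed rows.
      case _ => streamed.outputOrdering
    }
  }
}
```

In this model, `HashJoin(Sorted("k1"), Unsorted("df3"), Inner, BuildRight)` reports an ordering on `k1`, which is what would let a planner skip the extra `Sort` under the top sort merge join in the example above.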