Terry Kim created SPARK-31869:
---------------------------------

             Summary: BroadcastHashJoinExec's outputPartitioning can utilize the build side
                 Key: SPARK-31869
                 URL: https://issues.apache.org/jira/browse/SPARK-31869
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Terry Kim

Currently, BroadcastHashJoinExec's outputPartitioning uses only the streamed side's outputPartitioning. As a result, when a later join uses a key that comes from the build side of a BroadcastHashJoinExec, an extra shuffle is planned. For example:

{code:java}
// Keep the threshold small so that only the in-memory df2 is broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")

val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")

// t1 and t3 are bucketed on their join keys.
df1.write.format("parquet").bucketBy(8, "i1").saveAsTable("t1")
df3.write.format("parquet").bucketBy(8, "i3").saveAsTable("t3")
val t1 = spark.table("t1")
val t3 = spark.table("t3")

// join1 broadcasts df2 (the build side); join2 then joins on the build-side key i2.
val join1 = t1.join(df2, t1("i1") === df2("i2"))
val join2 = join1.join(t3, join1("i2") === t3("i3"))
join2.explain
{code}

This produces an Exchange hashpartitioning(i2#103, 200) on top of the BroadcastHashJoin:

{code:java}
== Physical Plan ==
*(5) SortMergeJoin [i2#103], [i3#124], Inner
:- *(2) Sort [i2#103 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i2#103, 200)
:     +- *(1) BroadcastHashJoin [i1#120], [i2#103], Inner, BuildRight
:        :- *(1) Project [i1#120, j1#121]
:        :  +- *(1) Filter isnotnull(i1#120)
:        :     +- *(1) FileScan parquet default.t1[i1#120,j1#121] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 8 out of 8
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:           +- LocalTableScan [i2#103, j2#104]
+- *(4) Sort [i3#124 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#124, 200)
      +- *(3) Project [i3#124, j3#125]
         +- *(3) Filter isnotnull(i3#124)
            +- *(3) FileScan parquet default.t3[i3#124,j3#125] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i3)], ReadSchema: struct<i3:int,j3:int>, SelectedBucketsCount: 8 out of 8
{code}

However, BroadcastHashJoinExec is used only for equi-joins, so in the join output each streamed-side join key (here i1) equals its build-side counterpart (here i2). If the streamed side has HashPartitioning on its join keys, BroadcastHashJoinExec can therefore also express that partitioning in terms of the build-side keys, and this information can be used to eliminate the exchange.
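
To make the idea concrete, here is a minimal sketch of how the streamed side's hash partitioning could be re-expressed with build-side keys. It is a toy model, not Spark's implementation: the case class HashPartitioning below only mimics the name of Spark's class, attribute names are plain strings, and BuildSidePartitioning.expand is a hypothetical helper. It assumes an inner equi-join like the one in the example and ignores the other checks the planner performs before removing an Exchange.

```scala
// Toy stand-in for a hash partitioning: key names plus a partition count.
// This is NOT org.apache.spark.sql.catalyst.plans.physical.HashPartitioning.
final case class HashPartitioning(keys: Seq[String], numPartitions: Int)

object BuildSidePartitioning {

  /** Hypothetical helper: lists every partitioning the join output satisfies,
    * given the streamed side's partitioning and the pairwise-equal join keys.
    */
  def expand(
      streamed: HashPartitioning,
      streamedKeys: Seq[String],
      buildKeys: Seq[String]): Seq[HashPartitioning] = {
    require(streamedKeys.length == buildKeys.length, "join keys must pair up")

    // In an inner equi-join, each streamed key equals its paired build key
    // in every output row, so the two are interchangeable for partitioning.
    val equivalents: Map[String, Seq[String]] =
      streamedKeys.zip(buildKeys).groupBy(_._1).map {
        case (key, pairs) => key -> pairs.map(_._2).distinct
      }

    // Each partitioning key may stay as it is or be swapped for an equivalent.
    val choices: Seq[Seq[String]] =
      streamed.keys.map(k => k +: equivalents.getOrElse(k, Seq.empty))

    // Cartesian product over the per-key choices.
    val combinations: Seq[Seq[String]] =
      choices.foldLeft(Seq(Seq.empty[String])) { (acc, options) =>
        for (prefix <- acc; option <- options) yield prefix :+ option
      }

    combinations.map(ks => HashPartitioning(ks, streamed.numPartitions))
  }
}
```

For the example above, BuildSidePartitioning.expand(HashPartitioning(Seq("i1"), 8), Seq("i1"), Seq("i2")) yields a partitioning on i1 and one on i2, both with 8 partitions. The second one is what the outer join on i2 could match against instead of requesting a new shuffle.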