[ https://issues.apache.org/jira/browse/SPARK-31869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-31869:
------------------------------------

    Assignee: Apache Spark

> BroadcastHashJoinExec's outputPartitioning can utilize the build side
> ---------------------------------------------------------------------
>
>                 Key: SPARK-31869
>                 URL: https://issues.apache.org/jira/browse/SPARK-31869
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Terry Kim
>            Assignee: Apache Spark
>            Priority: Minor
>
> Currently, BroadcastHashJoinExec's outputPartitioning only uses the
> streamed side's outputPartitioning. As a result, when a later join uses a
> key that comes from the build side of a BroadcastHashJoinExec, an extra
> exchange is planned. For example:
> {code:java}
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
> val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
> val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
> val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
> val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")
> // join1 is a sort merge join.
> val join1 = t1.join(t2, t1("i1") === t2("i2"))
> // join2 is a broadcast join where t3 is broadcasted.
> val join2 = join1.join(t3, join1("i1") === t3("i3"))
> // Join on the column from the broadcasted side (i3).
> val join3 = join2.join(t4, join2("i3") === t4("i4"))
> join3.explain
> {code}
> This produces Exchange hashpartitioning(i3#29, 200) above the broadcast join:
> {code:java}
> == Physical Plan ==
> *(6) SortMergeJoin [i3#29], [i4#40], Inner
> :- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
> :     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
> :        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
> :        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
> :        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
> :        :  :     +- LocalTableScan [i1#7, j1#8]
> :        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
> :        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
> :        :        +- LocalTableScan [i2#18, j2#19]
> :        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
> :           +- LocalTableScan [i3#29, j3#30]
> +- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
>       +- LocalTableScan [i4#40, j4#41]
> {code}
> However, because BroadcastHashJoinExec only handles equi-joins, the
> build-side join keys equal the streamed-side join keys in every output row.
> If the streamed side has a HashPartitioning on its join keys,
> BroadcastHashJoinExec can use that information to report an equivalent
> partitioning on the build-side keys, which eliminates the exchange.
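
The reasoning in the issue description can be illustrated with a small, self-contained sketch. It is not Spark's implementation: the HashPartitioning case class and the BroadcastJoinPartitioning.expand helper below are hypothetical, simplified stand-ins that use plain column names instead of Catalyst expressions, and the sketch assumes an inner equi-join. It shows how a streamed-side hash partitioning on i1 could also be reported as a hash partitioning on i3 when the join condition is i1 = i3.

```scala
// Hypothetical, simplified model of a hash partitioning (not Spark's class).
final case class HashPartitioning(keys: Seq[String], numPartitions: Int)

object BroadcastJoinPartitioning {
  /**
   * Expands the streamed side's hash partitioning with the build-side keys
   * that an inner equi-join makes equal to the streamed-side keys.
   *
   * streamedKeys and buildKeys are the join keys, paired by position.
   */
  def expand(
      streamed: HashPartitioning,
      streamedKeys: Seq[String],
      buildKeys: Seq[String]): Seq[HashPartitioning] = {
    require(streamedKeys.length == buildKeys.length, "join keys must pair up")

    // Map each streamed-side join key to the build-side keys it is equated with.
    val equivalents: Map[String, Seq[String]] =
      streamedKeys.zip(buildKeys).groupBy(_._1).map {
        case (key, pairs) => key -> pairs.map(_._2)
      }

    // Each partitioning key may stay as-is or be swapped for an equivalent build key.
    val candidates: Seq[Seq[String]] =
      streamed.keys.map(k => k +: equivalents.getOrElse(k, Seq.empty))

    // Cartesian product of the per-key candidates yields all equivalent partitionings.
    candidates
      .foldLeft(Seq(Seq.empty[String])) { (acc, options) =>
        for (prefix <- acc; option <- options) yield prefix :+ option
      }
      .map(keys => HashPartitioning(keys, streamed.numPartitions))
  }
}
```

For the example in the issue, BroadcastJoinPartitioning.expand(HashPartitioning(Seq("i1"), 200), Seq("i1"), Seq("i3")) yields partitionings on both i1 and i3, so a later join on i3 would find its required distribution already satisfied. A partitioning on a column that is not a join key is returned unchanged.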