Terry Kim created SPARK-31869:
---------------------------------

             Summary: BroadcastHashJoinExec's outputPartitioning can utilize the build side
                 Key: SPARK-31869
                 URL: https://issues.apache.org/jira/browse/SPARK-31869
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Terry Kim


Currently, BroadcastHashJoinExec's outputPartitioning only uses the streamed 
side's outputPartitioning. Thus, if a later join uses a join key that comes 
from the build side of a BroadcastHashJoinExec, as in:
{code:java}
// Lower the broadcast threshold (in bytes) so that only the small df2 is broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
// t1 and t3 are bucketed into 8 buckets on their join keys.
df1.write.format("parquet").bucketBy(8, "i1").saveAsTable("t1")
df3.write.format("parquet").bucketBy(8, "i3").saveAsTable("t3")
val t1 = spark.table("t1")
val t3 = spark.table("t3")
// join1 streams t1 and uses df2 as the build side.
val join1 = t1.join(df2, t1("i1") === df2("i2"))
// join2 joins on i2, a key that comes from the build side of join1.
val join2 = join1.join(t3, join1("i2") === t3("i3"))
join2.explain
{code}
it produces Exchange hashpartitioning(i2#103, 200):
{code:java}
== Physical Plan ==
*(5) SortMergeJoin [i2#103], [i3#124], Inner
:- *(2) Sort [i2#103 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i2#103, 200)
:     +- *(1) BroadcastHashJoin [i1#120], [i2#103], Inner, BuildRight
:        :- *(1) Project [i1#120, j1#121]
:        :  +- *(1) Filter isnotnull(i1#120)
:        :     +- *(1) FileScan parquet default.t1[i1#120,j1#121] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 8 out of 8
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:           +- LocalTableScan [i2#103, j2#104]
+- *(4) Sort [i3#124 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#124, 200)
      +- *(3) Project [i3#124, j3#125]
         +- *(3) Filter isnotnull(i3#124)
            +- *(3) FileScan parquet default.t3[i3#124,j3#125] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i3)], ReadSchema: struct<i3:int,j3:int>, SelectedBucketsCount: 8 out of 8
{code}
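However, BroadcastHashJoinExec handles only equi-joins, so in its output each 
streamed-side join key equals the corresponding build-side join key (here, 
i1 = i2). If the streamed side has a HashPartitioning on its join keys, the 
output is therefore also hash-partitioned on the build-side keys, and 
BroadcastHashJoinExec can utilize this info to eliminate the exchange.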
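
To illustrate the idea, here is a minimal sketch of how a streamed-side hash partitioning could be expanded with the equivalent build-side keys. It does not use Spark's classes: the HashPartitioning case class, the BuildSidePartitioning object and its expand method are hypothetical stand-ins invented for this example, and attributes are modelled as plain strings.

```scala
// Simplified stand-in for a hash partitioning; not Spark's actual class.
final case class HashPartitioning(keys: Seq[String], numPartitions: Int)

object BuildSidePartitioning {
  /**
   * Given the streamed side's hash partitioning and the equi-join key pairs
   * (streamedKey -> buildKey), returns the partitionings that the join output
   * satisfies: the original one plus every variant in which a streamed key is
   * replaced by a build-side key it is equal to.
   */
  def expand(
      streamed: HashPartitioning,
      keyPairs: Seq[(String, String)]): Seq[HashPartitioning] = {
    // Map each streamed join key to the build-side keys it is equated with.
    val buildKeysFor: Map[String, Seq[String]] =
      keyPairs.groupBy(_._1).map { case (s, pairs) => s -> pairs.map(_._2) }

    // For each partitioning key: the key itself, then any equal build-side keys.
    val choices: Seq[Seq[String]] =
      streamed.keys.map(k => k +: buildKeysFor.getOrElse(k, Seq.empty))

    // Cartesian product of the per-key choices.
    val combos = choices.foldLeft(Seq(Seq.empty[String])) { (acc, opts) =>
      for (prefix <- acc; o <- opts) yield prefix :+ o
    }
    combos.map(ks => HashPartitioning(ks, streamed.numPartitions))
  }
}
```

For the example above, BuildSidePartitioning.expand(HashPartitioning(Seq("i1"), 8), Seq("i1" -> "i2")) yields one partitioning on i1 and one on i2, both with 8 partitions. The second is the one that a later join on i2 could match without a new exchange.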



