When I run a broadcast hash join on a bigger table (6 Mil rows), I get an incorrect result of 0 rows.
    import org.apache.spark.sql.functions.broadcast
    import spark.implicits._ // already in scope in spark-shell

    val rightDF = spark.read.format("parquet").load("table-a")
    val leftDF = spark.read.format("parquet").load("table-b")
      // needed to activate the dynamic pruning subquery
      .where($"part_ts" === 20210304000L)
    // leftDF has 7 Mil rows, ~120 MB

    val join = broadcast(leftDF).join(rightDF,
      $"match_part_id" === $"part_id" && $"match_id" === $"id")

    join.count // res1: Long = 0

I think it's related to Dynamic Partition Pruning of rightDF, which is happening according to the plan:

    PartitionFilters: [isnotnull(part_id#477L), dynamicpruningexpression(part_id#477L IN dynamicpruning#534)]

    ===== Subqueries =====

    Subquery:1 Hosting operator id = 6 Hosting Expression = part_id#477L IN dynamicpruning#534
    ReusedExchange (11)

    (11) ReusedExchange [Reuses operator id: 5]
    Output [4]: [match_part_id#487L, match_id#488L, UK#489, part_ts#490L]

*Removing the broadcast hint OR shrinking the broadcasted table corrects the result*:

    val rightDF = spark.read.format("parquet").load("table-a")
    val leftDF = spark.read.format("parquet").load("table-b")
      // needed to activate the dynamic pruning subquery
      .where($"part_ts" === 20210304000L)
      // shrinks the broadcasted table to 18K rows
      .where($"match_id" === 33358792)
    // leftDF has 18K rows

    val join = broadcast(leftDF).join(rightDF,
      $"match_part_id" === $"part_id" && $"match_id" === $"id")

    join.count // res2: Long = 379701

I would expect the broadcast to fail, but I would never expect it to return incorrect results without an exception. What do you think?

BR,
Tomas
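A minimal sketch for checking the Dynamic Partition Pruning suspicion, assuming Spark 3.x: run the same broadcast join twice, once with `spark.sql.optimizer.dynamicPartitionPruning.enabled` set to `true` and once set to `false`, and compare the counts. The config key is a standard Spark SQL setting; the object `DppBroadcastCheck` and the helper `joinCount` are hypothetical names used only for this illustration, and the `table-a`/`table-b` paths are assumed to be readable from where it runs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col}

// Hypothetical helper: compares the join count with Dynamic Partition
// Pruning (DPP) switched on and off, keeping the broadcast hint in both runs.
object DppBroadcastCheck {
  private val DppKey = "spark.sql.optimizer.dynamicPartitionPruning.enabled"

  // Runs the same broadcast join as in the report with DPP set as requested.
  // The setting is read when count() plans the query, so it applies per call.
  def joinCount(spark: SparkSession, left: DataFrame, right: DataFrame,
                dppEnabled: Boolean): Long = {
    spark.conf.set(DppKey, dppEnabled.toString)
    broadcast(left)
      .join(right,
        col("match_part_id") === col("part_id") && col("match_id") === col("id"))
      .count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dpp-broadcast-check").getOrCreate()
    // Same inputs as in the report (assumed paths).
    val rightDF = spark.read.format("parquet").load("table-a")
    val leftDF = spark.read.format("parquet").load("table-b")
      .where(col("part_ts") === 20210304000L)

    val withDpp = joinCount(spark, leftDF, rightDF, dppEnabled = true)
    val withoutDpp = joinCount(spark, leftDF, rightDF, dppEnabled = false)
    println(s"count with DPP = $withDpp, without DPP = $withoutDpp")
    spark.stop()
  }
}
```

If the count is non-zero only with DPP disabled, that would support the suspicion that the pruning subquery, which reuses the broadcast exchange (`ReusedExchange` in the plan), is involved rather than the broadcast join itself.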