[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555578#comment-16555578 ]
Shay Elbaz commented on SPARK-24904:
------------------------------------

[~mgaido] Technically you *can* do that; you just need an additional shuffle (after the map-side join) to fill in the missing rows, as you mentioned. And since the current implementation already shuffles, I don't see how it makes sense to involve the entire big table in the shuffle. Instead, Spark could do the following:
# Broadcast the small table.
# Just as in an inner join, load the big table and hash-join. The output is (expected to be) very small compared to the big table.
# Keep the small table broadcasted, and shuffle the results from the last stage (say, sort-merge).
# Now on each task, fill in missing rows from the broadcasted table. This is trivial if using sort-merge and the broadcasted table is just another partition to merge.

As I mentioned in the description, this can be achieved by the user with two joins, but shouldn't Spark offer this by default? Needless to say, the current implementation is suboptimal compared to the above plan.
Am I missing something?

> Join with broadcasted dataframe causes shuffle of redundant data
> ----------------------------------------------------------------
>
>                 Key: SPARK-24904
>                 URL: https://issues.apache.org/jira/browse/SPARK-24904
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.1.2
>            Reporter: Shay Elbaz
>            Priority: Minor
>
> When joining a "large" dataframe with a broadcasted small one, and join-type is
> on the small DF side (see right-join below), the physical plan does not
> include broadcasting the small table. But when the join is on the large DF
> side, the broadcast does take place. Is there a good reason for this?
> In the below example it sure doesn't make any sense to shuffle the entire large
> table:
>
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
>   .explain
> //OUTPUT:
> == Physical Plan ==
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id2#16307L, 1000)
> :     +- *Project [id#16304L AS id2#16307L]
> :        +- *Range (1, 1073741824, step=1, splits=Some(600))
> +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#16310L, 1000)
>       +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then
> join the result back with the small DF to fill the missing rows.
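
For reference, the two-join workaround mentioned in the description could look roughly like the sketch below. This is only an illustration under a few assumptions, not what Spark does internally: `rightJoinViaBroadcast` is a hypothetical helper name (not a Spark API), the session runs with a local master, and `big` starts at 5 (unlike the example in the issue) so that some rows of `small` have no match. The first step uses a left-semi join rather than the inner join described above, so that joining back against `small` does not duplicate rows when `small` contains repeated keys.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object RightJoinWorkaround {

  // Hypothetical helper (not part of Spark): emulates `big RIGHT JOIN small`
  // without shuffling the whole big table.
  def rightJoinViaBroadcast(big: DataFrame, small: DataFrame,
                            bigKey: String, smallKey: String): DataFrame = {
    // Step 1: map-side filter. Broadcasting `small` lets each task keep only
    // the rows of `big` whose key appears in `small`.
    val matched = big.join(broadcast(small), big(bigKey) === small(smallKey), "left_semi")

    // Step 2: right join of the (expected to be small) filtered result with
    // `small`, which restores the rows of `small` that had no match.
    matched.join(small, matched(bigKey) === small(smallKey), "right")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local session, for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-24904 workaround sketch")
      .getOrCreate()

    val small = spark.range(1, 10).toDF()
    // Assumption: `big` starts at 5 so ids 1 to 4 of `small` are unmatched.
    val big = spark.range(5, 1 << 20).withColumnRenamed("id", "id2")

    val result = rightJoinViaBroadcast(big, small, "id2", "id")
    result.explain()               // inspect where the broadcast and shuffles happen
    result.orderBy("id").show()

    spark.stop()
  }
}
```

Whether the second join itself ends up broadcast or sort-merge depends on the planner's size estimates; either way only the filtered rows take part in it, not the entire big table.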