[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555652#comment-16555652
 ] 

Marco Gaido commented on SPARK-24904:
-------------------------------------

I see what you mean now, but I think you are making an assumption that is not 
always true, i.e. "The output is (expected to be) very small compared to the big 
table". If all the rows from the big table match the small one, this is not the 
case. We may try to do something like what you mentioned in the optimizer if CBO 
is enabled and we have good enough statistics about the output size of the inner 
join, but I am not sure.

> Join with broadcasted dataframe causes shuffle of redundant data
> ----------------------------------------------------------------
>
>                 Key: SPARK-24904
>                 URL: https://issues.apache.org/jira/browse/SPARK-24904
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.2
>            Reporter: Shay Elbaz
>            Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan falls back to 
> sort merge join. But when the join is on the large DF side, the broadcast 
> does take place. Is there a good reason for this? In the below example it 
> sure doesn't make any sense to shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  :     +- *Project [id#16304L AS id2#16307L]
>  :        +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>        +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
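
The workaround described above could be sketched roughly as follows. This is a minimal illustration, not code from the report: the object name, the local master setting, the reduced size of `big`, and the assumption that keys in `small` are unique are all choices made for the example. As the comment notes, the inner join result is only small when few rows of the big table match.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical driver object, used only to make the sketch self-contained.
object RightJoinWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run for illustration
      .appName("right-join-workaround")
      .getOrCreate()
    import spark.implicits._

    val small = spark.range(1, 10) // column "id"
    val big = spark.range(1, 1 << 20) // smaller than in the report, to keep the sketch cheap
      .withColumnRenamed("id", "id2")

    // Step 1: inner join with the small side broadcast, keeping only the
    // big table's column. Assumes keys in `small` are unique; duplicate
    // keys would multiply rows again in step 2.
    val matched = big
      .join(broadcast(small), $"id" === $"id2", "inner")
      .select($"id2")

    // Step 2: right join the (hopefully small) matched rows back with
    // `small`, so that rows of `small` without a match reappear with nulls.
    val result = matched.join(small, $"id2" === $"id", "right")

    result.explain()
    result.show()

    spark.stop()
  }
}
```

Whether this is cheaper than the plain right join depends on how many rows of the big table actually match, which is the assumption questioned in the comment.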

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)