We don't know that the table is small unless you cache it.  In Spark 1.5
you'll be able to give us a hint though (
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581
)

On Wed, Jul 1, 2015 at 8:30 AM, Srikanth <srikanth...@gmail.com> wrote:

> Hello,
>
>
>
> I have a straight forward use case of joining a large table with a smaller
> table. The small table is within the limit I set for
> spark.sql.autoBroadcastJoinThreshold.
>
> I notice that ShuffledHashJoin is used to perform the join.
> BroadcastHashJoin was used only when I pre-fetched and cached the small
> table.
>
> I understand that for typical broadcast we would have to read and
> collect() the small table in driver before broadcasting.
>
> Why not do this automatically for joins? That way stage1(read large table)
> and stage2(read small table) can still be run in parallel.
>
>
>
>
>
> Sort [emailId#19 ASC,date#0 ASC], true
>
>  Exchange (RangePartitioning 24)
>
>   Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>
>    Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))
>
>     *ShuffledHashJoin* [ip#7], [ip#18], BuildRight
>
>      Exchange (HashPartitioning 24)
>
>       Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>
>        PhysicalRDD
> [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6],
> MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25
>
>      Exchange (HashPartitioning 24)
>
>       Project [emailId#19,scalaUDF(date#20) AS
> upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
>
>        PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at
> rddToDataFrameHolder at DataSourceReader.scala:41
>
>
> Srikanth
>

Reply via email to