Good to know this will be in next release. Thanks.

On Wed, Jul 1, 2015 at 3:13 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> We don't know that the table is small unless you cache it.  In Spark 1.5
> you'll be able to give us a hint though (
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581
> )
>
> On Wed, Jul 1, 2015 at 8:30 AM, Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>>
>>
>> I have a straight forward use case of joining a large table with a
>> smaller table. The small table is within the limit I set for
>> spark.sql.autoBroadcastJoinThreshold.
>>
>> I notice that ShuffledHashJoin is used to perform the join.
>> BroadcastHashJoin was used only when I pre-fetched and cached the small
>> table.
>>
>> I understand that for typical broadcast we would have to read and
>> collect() the small table in driver before broadcasting.
>>
>> Why not do this automatically for joins? That way stage1(read large
>> table) and stage2(read small table) can still be run in parallel.
>>
>>
>>
>>
>>
>> Sort [emailId#19 ASC,date#0 ASC], true
>>
>>  Exchange (RangePartitioning 24)
>>
>>   Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>>
>>    Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))
>>
>>     *ShuffledHashJoin* [ip#7], [ip#18], BuildRight
>>
>>      Exchange (HashPartitioning 24)
>>
>>       Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>>
>>        PhysicalRDD
>> [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6],
>> MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25
>>
>>      Exchange (HashPartitioning 24)
>>
>>       Project [emailId#19,scalaUDF(date#20) AS
>> upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
>>
>>        PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at
>> rddToDataFrameHolder at DataSourceReader.scala:41
>>
>>
>> Srikanth
>>
>
>

Reply via email to