We don't know that the table is small unless you cache it. In Spark 1.5 you'll be able to give us a hint though ( https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581 )
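For reference, the hint is exposed as `org.apache.spark.sql.functions.broadcast`. A minimal sketch against the 1.5 API (`large` and `small` are placeholder DataFrames standing in for your two tables):

```scala
import org.apache.spark.sql.functions.broadcast

// Wrapping the smaller side in broadcast(...) marks it as broadcastable,
// so the planner can pick BroadcastHashJoin instead of ShuffledHashJoin
// without needing size statistics or a prior cache.
val joined = large.join(broadcast(small), large("ip") === small("ip"))
```

Without the hint (and without caching), the planner has no size estimate for the small side, so it falls back to a shuffle join even when the table is under spark.sql.autoBroadcastJoinThreshold.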
On Wed, Jul 1, 2015 at 8:30 AM, Srikanth <srikanth...@gmail.com> wrote:
> Hello,
>
> I have a straightforward use case of joining a large table with a smaller
> table. The small table is within the limit I set for
> spark.sql.autoBroadcastJoinThreshold.
>
> I notice that ShuffledHashJoin is used to perform the join.
> BroadcastHashJoin was used only when I pre-fetched and cached the small
> table.
>
> I understand that for a typical broadcast we would have to read and
> collect() the small table in the driver before broadcasting.
>
> Why not do this automatically for joins? That way stage 1 (read large table)
> and stage 2 (read small table) can still run in parallel.
>
> Sort [emailId#19 ASC,date#0 ASC], true
>  Exchange (RangePartitioning 24)
>   Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>    Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))
>     *ShuffledHashJoin* [ip#7], [ip#18], BuildRight
>      Exchange (HashPartitioning 24)
>       Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>        PhysicalRDD [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6], MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25
>      Exchange (HashPartitioning 24)
>       Project [emailId#19,scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
>        PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41
>
> Srikanth