Good to know this will be in the next release. Thanks.

On Wed, Jul 1, 2015 at 3:13 PM, Michael Armbrust <mich...@databricks.com> wrote:
> We don't know that the table is small unless you cache it. In Spark 1.5
> you'll be able to give us a hint though (
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581
> )
>
> On Wed, Jul 1, 2015 at 8:30 AM, Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a straightforward use case of joining a large table with a
>> smaller table. The small table is within the limit I set for
>> spark.sql.autoBroadcastJoinThreshold.
>>
>> I notice that ShuffledHashJoin is used to perform the join.
>> BroadcastHashJoin was used only when I pre-fetched and cached the small
>> table.
>>
>> I understand that for a typical broadcast we would have to read and
>> collect() the small table in the driver before broadcasting.
>>
>> Why not do this automatically for joins? That way stage 1 (read large
>> table) and stage 2 (read small table) can still run in parallel.
>>
>> Sort [emailId#19 ASC,date#0 ASC], true
>>  Exchange (RangePartitioning 24)
>>   Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>>    Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))
>>     *ShuffledHashJoin* [ip#7], [ip#18], BuildRight
>>      Exchange (HashPartitioning 24)
>>       Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
>>        PhysicalRDD
>> [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6],
>> MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25
>>      Exchange (HashPartitioning 24)
>>       Project [emailId#19,scalaUDF(date#20) AS
>> upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
>>        PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at
>> rddToDataFrameHolder at DataSourceReader.scala:41
>>
>> Srikanth
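For anyone finding this thread later, the hint Michael links to is the `broadcast` function added to `org.apache.spark.sql.functions` in Spark 1.5. A minimal sketch of how it applies to the join above (the paths, the `sqlContext` setup, and the exact column names are illustrative assumptions, not from the thread):

```scala
import org.apache.spark.sql.functions.broadcast

// Large fact table and small lookup table (sources are placeholders).
val logs  = sqlContext.read.parquet("/data/weblogs") // large table
val users = sqlContext.read.parquet("/data/users")   // small table

// Wrapping the small side in broadcast(...) hints the planner to use a
// BroadcastHashJoin instead of a ShuffledHashJoin, without requiring the
// small table to be cached first or to fall under
// spark.sql.autoBroadcastJoinThreshold.
val joined = logs.join(broadcast(users), logs("ip") === users("ip"))

// The physical plan should now show BroadcastHashJoin on the users side.
joined.explain()
```

This avoids the shuffle of the large table that the `Exchange (HashPartitioning 24)` steps in the plan above represent; only the small side is collected to the driver and broadcast.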