Hello,


I have a straightforward use case: joining a large table with a smaller
table. The small table is within the limit I set for
spark.sql.autoBroadcastJoinThreshold.

I notice that ShuffledHashJoin is used to perform the join.
BroadcastHashJoin was used only when I pre-fetched and cached the small
table.

I understand that for a typical broadcast we would have to read and collect()
the small table on the driver before broadcasting.

Why not collect and broadcast the small table automatically for joins? That
way stage 1 (read large table) and stage 2 (read small table) can still run
in parallel.
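
To make the idea concrete, here is a minimal sketch in plain Python of what I
mean by a broadcast hash join. It is not Spark code and not how Spark
implements the operator; the function names (build_hash_table,
broadcast_hash_join) and the row layout are hypothetical. The small side is
collected into one hash table, and every partition of the large side probes
that table locally, so the large side is never shuffled.

```python
from collections import defaultdict


def build_hash_table(small_rows, key):
    """Collect the small side into a dict keyed by the join column.

    This dict stands in for the value that would be broadcast to every task.
    """
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return dict(table)


def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner-join each partition of the large side against one shared table.

    Partitions of the large side are processed where they are; only the
    small side is copied around.
    """
    broadcast = build_hash_table(small_rows, small_key)
    joined = []
    for partition in large_partitions:
        out = []
        for row in partition:
            # Probe the shared table; rows without a match are dropped.
            for match in broadcast.get(row[large_key], []):
                merged = dict(match)
                merged.update(row)
                out.append(merged)
        joined.append(out)
    return joined
```

For example, calling broadcast_hash_join with two partitions of log rows keyed
by "ip" and a one-row lookup table also keyed by "ip" returns two output
partitions, each holding only the log rows whose ip appears in the lookup
table, with the lookup columns merged in.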





Here is the physical plan I currently get:

Sort [emailId#19 ASC,date#0 ASC], true
 Exchange (RangePartitioning 24)
  Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
   Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))
    *ShuffledHashJoin* [ip#7], [ip#18], BuildRight
     Exchange (HashPartitioning 24)
      Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
       PhysicalRDD [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6], MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25
     Exchange (HashPartitioning 24)
      Project [emailId#19,scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
       PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41


Srikanth
