Thanks Amit, I was referring to dynamic partition pruning (https://issues.apache.org/jira/browse/SPARK-11150) & adaptive query execution (https://issues.apache.org/jira/browse/SPARK-31412) in Spark 3 - where it would figure out the right partitions & push the filters down to the input before applying the join.
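
For reference, a minimal sketch of how those two features can be enabled explicitly on a Spark 3 session (both config keys are standard Spark 3 settings; the app name is just a placeholder, and defaults vary across 3.x versions):

from pyspark.sql import SparkSession

# Turn on adaptive query execution and dynamic partition pruning explicitly,
# since their defaults differ between Spark 3.x releases.
spark = (
    SparkSession.builder
    .appName("broadcast-join-test")  # placeholder name
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
    .getOrCreate()
)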
On Sat, Sep 19, 2020 at 1:31 AM Amit Joshi <mailtojoshia...@gmail.com> wrote:

> Hi Rishi,
>
> Maybe you have already done these steps.
> Can you check the size of the dataframe you are trying to broadcast using
> logInfo(SizeEstimator.estimate(df))
> and adjust the driver accordingly?
>
> There is one more issue which I found in Spark 2:
> broadcast does not work on cached data. It is possible this may not be the
> issue, but you can check for the same problem at your end.
>
> https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219
>
> And can you please tell us what issue was solved in Spark 3 that you are
> referring to?
>
> Regards
> Amit
>
> On Saturday, September 19, 2020, Rishi Shah <rishishah.s...@gmail.com>
> wrote:
>
>> Thanks Amit. I have tried increasing driver memory and also tried
>> increasing the max result size returned to the driver. Nothing works; I
>> believe Spark is not able to determine that the result to be broadcast
>> is small enough, because the input data is huge. When I tried this in two
>> stages (writing out the grouped data and using that to join with a
>> broadcast), Spark had no issue broadcasting it.
>>
>> When I was checking the Spark 3 documentation, it seemed like this issue
>> may have been addressed in Spark 3 but not in earlier versions?
>>
>> On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi <mailtojoshia...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I think the problem lies with driver memory. Broadcast in Spark works by
>>> collecting all the data to the driver and then having the driver
>>> broadcast it to all the executors (although different strategies could be
>>> employed for the transfer, like BitTorrent).
>>>
>>> Please try increasing the driver memory and see if it works.
>>>
>>> Regards,
>>> Amit
>>>
>>> On Thursday, September 17, 2020, Rishi Shah <rishishah.s...@gmail.com>
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> Hope this email finds you well. I have a dataframe of size 8TB (parquet,
>>>> snappy compressed); however, when I group it by a column I get a much
>>>> smaller aggregated dataframe of 700 rows (just two columns, key and
>>>> count). When I use it as below to broadcast this aggregated result, it
>>>> throws a "dataframe can not be broadcasted" error.
>>>>
>>>> df_agg = df.groupBy('column1').count().cache()
>>>> # df_agg.count()
>>>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>>>> df_join.write.parquet('PATH')
>>>>
>>>> The same code works with an input df of size 3TB without any
>>>> modifications.
>>>>
>>>> Any suggestions?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>

--
Regards,

Rishi Shah
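
For anyone hitting the same error, a minimal PySpark sketch of the two-stage workaround described above; 'column1', 'AGG_PATH' and 'PATH' are placeholders carried over from the thread, not real values:

from pyspark.sql.functions import broadcast

# Stage 1: materialize the small aggregate so Spark can size it from the
# written files rather than from the 8TB input it was derived from.
df.groupBy('column1').count().write.parquet('AGG_PATH')

# Stage 2: read the tiny result back and broadcast-join it against the full input.
df_agg = spark.read.parquet('AGG_PATH')
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')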