Thanks Amit, I was referring to dynamic partition pruning (
https://issues.apache.org/jira/browse/SPARK-11150) & adaptive query
execution (https://issues.apache.org/jira/browse/SPARK-31412) in Spark 3,
where it figures out the right partitions and pushes the filters down to
the input before applying the join.
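
For anyone following along, both are runtime SQL configs in Spark 3, so a
minimal sketch of turning them on looks like this (flag names as of Spark
3.0; dynamic partition pruning defaults to on):

# Both settings can be toggled on a live Spark 3 session.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")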

On Sat, Sep 19, 2020 at 1:31 AM Amit Joshi <mailtojoshia...@gmail.com>
wrote:

> Hi Rishi,
>
> Maybe you have already done these steps.
> Can you check the size of the dataframe you are trying to broadcast using
> logInfo(SizeEstimator.estimate(df))
> and adjust the driver memory accordingly?
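>
> A rough PySpark sketch of that check (SizeEstimator lives on the JVM
> side, so this goes through the py4j gateway; note it estimates the
> DataFrame object itself, not the data it will produce):
>
> # Rough check via internal handles (spark._jvm, df._jdf); treat the
> # resulting number as an approximation only.
> est = spark._jvm.org.apache.spark.util.SizeEstimator.estimate(df_agg._jdf)
> print("estimated size in bytes:", est)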
>
> There is one more issue I found in Spark 2:
> broadcast does not work on cached data. It is possible this may not be
> the issue, but you can check for the same problem at your end.
>
>
> https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219
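>
> One quick way to check at your end (a sketch): compare the physical
> plans with and without the cache and see whether the broadcast hint
> survives.
>
> # Look for BroadcastHashJoin vs. SortMergeJoin in each plan.
> df.join(broadcast(df_agg), 'column1', 'left_outer').explain()
> df.join(broadcast(df_agg.cache()), 'column1', 'left_outer').explain()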
>
> And can you please tell which issue, solved in Spark 3, you are
> referring to?
>
> Regards
> Amit
>
>
> On Saturday, September 19, 2020, Rishi Shah <rishishah.s...@gmail.com>
> wrote:
>
>> Thanks Amit. I have tried increasing driver memory, and also tried
>> increasing the max result size returned to the driver. Nothing works. I
>> believe Spark is not able to determine that the result to be broadcasted
>> is small enough because the input data is huge? When I tried this in 2
>> stages, writing out the grouped data and using that to join with a
>> broadcast, Spark had no issues broadcasting it.
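>>
>> For reference, the two-stage version that does work looks roughly like
>> this (AGG_PATH is a placeholder):
>>
>> # Stage 1: write the small aggregate out to disk.
>> df.groupBy('column1').count().write.parquet('AGG_PATH')
>> # Stage 2: read it back; its on-disk size is now known to be tiny,
>> # and the broadcast join goes through.
>> df_agg = spark.read.parquet('AGG_PATH')
>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')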
>>
>> When I was checking the Spark 3 documentation, it seemed like this issue
>> may have been addressed in Spark 3 but not in earlier versions?
>>
>> On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi <mailtojoshia...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I think the problem lies with driver memory. Broadcast in Spark works
>>> by collecting all the data to the driver and then having the driver
>>> broadcast it to all the executors. A different strategy could be
>>> employed for the transfer, like BitTorrent, though.
>>>
>>> Please try increasing the driver memory. See if it works.
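>>>
>>> For example (values purely illustrative; tune them to your job):
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> # Note: spark.driver.memory must be set before the driver JVM starts
>>> # (e.g. via spark-submit --driver-memory), so setting it here only
>>> # takes effect when the session is created fresh, as in local mode.
>>> spark = (
>>>     SparkSession.builder
>>>     .config("spark.driver.memory", "8g")
>>>     .config("spark.driver.maxResultSize", "4g")
>>>     .getOrCreate()
>>> )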
>>>
>>> Regards,
>>> Amit
>>>
>>>
>>> On Thursday, September 17, 2020, Rishi Shah <rishishah.s...@gmail.com>
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> Hope this email finds you well. I have a dataframe of size 8TB (parquet,
>>>> snappy compressed). However, when I group it by a column I get a much
>>>> smaller aggregated dataframe of 700 rows (just two columns, key and
>>>> count). When I use it as below to broadcast this aggregated result, it
>>>> throws a "dataframe can not be broadcasted" error.
>>>>
>>>> from pyspark.sql.functions import broadcast
>>>>
>>>> # Aggregate the large input down to ~700 rows and cache the result.
>>>> df_agg = df.groupBy('column1').count().cache()
>>>> # df_agg.count()  # optionally materialize the cache up front
>>>> # Left-join the input back to the small aggregate, hinting a broadcast.
>>>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>>>> df_join.write.parquet('PATH')
>>>>
>>>> The same code works with input df size of 3TB without any
>>>> modifications.
>>>>
>>>> Any suggestions?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah
