[ 
https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-44379:
------------------------------------
    Description: 
Context: After migrating to Spark 3 with AQE, we saw a significant increase in 
driver and executor memory usage in our jobs that contain star joins. Analyzing 
a heap dump, we saw that the majority of the memory was taken up by 
{{UnsafeHashedRelation}} instances used for broadcast joins; in this case there 
were 92 broadcast joins in the query.

!screenshot-1.png|width=851,height=70!

This took up over 6GB of memory in total, even though every table being 
broadcasted was only ~1MB and hence should have needed only ~100MB in total. I 
found that this is because the {{BytesToBytesMap}} used within 
{{UnsafeHashedRelation}} allocates memory in ["pageSize" 
increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117],
 which in our case was 64MB. Based on the [default page size 
calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
 this will be the case for any container with > 1 GB of memory (assuming 
executor.cores = 1), which is far too common. Thus, in our case, most of the 
memory requested by {{BytesToBytesMap}} was unused, containing just trailing zeros.

!screenshot-2.png|width=389,height=101!
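
To make the arithmetic concrete, here is a rough back-of-the-envelope sketch 
(not Spark code; the constants and the nextPowerOf2(executionMemory / cores / 
16) shape only approximate the default page size calculation linked above):

{code:scala}
object PageSizeEstimate {
  // Next power of two >= n (sufficient for this sketch; ignores overflow).
  private def nextPowerOf2(n: Long): Long =
    java.lang.Long.highestOneBit(math.max(n, 1L) * 2 - 1)

  // Approximation of the default page size: execution memory is divided by the
  // core count and a safety factor of 16, rounded to a power of two, and
  // clamped to the [1 MB, 64 MB] range.
  def defaultPageSize(executionMemoryBytes: Long, cores: Int): Long = {
    val minPageSize = 1L << 20
    val maxPageSize = 64L << 20
    val safetyFactor = 16
    val size = nextPowerOf2(executionMemoryBytes / cores / safetyFactor)
    math.min(maxPageSize, math.max(minPageSize, size))
  }

  def main(args: Array[String]): Unit = {
    // ~1 GB of execution memory and executor.cores = 1 already hit the ceiling.
    val pageSize = defaultPageSize(1L << 30, 1)
    println(s"page size = ${pageSize >> 20} MB")                  // 64 MB
    // 92 broadcast maps, each holding ~1 MB of data but at least one full page:
    println(s"minimum map memory = ${92 * (pageSize >> 20)} MB")  // 5888 MB
  }
}
{code}

With roughly 1 GB of execution memory per core the estimate already hits the 
64 MB ceiling, and 92 such maps account for most of the 6 GB we observed.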

I think this is a major inefficiency for broadcast joins (especially star 
joins). There are a few ways to tackle the problem:
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does reduce 
the memory consumption of broadcast joins, but I am not sure what it implies 
for the rest of the Spark machinery (see the example after this list).
2) Add a "finalize" operation to {{BytesToBytesMap}}, called after all values 
have been added to the map, which allocates a new page holding only the 
required bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
keys and values, and use those during deserialization to request only the 
required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
estimated data size of broadcast joins.
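
For reference, Option 1 is only a configuration change; the 1 MB value below 
is just an example, and its effect on the rest of the engine (shuffle, sort 
and aggregation spilling) is exactly the open question:

{code:scala}
import org.apache.spark.sql.SparkSession

// Example only: cap the Tungsten page size at 1 MB instead of the derived 64 MB.
val spark = SparkSession.builder()
  .appName("broadcast-heavy-job")
  .config("spark.buffer.pageSize", "1m")
  .getOrCreate()
{code}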

I believe Option 3 would be simple enough to implement, and I have a POC PR 
which I will post soon, but I am interested in hearing other people's thoughts 
here. 
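
In the meantime, here is a minimal sketch of the Option 3 idea using 
hypothetical helper names (this is only an illustration of the serialization 
format change, not the POC itself): record the number of keys and the total 
data size up front, and use them on the read side to size the page allocation.

{code:scala}
import java.io.{DataInputStream, DataOutputStream}

// Hypothetical illustration of Option 3: the write side records how big the
// map really is, so the read side can request only the required memory
// instead of a full spark.buffer.pageSize page.
object SizedMapSerde {

  // Write side: counts and sizes are known once the map has been built.
  def writeHeader(out: DataOutputStream, numKeys: Int, totalDataBytes: Long): Unit = {
    out.writeInt(numKeys)
    out.writeLong(totalDataBytes)
  }

  // Read side: pick a page size no larger than the recorded data size
  // (still bounded below by a small minimum, e.g. 1 MB).
  def readHeader(in: DataInputStream, defaultPageSize: Long): (Int, Long, Long) = {
    val numKeys = in.readInt()
    val totalDataBytes = in.readLong()
    val minPageSize = 1L << 20
    val pageSize = math.min(defaultPageSize, math.max(minPageSize, totalDataBytes))
    // The caller would then construct the map with `pageSize` rather than
    // the globally derived value.
    (numKeys, totalDataBytes, pageSize)
  }
}
{code}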

> Broadcast Joins taking up too much memory
> -----------------------------------------
>
>                 Key: SPARK-44379
>                 URL: https://issues.apache.org/jira/browse/SPARK-44379
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: Shardul Mahadik
>            Priority: Major
>         Attachments: screenshot-1.png, screenshot-2.png
>
>


