[ https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shardul Mahadik updated SPARK-44379:
------------------------------------
    Description: 
Context: After migrating to Spark 3 with AQE, we saw a significant increase in driver and executor memory usage in our jobs which contain star joins. By analyzing a heap dump, we saw that the majority of the memory was being taken up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there were 92 broadcast joins in the query.

!screenshot-1.png|width=851,height=70!

This took up over 6GB of total memory, even though every table being broadcast was around ~1MB, and hence the total should only have been ~100MB. I found that this is because {{BytesToBytesMap}}, used within {{UnsafeHashedRelation}}, allocates memory in ["pageSize" increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117], which in our case was 64MB. Based on the [default page size calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251], this should be the case for any container with > 1 GB of memory (assuming executor.cores = 1), which is far too common.
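To make the arithmetic concrete, here is a minimal, self-contained sketch of that page-size heuristic. The constants and rounding are paraphrased from the linked {{MemoryManager}} code rather than copied from it, the execution memory is taken as a plain parameter instead of being read from the memory pools, and {{PageSizeSketch}} / {{defaultPageSize}} are just illustrative names:

{code:scala}
// Sketch of the default page-size heuristic (paraphrased, not the actual Spark code).
object PageSizeSketch {

  private def nextPowerOf2(num: Long): Long = {
    val highBit = java.lang.Long.highestOneBit(num)
    if (highBit == num) num else highBit << 1
  }

  /** Derived default page size for a given amount of execution memory and core count. */
  def defaultPageSize(executionMemoryBytes: Long, cores: Int): Long = {
    val minPageSize = 1L * 1024 * 1024      // 1 MB floor
    val maxPageSize = 64L * minPageSize     // 64 MB cap
    val safetyFactor = 16                   // headroom so a task can hold several pages
    val size = nextPowerOf2(executionMemoryBytes / cores / safetyFactor)
    math.min(maxPageSize, math.max(minPageSize, size))
  }

  def main(args: Array[String]): Unit = {
    // With executor.cores = 1, roughly 1 GB of execution memory already reaches the 64 MB cap,
    // so every page request is rounded up to 64 MB regardless of how small the broadcast table is.
    println(defaultPageSize(1L << 30, cores = 1))    // 67108864  (64 MB)
    println(defaultPageSize(256L << 20, cores = 1))  // 16777216  (16 MB)
  }
}
{code}

The point of the sketch is only to show how quickly the heuristic saturates at the 64MB cap for single-core executors with common container sizes.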
Thus in our case, most of the memory requested by {{BytesToBytesMap}} was left unutilized, containing just trailing 0s.

!screenshot-2.png|width=389,height=101!

I think this is a major inefficiency for broadcast joins (especially star joins). I think there are a few ways to tackle the problem (Options 1 and 3 are sketched after this list):
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does reduce the memory consumption of broadcast joins, but I am not sure what it implies for the rest of the Spark machinery.
2) Add a "finalize" operation to {{BytesToBytesMap}} which is called after all values are added to the map and allocates a new page holding only the required bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of keys and values, and use those during deserialization to only request the required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the estimated data size of broadcast joins.

I believe Option 3 would be simple enough to implement and I have a POC PR which I will post soon, but I am interested in other people's thoughts here.
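For Option 1, the override itself is just standard configuration; a minimal sketch, where the {{PageSizeOverrideSketch}} object and the "2m" value are purely illustrative and I am assuming the usual byte-size string syntax is accepted:

{code:scala}
import org.apache.spark.SparkConf

object PageSizeOverrideSketch {
  // Override the derived page size for the whole application. "2m" is illustrative only;
  // as noted above, the side effects on the rest of the Spark machinery are unclear.
  val conf: SparkConf = new SparkConf()
    .set("spark.buffer.pageSize", "2m")
}
{code}

For Option 3, a rough sketch of the idea using hypothetical helpers (this is not the real {{UnsafeHashedRelation}} / {{BytesToBytesMap}} serialization code): record the entry count and the exact payload size on the write side, so the read side can request a right-sized allocation instead of falling back to the 64MB page-size default.

{code:scala}
import java.io.{DataInputStream, DataOutputStream}

// Hypothetical helpers illustrating Option 3; names and layout are illustrative only.
object SizedSerializationSketch {

  /** Write side: prepend the entry count and the payload size before the entries. */
  def writeSizedEntries(out: DataOutputStream,
                        entries: Seq[(Array[Byte], Array[Byte])]): Unit = {
    // Bytes the entries occupy in the stream (two 4-byte length ints per entry),
    // used as a proxy for how much page space the reader should request.
    val usedBytes = entries.map { case (k, v) => 8L + k.length + v.length }.sum
    out.writeInt(entries.length)
    out.writeLong(usedBytes)
    entries.foreach { case (k, v) =>
      out.writeInt(k.length); out.write(k)
      out.writeInt(v.length); out.write(v)
    }
  }

  /** Read side: the recorded size lets the deserializer request roughly the required
    * memory instead of a full default-sized page. */
  def readSizedEntries(in: DataInputStream): (Long, Array[(Array[Byte], Array[Byte])]) = {
    val numEntries = in.readInt()
    val requiredBytes = in.readLong()   // what the map's allocation request would be based on
    val entries = Array.fill(numEntries) {
      val k = new Array[Byte](in.readInt()); in.readFully(k)
      val v = new Array[Byte](in.readInt()); in.readFully(v)
      (k, v)
    }
    (requiredBytes, entries)
  }
}
{code}

The real change would presumably thread this recorded size through to the map's page allocation during deserialization, but that detail is what the POC PR is for.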
> Broadcast Joins taking up too much memory
> -----------------------------------------
>
> Key: SPARK-44379
> URL: https://issues.apache.org/jira/browse/SPARK-44379
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.1
> Reporter: Shardul Mahadik
> Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png