Shardul Mahadik created SPARK-44379: ---------------------------------------
Summary: Broadcast Joins taking up too much memory Key: SPARK-44379 URL: https://issues.apache.org/jira/browse/SPARK-44379 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.1 Reporter: Shardul Mahadik Context: After migrating to Spark 3 with AQE, we saw a significant increase in driver and executor memory usage in our jobs which contains star joins. By analyzing heapdump, we saw that majority of the memory was being taken up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there were 92 broadcast joins in the query. !image-2023-07-11-10-41-02-251.png|width=851,height=70! This took up over 6GB of total memory, even though every table being broadcasted was around ~1MB and hence should only have been ~100MB total. I found that this is because {{BytesToBytesMap}} used within {{UnsafeHashedRelation}} allocates memory in ["pageSize" increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117] which in our case was 64MB. Based on the [default page size calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251], this should be the case for any container with > 1 GB of memory (assuming executor.cores = 1) which is far too common. Thus in our case, most of the memory requested by {{BytesToBytesMap}} was un-utilized with just trailing 0s. !image-2023-07-11-10-52-59-553.png|width=389,height=101! I think this is a major inefficiency for broadcast joins (especially star joins). I think there are a few ways to tackle the problem. 1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does reduce the memory consumption of broadcast joins, but I am not sure what it implies for the rest of Spark machinery 2) Add a "finalize" operation to {{BytesToBytesMap}} which is called after all values are added to the map and allocates a new page only for the required bytes. 3) Enhance the serialization of {{BytesToBytesMap}} to record the number of keys and values, and use those during deserialization to only request the required memory. 4) Use a lower page size for certain {{BytesToBytesMap}}s based on the estimated data size of broadcast joins. I believe Option 3 would be simple enough to implement and I have a POC PR which I will post soon, but I am interested in knowing other people's thoughts here. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org