This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c3dd749f0 [#2060] fix(server): Fix memory leaks when reaching memory limit (#2058)
c3dd749f0 is described below

commit c3dd749f0e188d7523a062aeb2d4a89a00542b4c
Author: leewish <[email protected]>
AuthorDate: Mon Oct 21 16:17:30 2024 +0800

    [#2060] fix(server): Fix memory leaks when reaching memory limit (#2058)
    
    ### What changes were proposed in this pull request?
    
    Fix shuffle server memory leaks when reaching memory limit.
    
    ### Why are the changes needed?
    When Netty is enabled, the shuffle server allocates direct memory while
    decoding each SEND_SHUFFLE_DATA_REQUEST message. If the direct memory limit
    is reached during decoding, an `OutOfDirectMemoryError` is thrown and
    decoding of that message fails. The ByteBufs that were already allocated
    successfully for earlier blocks of the same message are then never
    released, which results in memory leaks.
    
    Fix: https://github.com/apache/incubator-uniffle/issues/2060
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Existing unit tests (UTs).
---
 .../common/netty/protocol/SendShuffleDataRequest.java    | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
index 9fefb98f6..cde9a3ee2 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
@@ -103,7 +103,21 @@ public class SendShuffleDataRequest extends RequestMessage {
       int lengthOfShuffleBlocks = byteBuf.readInt();
       List<ShuffleBlockInfo> shuffleBlockInfoList = Lists.newArrayList();
       for (int j = 0; j < lengthOfShuffleBlocks; j++) {
-        shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf));
+        try {
+          shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf));
+        } catch (Throwable t) {
+          // An OutOfDirectMemoryError will be thrown, when the direct memory reaches the limit.
+          // OutOfDirectMemoryError will not cause the JVM to exit, but may lead to direct memory
+          // leaks.
+          // Note: You can refer to docs/server_guide.md to set MAX_DIRECT_MEMORY_SIZE to a
+          // reasonable value.
+          shuffleBlockInfoList.forEach(sbi -> sbi.getData().release());
+          partitionToBlocks.forEach(
+              (integer, shuffleBlockInfos) -> {
+                shuffleBlockInfos.forEach(sbi -> sbi.getData().release());
+              });
+          throw t;
+        }
       }
       partitionToBlocks.put(partitionId, shuffleBlockInfoList);
     }
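
The release-on-failure pattern applied in the hunk above can be illustrated in isolation. The following is a minimal sketch, not the Uniffle implementation: `FakeBuf` and `decodeAll` are hypothetical names invented for this example, `FakeBuf` is a stand-in for a reference-counted buffer such as Netty's `ByteBuf`, and a plain `OutOfMemoryError` stands in for Netty's `OutOfDirectMemoryError`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseOnDecodeFailure {

  /** Hypothetical stand-in for a reference-counted buffer (not a Netty class). */
  static final class FakeBuf {
    /** Number of buffers allocated and not yet released. */
    static final AtomicInteger LIVE = new AtomicInteger();
    private boolean released;

    FakeBuf() {
      LIVE.incrementAndGet();
    }

    void release() {
      if (!released) {
        released = true;
        LIVE.decrementAndGet();
      }
    }
  }

  /**
   * Decodes {@code count} blocks. Allocation fails at index {@code failAt}
   * (pass a negative value to never fail), simulating the memory limit.
   */
  static List<FakeBuf> decodeAll(int count, int failAt) {
    List<FakeBuf> decoded = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      try {
        if (i == failAt) {
          // Assumption: stands in for OutOfDirectMemoryError in the real server.
          throw new OutOfMemoryError("simulated direct memory limit");
        }
        decoded.add(new FakeBuf());
      } catch (Throwable t) {
        // Release everything decoded so far, then propagate the original error.
        decoded.forEach(FakeBuf::release);
        throw t;
      }
    }
    return decoded;
  }

  public static void main(String[] args) {
    try {
      decodeAll(5, 3);
    } catch (OutOfMemoryError e) {
      System.out.println("live buffers after failure: " + FakeBuf.LIVE.get());
    }
  }
}
```

Without the `catch` block, the three buffers allocated before the simulated failure would stay live. The actual patch additionally releases the buffers already stored in `partitionToBlocks` for earlier partitions, which this sketch leaves out for brevity.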
