This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c3dd749f0 [#2060] fix(server): Fix memory leaks when reaching memory limit (#2058)
c3dd749f0 is described below
commit c3dd749f0e188d7523a062aeb2d4a89a00542b4c
Author: leewish <[email protected]>
AuthorDate: Mon Oct 21 16:17:30 2024 +0800
[#2060] fix(server): Fix memory leaks when reaching memory limit (#2058)
### What changes were proposed in this pull request?
Fix shuffle server memory leaks when reaching memory limit.
### Why are the changes needed?
When Netty is enabled, the shuffle server allocates direct memory while
decoding each SEND_SHUFFLE_DATA_REQUEST message. If the direct memory limit is
reached partway through decoding, an `OutOfDirectMemoryError` is thrown and
decoding of that message fails. The ByteBufs that were already allocated
successfully for earlier blocks of the same message are then never released,
which leaks direct memory.
Fix: https://github.com/apache/incubator-uniffle/issues/2060
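The cleanup pattern described above can be sketched without Netty. This is a minimal illustration, not the project's code: `FakeBuf` and `FakeAllocator` are hypothetical stand-ins for a reference-counted `ByteBuf` and for an allocator that hits its limit, and a plain `OutOfMemoryError` (the superclass of Netty's `OutOfDirectMemoryError`) simulates the failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReleaseOnDecodeFailure {

  /** Hypothetical stand-in for a reference-counted buffer such as a Netty ByteBuf. */
  static final class FakeBuf {
    private boolean released;

    void release() {
      released = true;
    }

    boolean isReleased() {
      return released;
    }
  }

  /** Hypothetical allocator that fails once a fixed number of buffers is handed out. */
  static final class FakeAllocator {
    private final int limit;
    final List<FakeBuf> allocated = new ArrayList<>();

    FakeAllocator(int limit) {
      this.limit = limit;
    }

    FakeBuf allocate() {
      if (allocated.size() >= limit) {
        // Simulates the direct memory limit being reached.
        throw new OutOfMemoryError("simulated direct memory limit");
      }
      FakeBuf buf = new FakeBuf();
      allocated.add(buf);
      return buf;
    }
  }

  /** Decodes partitions of blocks; on any failure, releases everything decoded so far. */
  static Map<Integer, List<FakeBuf>> decode(
      FakeAllocator allocator, int partitions, int blocksPerPartition) {
    Map<Integer, List<FakeBuf>> partitionToBlocks = new LinkedHashMap<>();
    for (int p = 0; p < partitions; p++) {
      List<FakeBuf> blocks = new ArrayList<>();
      for (int j = 0; j < blocksPerPartition; j++) {
        try {
          blocks.add(allocator.allocate());
        } catch (Throwable t) {
          // Release the blocks of the partition currently being decoded ...
          blocks.forEach(FakeBuf::release);
          // ... and the blocks of every partition that was already completed.
          partitionToBlocks.values().forEach(list -> list.forEach(FakeBuf::release));
          throw t;
        }
      }
      partitionToBlocks.put(p, blocks);
    }
    return partitionToBlocks;
  }

  public static void main(String[] args) {
    FakeAllocator allocator = new FakeAllocator(5);
    try {
      decode(allocator, 3, 2); // needs 6 buffers, only 5 are available
    } catch (OutOfMemoryError e) {
      long released = allocator.allocated.stream().filter(FakeBuf::isReleased).count();
      System.out.println("released " + released + " of " + allocator.allocated.size());
    }
  }
}
```

Both release calls are needed: the list for the partition in progress has not yet been put into the map, so releasing only the map's contents would still leak its buffers.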
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
---
.../common/netty/protocol/SendShuffleDataRequest.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
index 9fefb98f6..cde9a3ee2 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
@@ -103,7 +103,21 @@ public class SendShuffleDataRequest extends RequestMessage {
int lengthOfShuffleBlocks = byteBuf.readInt();
List<ShuffleBlockInfo> shuffleBlockInfoList = Lists.newArrayList();
for (int j = 0; j < lengthOfShuffleBlocks; j++) {
- shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf));
+ try {
+ shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf));
+ } catch (Throwable t) {
+ // An OutOfDirectMemoryError will be thrown, when the direct memory reaches the limit.
+ // OutOfDirectMemoryError will not cause the JVM to exit, but may lead to direct memory
+ // leaks.
+ // Note: You can refer to docs/server_guide.md to set MAX_DIRECT_MEMORY_SIZE to a
+ // reasonable value.
+ shuffleBlockInfoList.forEach(sbi -> sbi.getData().release());
+ partitionToBlocks.forEach(
+ (integer, shuffleBlockInfos) -> {
+ shuffleBlockInfos.forEach(sbi -> sbi.getData().release());
+ });
+ throw t;
+ }
}
partitionToBlocks.put(partitionId, shuffleBlockInfoList);
}