This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 9a84af38c [#2105] fix(spark): Potential duplicate blockId that will make netty memory leak for ShuffleServer (#2106) 9a84af38c is described below commit 9a84af38cc4912fec75840bed2087d7fcfef3c0a Author: leewish <mr.liwenl...@outlook.com> AuthorDate: Fri Sep 13 14:25:03 2024 +0800 [#2105] fix(spark): Potential duplicate blockId that will make netty memory leak for ShuffleServer (#2106) ### What changes were proposed in this pull request? Fix duplicated blockIds that cause a shuffle server memory leak in netty mode. ### Why are the changes needed? Fix: #2105 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Not needed. Co-authored-by: wenlongwlli <wenlongw...@tencent.com> --- .../org/apache/spark/shuffle/writer/WriteBufferManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java index 08eec1c2e..4ee09a31c 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -70,7 +71,7 @@ public class WriteBufferManager extends MemoryConsumer { /** An atomic counter used to keep track of the number of blocks */ private AtomicLong blockCounter = new AtomicLong(0); // it's part of blockId - private Map<Integer, Integer> partitionToSeqNo = Maps.newHashMap(); + private Map<Integer, AtomicInteger> partitionToSeqNo = 
Maps.newHashMap(); private long askExecutorMemory; private int shuffleId; private String taskId; @@ -413,10 +414,9 @@ public class WriteBufferManager extends MemoryConsumer { // it's run in single thread, and is not thread safe private int getNextSeqNo(int partitionId) { - partitionToSeqNo.putIfAbsent(partitionId, 0); - int seqNo = partitionToSeqNo.get(partitionId); - partitionToSeqNo.put(partitionId, seqNo + 1); - return seqNo; + return partitionToSeqNo + .computeIfAbsent(partitionId, k -> new AtomicInteger(0)) + .getAndIncrement(); } private void requestMemory(long requiredMem) {