This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a84af38c [#2105] fix(spark): Potential duplicate blockId that will 
make netty memory leak for ShuffleServer (#2106)
9a84af38c is described below

commit 9a84af38cc4912fec75840bed2087d7fcfef3c0a
Author: leewish <mr.liwenl...@outlook.com>
AuthorDate: Fri Sep 13 14:25:03 2024 +0800

    [#2105] fix(spark): Potential duplicate blockId that will make netty memory 
leak for ShuffleServer (#2106)
    
    ### What changes were proposed in this pull request?
    
    Fix duplicated blockId cause shuffle server memory leak in netty mode.
    
    ### Why are the changes needed?
    
    Fix: #2105
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Needn't
    
    Co-authored-by: wenlongwlli <wenlongw...@tencent.com>
---
 .../org/apache/spark/shuffle/writer/WriteBufferManager.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 08eec1c2e..4ee09a31c 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
@@ -70,7 +71,7 @@ public class WriteBufferManager extends MemoryConsumer {
   /** An atomic counter used to keep track of the number of blocks */
   private AtomicLong blockCounter = new AtomicLong(0);
   // it's part of blockId
-  private Map<Integer, Integer> partitionToSeqNo = Maps.newHashMap();
+  private Map<Integer, AtomicInteger> partitionToSeqNo = Maps.newHashMap();
   private long askExecutorMemory;
   private int shuffleId;
   private String taskId;
@@ -413,10 +414,9 @@ public class WriteBufferManager extends MemoryConsumer {
 
   // it's run in single thread, and is not thread safe
   private int getNextSeqNo(int partitionId) {
-    partitionToSeqNo.putIfAbsent(partitionId, 0);
-    int seqNo = partitionToSeqNo.get(partitionId);
-    partitionToSeqNo.put(partitionId, seqNo + 1);
-    return seqNo;
+    return partitionToSeqNo
+        .computeIfAbsent(partitionId, k -> new AtomicInteger(0))
+        .getAndIncrement();
   }
 
   private void requestMemory(long requiredMem) {

Reply via email to