This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 57f0f8bbc [#1930] fix(server): Fix a double release issue about clearResource thread (#1931)
57f0f8bbc is described below

commit 57f0f8bbcd530feb6ab1284c2eed0c08b9e2c9ae
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Fri Aug 23 01:47:09 2024 +0800

    [#1930] fix(server): Fix a double release issue about clearResource thread (#1931)
    
    ### What changes were proposed in this pull request?
    
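    Fix a double-release issue in the clearResource thread. `ShuffleBuffer.release()`
    now returns the total size of the blocks it actually released, and
    `ShuffleBufferManager` releases only that amount of memory (logging a warning
    when it differs from the buffer size), so blocks that were already released
    elsewhere are not released twice.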
    
    ### Why are the changes needed?
    
    Fix: #1930
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
---
 .../server/buffer/AbstractShuffleBuffer.java       |  2 +-
 .../uniffle/server/buffer/ShuffleBuffer.java       |  2 +-
 .../server/buffer/ShuffleBufferManager.java        | 13 +++++++++++--
 .../server/buffer/ShuffleBufferWithLinkedList.java | 22 ++++++++++++++++++++--
 .../server/buffer/ShuffleBufferWithSkipList.java   | 22 ++++++++++++++++++++--
 5 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index ddbeb21cf..3f4549c7b 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -37,7 +37,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
 
 public abstract class AbstractShuffleBuffer implements ShuffleBuffer {
 
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class);
+  protected static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class);
 
   protected long size;
 
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index f0d4dadb4..1b29e3b41 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -57,7 +57,7 @@ public interface ShuffleBuffer {
 
   int getBlockCount();
 
-  void release();
+  long release();
 
   void clearInFlushBuffer(long eventId);
 
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 1b432abee..8cade02ef 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -722,9 +722,18 @@ public class ShuffleBufferManager {
       Collection<ShuffleBuffer> buffers = bufferRangeMap.asMapOfRanges().values();
       if (buffers != null) {
         for (ShuffleBuffer buffer : buffers) {
-          buffer.release();
+          // the actual released size by this thread
+          long releasedSize = buffer.release();
           ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
-          releaseMemory(buffer.getSize(), false, false);
+          if (releasedSize != buffer.getSize()) {
+            LOG.warn(
+                "Release shuffle buffer size {} is not equal to buffer size {}, appId: {}, shuffleId: {}",
+                releasedSize,
+                buffer.getSize(),
+                appId,
+                shuffleId);
+          }
+          releaseMemory(releasedSize, false, false);
         }
       }
       if (shuffleIdToSizeMap != null) {
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index cd9d7ab66..a9e8ddc1a 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -110,8 +110,26 @@ public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer {
   }
 
   @Override
-  public void release() {
-    blocks.forEach(spb -> spb.getData().release());
+  public long release() {
+    Throwable lastException = null;
+    long failedToReleaseSize = 0;
+    long releasedSize = 0;
+    for (ShufflePartitionedBlock spb : blocks) {
+      try {
+        spb.getData().release();
+        releasedSize += spb.getSize();
+      } catch (Throwable t) {
+        lastException = t;
+        failedToReleaseSize += spb.getSize();
+      }
+    }
+    if (lastException != null) {
+      LOG.warn(
+          "Failed to release shuffle blocks with size {}. Maybe it has been released by others.",
+          failedToReleaseSize,
+          lastException);
+    }
+    return releasedSize;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 2419a7dd4..960ab94f5 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -111,8 +111,26 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer {
   }
 
   @Override
-  public void release() {
-    blocksMap.values().forEach(spb -> spb.getData().release());
+  public long release() {
+    Throwable lastException = null;
+    long failedToReleaseSize = 0;
+    long releasedSize = 0;
+    for (ShufflePartitionedBlock spb : blocksMap.values()) {
+      try {
+        spb.getData().release();
+        releasedSize += spb.getSize();
+      } catch (Throwable t) {
+        lastException = t;
+        failedToReleaseSize += spb.getSize();
+      }
+    }
+    if (lastException != null) {
+      LOG.warn(
+          "Failed to release shuffle blocks with size {}. Maybe it has been released by others.",
+          failedToReleaseSize,
+          lastException);
+    }
+    return releasedSize;
   }
 
   @Override

Reply via email to