This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 57f0f8bbc [#1930] fix(server): Fix a double release issue about clearResource thread (#1931) 57f0f8bbc is described below commit 57f0f8bbcd530feb6ab1284c2eed0c08b9e2c9ae Author: maobaolong <baoloong...@tencent.com> AuthorDate: Fri Aug 23 01:47:09 2024 +0800 [#1930] fix(server): Fix a double release issue about clearResource thread (#1931) ### What changes were proposed in this pull request? Fix double release issue about clearResource thread ### Why are the changes needed? Fix: #1930 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. --- .../server/buffer/AbstractShuffleBuffer.java | 2 +- .../uniffle/server/buffer/ShuffleBuffer.java | 2 +- .../server/buffer/ShuffleBufferManager.java | 13 +++++++++++-- .../server/buffer/ShuffleBufferWithLinkedList.java | 22 ++++++++++++++++++++-- .../server/buffer/ShuffleBufferWithSkipList.java | 22 ++++++++++++++++++++-- 5 files changed, 53 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java index ddbeb21cf..3f4549c7b 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java @@ -37,7 +37,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent; public abstract class AbstractShuffleBuffer implements ShuffleBuffer { - private static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class); + protected static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class); protected long size; diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java index f0d4dadb4..1b29e3b41 100644 --- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java @@ -57,7 +57,7 @@ public interface ShuffleBuffer { int getBlockCount(); - void release(); + long release(); void clearInFlushBuffer(long eventId); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index 1b432abee..8cade02ef 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -722,9 +722,18 @@ public class ShuffleBufferManager { Collection<ShuffleBuffer> buffers = bufferRangeMap.asMapOfRanges().values(); if (buffers != null) { for (ShuffleBuffer buffer : buffers) { - buffer.release(); + // the actual released size by this thread + long releasedSize = buffer.release(); ShuffleServerMetrics.gaugeTotalPartitionNum.dec(); - releaseMemory(buffer.getSize(), false, false); + if (releasedSize != buffer.getSize()) { + LOG.warn( + "Release shuffle buffer size {} is not equal to buffer size {}, appId: {}, shuffleId: {}", + releasedSize, + buffer.getSize(), + appId, + shuffleId); + } + releaseMemory(releasedSize, false, false); } } if (shuffleIdToSizeMap != null) { diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java index cd9d7ab66..a9e8ddc1a 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java @@ -110,8 +110,26 @@ public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer { } @Override - public void release() { - blocks.forEach(spb -> spb.getData().release()); + public long release() { + 
Throwable lastException = null; + int failedToReleaseSize = 0; + long releasedSize = 0; + for (ShufflePartitionedBlock spb : blocks) { + try { + spb.getData().release(); + releasedSize += spb.getSize(); + } catch (Throwable t) { + lastException = t; + failedToReleaseSize += spb.getSize(); + } + } + if (lastException != null) { + LOG.warn( + "Failed to release shuffle blocks with size {}. Maybe it has been released by others.", + failedToReleaseSize, + lastException); + } + return releasedSize; } @Override diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java index 2419a7dd4..960ab94f5 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java @@ -111,8 +111,26 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer { } @Override - public void release() { - blocksMap.values().forEach(spb -> spb.getData().release()); + public long release() { + Throwable lastException = null; + int failedToReleaseSize = 0; + long releasedSize = 0; + for (ShufflePartitionedBlock spb : blocksMap.values()) { + try { + spb.getData().release(); + releasedSize += spb.getSize(); + } catch (Throwable t) { + lastException = t; + failedToReleaseSize += spb.getSize(); + } + } + if (lastException != null) { + LOG.warn( + "Failed to release shuffle blocks with size {}. Maybe it has been released by others.", + failedToReleaseSize, + lastException); + } + return releasedSize; } @Override