This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f56b822b [#1917] fix(server,netty): Fix memory leak issues when handling SendShuffleDataRequest (#1918)
0f56b822b is described below

commit 0f56b822bc45370f23e191dd0d8f5a9b0606a9e4
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Thu Jul 18 02:43:52 2024 +0800

    [#1917] fix(server,netty): Fix memory leak issues when handling SendShuffleDataRequest (#1918)
    
    ### What changes were proposed in this pull request?
    
    Extract a method to release resources including Netty buffers and memory metrics when handling SendShuffleDataRequest.
    
    ### Why are the changes needed?
    
    Fix: #1917
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested on our test cluster.
---
 .../server/netty/ShuffleServerNettyHandler.java    | 85 +++++++++++++++++++---
 1 file changed, 74 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index d297603b9..27a3f1dc4 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,6 +106,18 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     long timestamp = req.getTimestamp();
     int stageAttemptNumber = req.getStageAttemptNumber();
     ShuffleTaskInfo taskInfo = 
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+    ShuffleBufferManager shuffleBufferManager = 
shuffleServer.getShuffleBufferManager();
+    ShuffleTaskManager shuffleTaskManager = 
shuffleServer.getShuffleTaskManager();
+    // info is null, means pre-allocated buffer has been removed by preAllocatedBufferCheck thread,
+    // otherwise we need to release the required size.
+    PreAllocatedBufferInfo info =
+        shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
+    int requireSize = info == null ? 0 : info.getRequireSize();
+    int requireBlocksSize =
+        requireSize - req.encodedLength() < 0 ? 0 : requireSize - 
req.encodedLength();
+
+    boolean isPreAllocated = info != null;
+
     if (taskInfo == null) {
       rpcResponse =
           new RpcResponse(
@@ -116,9 +129,20 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               + appId
               + "], shuffleId["
               + shuffleId
+              + "], isPreAllocated["
+              + isPreAllocated
               + "]";
       LOG.error(errorMsg);
       ShuffleServerMetrics.counterAppNotFound.inc();
+      releaseNettyBufferAndMetrics(
+          req,
+          appId,
+          shuffleId,
+          requireBufferId,
+          requireBlocksSize,
+          shuffleBufferManager,
+          info,
+          isPreAllocated);
       client.getChannel().writeAndFlush(rpcResponse);
       return;
     }
@@ -129,6 +153,25 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       String responseMessage = "A retry has occurred at the Stage, sending 
data is invalid.";
       rpcResponse =
           new RpcResponse(req.getRequestId(), StatusCode.STAGE_RETRY_IGNORE, 
responseMessage);
+      LOG.warn(
+          "Stage retry occurred, appId["
+              + appId
+              + "], shuffleId["
+              + shuffleId
+              + "], stageAttemptNumber["
+              + stageAttemptNumber
+              + "], latestStageAttemptNumber["
+              + latestStageAttemptNumber
+              + "]");
+      releaseNettyBufferAndMetrics(
+          req,
+          appId,
+          shuffleId,
+          requireBufferId,
+          requireBlocksSize,
+          shuffleBufferManager,
+          info,
+          isPreAllocated);
       client.getChannel().writeAndFlush(rpcResponse);
       return;
     }
@@ -147,17 +190,11 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             .recordTransportTime(SendShuffleDataRequest.class.getName(), 
transportTime);
       }
     }
-    int requireSize = 
shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
-    int requireBlocksSize =
-        requireSize - req.encodedLength() < 0 ? 0 : requireSize - 
req.encodedLength();
 
     StatusCode ret = StatusCode.SUCCESS;
     String responseMessage = "OK";
     if (req.getPartitionToBlocks().size() > 0) {
       ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
-      ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
-      PreAllocatedBufferInfo info = 
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
-      boolean isPreAllocated = info != null;
       if (!isPreAllocated) {
         req.getPartitionToBlocks().values().stream()
             .flatMap(Collection::stream)
@@ -180,7 +217,6 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         return;
       }
       final long start = System.currentTimeMillis();
-      ShuffleBufferManager shuffleBufferManager = 
shuffleServer.getShuffleBufferManager();
       shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
       List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
       long alreadyReleasedSize = 0;
@@ -198,7 +234,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           if (hasFailureOccurred) {
             continue;
           }
-          ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, 
spd);
+          ret = shuffleTaskManager.cacheShuffleData(appId, shuffleId, 
isPreAllocated, spd);
           if (ret != StatusCode.SUCCESS) {
             String errorMsg =
                 "Error happened when shuffleEngine.write for "
@@ -211,9 +247,9 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           } else {
             long toReleasedSize = spd.getTotalBlockSize();
             // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
-            manager.releasePreAllocatedSize(toReleasedSize);
+            shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
             alreadyReleasedSize += toReleasedSize;
-            manager.updateCachedBlockIds(
+            shuffleTaskManager.updateCachedBlockIds(
                 appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
           }
         } catch (Exception e) {
@@ -240,7 +276,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       // removed, then after
       // cacheShuffleData finishes, the preAllocatedSize should be updated 
accordingly.
       if (requireBlocksSize > alreadyReleasedSize) {
-        manager.releasePreAllocatedSize(requireBlocksSize - 
alreadyReleasedSize);
+        shuffleTaskManager.releasePreAllocatedSize(requireBlocksSize - 
alreadyReleasedSize);
       }
       rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage);
       long costTime = System.currentTimeMillis() - start;
@@ -269,6 +305,33 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     client.getChannel().writeAndFlush(rpcResponse);
   }
 
+  private static void releaseNettyBufferAndMetrics(
+      SendShuffleDataRequest req,
+      String appId,
+      int shuffleId,
+      long requireBufferId,
+      long requireBlocksSize,
+      ShuffleBufferManager shuffleBufferManager,
+      PreAllocatedBufferInfo info,
+      boolean isPreAllocated) {
+    if (isPreAllocated) {
+      shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true);
+    }
+    if (MapUtils.isNotEmpty(req.getPartitionToBlocks())) {
+      // release memory
+      req.getPartitionToBlocks().values().stream()
+          .flatMap(Collection::stream)
+          .forEach(block -> block.getData().release());
+      ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize);
+    } else {
+      LOG.error(
+          "Failed to handle send shuffle data request, no blocks found in this request. appId: {}, shuffleId: {}, requireBufferId: {}",
+          appId,
+          shuffleId,
+          requireBufferId);
+    }
+  }
+
   public void handleGetMemoryShuffleDataRequest(
       TransportClient client, GetMemoryShuffleDataRequest req) {
     String appId = req.getAppId();

Reply via email to