This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
     new 0f56b822b [#1917] fix(server,netty): Fix memory leak issues when handling SendShuffleDataRequest (#1918)
0f56b822b is described below

commit 0f56b822bc45370f23e191dd0d8f5a9b0606a9e4
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Thu Jul 18 02:43:52 2024 +0800

    [#1917] fix(server,netty): Fix memory leak issues when handling SendShuffleDataRequest (#1918)

    ### What changes were proposed in this pull request?

    Extract a method to release resources including Netty buffers and memory metrics when handling SendShuffleDataRequest.

    ### Why are the changes needed?

    Fix: #1917

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Tested on our test cluster.
---
 .../server/netty/ShuffleServerNettyHandler.java | 85 +++++++++++++++++++---
 1 file changed, 74 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index d297603b9..27a3f1dc4 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.commons.collections.MapUtils; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +106,18 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { long timestamp = req.getTimestamp(); int stageAttemptNumber = req.getStageAttemptNumber(); ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId); + ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); +
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); + // info is null, means pre-allocated buffer has been removed by preAllocatedBufferCheck thread, + // otherwise we need to release the required size. + PreAllocatedBufferInfo info = + shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId); + int requireSize = info == null ? 0 : info.getRequireSize(); + int requireBlocksSize = + requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength(); + + boolean isPreAllocated = info != null; + if (taskInfo == null) { rpcResponse = new RpcResponse( @@ -116,9 +129,20 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { + appId + "], shuffleId[" + shuffleId + + "], isPreAllocated[" + + isPreAllocated + "]"; LOG.error(errorMsg); ShuffleServerMetrics.counterAppNotFound.inc(); + releaseNettyBufferAndMetrics( + req, + appId, + shuffleId, + requireBufferId, + requireBlocksSize, + shuffleBufferManager, + info, + isPreAllocated); client.getChannel().writeAndFlush(rpcResponse); return; } @@ -129,6 +153,25 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { String responseMessage = "A retry has occurred at the Stage, sending data is invalid."; rpcResponse = new RpcResponse(req.getRequestId(), StatusCode.STAGE_RETRY_IGNORE, responseMessage); + LOG.warn( + "Stage retry occurred, appId[" + + appId + + "], shuffleId[" + + shuffleId + + "], stageAttemptNumber[" + + stageAttemptNumber + + "], latestStageAttemptNumber[" + + latestStageAttemptNumber + + "]"); + releaseNettyBufferAndMetrics( + req, + appId, + shuffleId, + requireBufferId, + requireBlocksSize, + shuffleBufferManager, + info, + isPreAllocated); client.getChannel().writeAndFlush(rpcResponse); return; } @@ -147,17 +190,11 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { .recordTransportTime(SendShuffleDataRequest.class.getName(), transportTime); } } - int requireSize = 
shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId); - int requireBlocksSize = - requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength(); StatusCode ret = StatusCode.SUCCESS; String responseMessage = "OK"; if (req.getPartitionToBlocks().size() > 0) { ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize); - ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager(); - PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId); - boolean isPreAllocated = info != null; if (!isPreAllocated) { req.getPartitionToBlocks().values().stream() .flatMap(Collection::stream) @@ -180,7 +217,6 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { return; } final long start = System.currentTimeMillis(); - ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); shuffleBufferManager.releaseMemory(req.encodedLength(), false, true); List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req); long alreadyReleasedSize = 0; @@ -198,7 +234,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { if (hasFailureOccurred) { continue; } - ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd); + ret = shuffleTaskManager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd); if (ret != StatusCode.SUCCESS) { String errorMsg = "Error happened when shuffleEngine.write for " @@ -211,9 +247,9 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { } else { long toReleasedSize = spd.getTotalBlockSize(); // after each cacheShuffleData call, the `preAllocatedSize` is updated timely. 
- manager.releasePreAllocatedSize(toReleasedSize); + shuffleTaskManager.releasePreAllocatedSize(toReleasedSize); alreadyReleasedSize += toReleasedSize; - manager.updateCachedBlockIds( + shuffleTaskManager.updateCachedBlockIds( appId, shuffleId, spd.getPartitionId(), spd.getBlockList()); } } catch (Exception e) { @@ -240,7 +276,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { // removed, then after // cacheShuffleData finishes, the preAllocatedSize should be updated accordingly. if (requireBlocksSize > alreadyReleasedSize) { - manager.releasePreAllocatedSize(requireBlocksSize - alreadyReleasedSize); + shuffleTaskManager.releasePreAllocatedSize(requireBlocksSize - alreadyReleasedSize); } rpcResponse = new RpcResponse(req.getRequestId(), ret, responseMessage); long costTime = System.currentTimeMillis() - start; @@ -269,6 +305,33 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { client.getChannel().writeAndFlush(rpcResponse); } + private static void releaseNettyBufferAndMetrics( + SendShuffleDataRequest req, + String appId, + int shuffleId, + long requireBufferId, + long requireBlocksSize, + ShuffleBufferManager shuffleBufferManager, + PreAllocatedBufferInfo info, + boolean isPreAllocated) { + if (isPreAllocated) { + shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true); + } + if (MapUtils.isNotEmpty(req.getPartitionToBlocks())) { + // release memory + req.getPartitionToBlocks().values().stream() + .flatMap(Collection::stream) + .forEach(block -> block.getData().release()); + ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireBlocksSize); + } else { + LOG.error( + "Failed to handle send shuffle data request, no blocks found in this request. appId: {}, shuffleId: {}, requireBufferId: {}", + appId, + shuffleId, + requireBufferId); + } + } + public void handleGetMemoryShuffleDataRequest( TransportClient client, GetMemoryShuffleDataRequest req) { String appId = req.getAppId();