This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 77319985d [MINOR] fix(client/netty): ShuffleServerGrpcNettyClient missing to send shuffleId and partitionIds for requirePreAllocation request (#1913) 77319985d is described below commit 77319985d8fe9f63fe230dbb440e99b82bc35b12 Author: maobaolong <baoloong...@tencent.com> AuthorDate: Tue Jul 16 14:21:12 2024 +0800 [MINOR] fix(client/netty): ShuffleServerGrpcNettyClient missing to send shuffleId and partitionIds for requirePreAllocation request (#1913) ### What changes were proposed in this pull request? Add partitionIds and shuffleId to `RequireBufferRequest`. ### Why are the changes needed? Without this changes, server cannot check limitHugePartition. ```java public long requireBuffer( String appId, int shuffleId, List<Integer> partitionIds, int requireSize) { ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId); if (null == shuffleTaskInfo) { LOG.error("No such app is registered. appId: {}, shuffleId: {}", appId, shuffleId); throw new NoRegisterException("No such app is registered. appId: " + appId); } for (int partitionId : partitionIds) { long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, partitionId); if (shuffleBufferManager.limitHugePartition( appId, shuffleId, partitionId, partitionUsedDataSize)) { String errorMessage = String.format( "Huge partition is limited to writing. appId: %s, shuffleId: %s, partitionIds: %s, partitionUsedDataSize: %s", appId, shuffleId, partitionIds, partitionUsedDataSize); LOG.error(errorMessage); throw new NoBufferForHugePartitionException(errorMessage); } } return requireBuffer(appId, requireSize); } ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Test on local, start a rss cluster with netty, specific a small huge partition size, you can see NoBufferForHugePartitionException --- .../uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index a05d94b51..26e53851d 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -17,7 +17,7 @@ package org.apache.uniffle.client.impl.grpc; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -148,11 +148,13 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { int shuffleId = stb.getKey(); int size = 0; int blockNum = 0; + List<Integer> partitionIds = new ArrayList<>(); for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) { for (ShuffleBlockInfo sbi : ptb.getValue()) { size += sbi.getSize(); blockNum++; } + partitionIds.add(ptb.getKey()); } SendShuffleDataRequest sendShuffleDataRequest = @@ -173,8 +175,8 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { long requireId = requirePreAllocation( request.getAppId(), - 0, - Collections.emptyList(), + shuffleId, + partitionIds, allocateSize, request.getRetryMax(), request.getRetryIntervalMax(),