This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch partition-split in repository https://gitbox.apache.org/repos/asf/uniffle.git
commit c532ac1de04ba54fff3076f2a3685959af698d7f Author: Junfan Zhang <[email protected]> AuthorDate: Wed Mar 12 21:20:11 2025 +0800 feat: Add support of partition split for grpc --- .../uniffle/shuffle/manager/RssShuffleManagerBase.java | 1 + .../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index d869c64fe..43f9c1f00 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -271,6 +271,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac throw new RssException( "The feature of task partition reassign is incompatible with multiple replicas mechanism."); } + LOG.info("Partition reassign is enabled."); } this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED); this.shuffleManagerRpcServiceEnabled = diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index cb477df34..7c8c88012 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -23,11 +23,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import io.grpc.StatusRuntimeException; @@ -540,7 +542,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer boolean isSuccessful = true; AtomicReference<StatusCode> failedStatusCode = new AtomicReference<>(StatusCode.INTERNAL_ERROR); - + Set<Integer> needSplitPartitionIds = Sets.newHashSet(); // prepare rpc request based on shuffleId -> partitionId -> blocks for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb : shuffleIdToBlocks.entrySet()) { @@ -583,7 +585,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer RetryUtils.retryWithCondition( () -> { // TODO(baoloongmao): support partition split follow netty client - long requireId = + Pair<Long, List<Integer>> allocationResult = requirePreAllocation( appId, shuffleId, @@ -592,8 +594,9 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer allocateSize, request.getRetryMax() / maxRetryAttempts, request.getRetryIntervalMax(), - failedStatusCode) - .getLeft(); + failedStatusCode); + long requireId = allocationResult.getLeft(); + needSplitPartitionIds.addAll(allocationResult.getRight()); if (requireId == FAILED_REQUIRE_ID) { throw new RssException( String.format( @@ -661,6 +664,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer } else { response = new RssSendShuffleDataResponse(failedStatusCode.get()); } + response.setNeedSplitPartitionIds(needSplitPartitionIds); return response; }
