This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch partition-split
in repository https://gitbox.apache.org/repos/asf/uniffle.git

commit c532ac1de04ba54fff3076f2a3685959af698d7f
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Mar 12 21:20:11 2025 +0800

    feat: Add support of partition split for grpc
---
 .../uniffle/shuffle/manager/RssShuffleManagerBase.java       |  1 +
 .../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java    | 12 ++++++++----
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index d869c64fe..43f9c1f00 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -271,6 +271,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
         throw new RssException(
             "The feature of task partition reassign is incompatible with 
multiple replicas mechanism.");
       }
+      LOG.info("Partition reassign is enabled.");
     }
     this.blockIdSelfManagedEnabled = 
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
     this.shuffleManagerRpcServiceEnabled =
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index cb477df34..7c8c88012 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -23,11 +23,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
 import io.grpc.StatusRuntimeException;
@@ -540,7 +542,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
 
     boolean isSuccessful = true;
     AtomicReference<StatusCode> failedStatusCode = new 
AtomicReference<>(StatusCode.INTERNAL_ERROR);
-
+    Set<Integer> needSplitPartitionIds = Sets.newHashSet();
     // prepare rpc request based on shuffleId -> partitionId -> blocks
     for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
         shuffleIdToBlocks.entrySet()) {
@@ -583,7 +585,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         RetryUtils.retryWithCondition(
             () -> {
               // TODO(baoloongmao): support partition split follow netty client
-              long requireId =
+              Pair<Long, List<Integer>> allocationResult =
                   requirePreAllocation(
                           appId,
                           shuffleId,
@@ -592,8 +594,9 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
                           allocateSize,
                           request.getRetryMax() / maxRetryAttempts,
                           request.getRetryIntervalMax(),
-                          failedStatusCode)
-                      .getLeft();
+                          failedStatusCode);
+              long requireId = allocationResult.getLeft();
+              needSplitPartitionIds.addAll(allocationResult.getRight());
               if (requireId == FAILED_REQUIRE_ID) {
                 throw new RssException(
                     String.format(
@@ -661,6 +664,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
     } else {
       response = new RssSendShuffleDataResponse(failedStatusCode.get());
     }
+    response.setNeedSplitPartitionIds(needSplitPartitionIds);
     return response;
   }
 

Reply via email to