This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4def3cc7e [#2093][followup] feat: Add support of partition split for grpc (#2396)
4def3cc7e is described below

commit 4def3cc7e7f45ac8ee2bb37cb2fb29b7894f2c5e
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Mar 17 17:29:57 2025 +0800

    [#2093][followup] feat: Add support of partition split for grpc (#2396)
    
    ### What changes were proposed in this pull request?
    
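    Add support for partition split to the gRPC client (`ShuffleServerGrpcClient`).
    The partition IDs returned alongside the require ID by `requirePreAllocation`
    are now collected and set on the `RssSendShuffleDataResponse` via
    `setNeedSplitPartitionIds`. A log line is also added in `RssShuffleManagerBase`
    to indicate when partition reassign is enabled.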
    
    ### Why are the changes needed?
    
    Follow-up for #2093.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests
---
 .../shuffle/manager/RssShuffleManagerBase.java     |  1 +
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 26 +++++++++++++---------
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index d869c64fe..43f9c1f00 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -271,6 +271,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
         throw new RssException(
             "The feature of task partition reassign is incompatible with 
multiple replicas mechanism.");
       }
+      LOG.info("Partition reassign is enabled.");
     }
     this.blockIdSelfManagedEnabled = 
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
     this.shuffleManagerRpcServiceEnabled =
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index cb477df34..82993890e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -23,11 +23,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
 import io.grpc.StatusRuntimeException;
@@ -540,7 +542,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
 
     boolean isSuccessful = true;
     AtomicReference<StatusCode> failedStatusCode = new 
AtomicReference<>(StatusCode.INTERNAL_ERROR);
-
+    Set<Integer> needSplitPartitionIds = Sets.newHashSet();
     // prepare rpc request based on shuffleId -> partitionId -> blocks
     for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
         shuffleIdToBlocks.entrySet()) {
@@ -583,17 +585,18 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         RetryUtils.retryWithCondition(
             () -> {
               // TODO(baoloongmao): support partition split follow netty client
-              long requireId =
+              Pair<Long, List<Integer>> allocationResult =
                   requirePreAllocation(
-                          appId,
-                          shuffleId,
-                          partitionIds,
-                          partitionRequireSizes,
-                          allocateSize,
-                          request.getRetryMax() / maxRetryAttempts,
-                          request.getRetryIntervalMax(),
-                          failedStatusCode)
-                      .getLeft();
+                      appId,
+                      shuffleId,
+                      partitionIds,
+                      partitionRequireSizes,
+                      allocateSize,
+                      request.getRetryMax() / maxRetryAttempts,
+                      request.getRetryIntervalMax(),
+                      failedStatusCode);
+              long requireId = allocationResult.getLeft();
+              needSplitPartitionIds.addAll(allocationResult.getRight());
               if (requireId == FAILED_REQUIRE_ID) {
                 throw new RssException(
                     String.format(
@@ -661,6 +664,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
     } else {
       response = new RssSendShuffleDataResponse(failedStatusCode.get());
     }
+    response.setNeedSplitPartitionIds(needSplitPartitionIds);
     return response;
   }
 

Reply via email to