This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 77319985d [MINOR] fix(client/netty): ShuffleServerGrpcNettyClient missing to send shuffleId and partitionIds for requirePreAllocation request (#1913)
77319985d is described below

commit 77319985d8fe9f63fe230dbb440e99b82bc35b12
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Tue Jul 16 14:21:12 2024 +0800

    [MINOR] fix(client/netty): ShuffleServerGrpcNettyClient missing to send shuffleId and partitionIds for requirePreAllocation request (#1913)
    
    ### What changes were proposed in this pull request?
    
    Add partitionIds and shuffleId to `RequireBufferRequest`.
    
    ### Why are the changes needed?
    
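    Without these changes, the server cannot enforce `limitHugePartition`:
    the Netty client sent shuffleId `0` and an empty partitionIds list, so
    the per-partition loop in the server code below never runs and the
    huge-partition limit is never checked.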
    
    ```java
      public long requireBuffer(
          String appId, int shuffleId, List<Integer> partitionIds, int 
requireSize) {
        ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
        if (null == shuffleTaskInfo) {
          LOG.error("No such app is registered. appId: {}, shuffleId: {}", 
appId, shuffleId);
          throw new NoRegisterException("No such app is registered. appId: " + 
appId);
        }
        for (int partitionId : partitionIds) {
          long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, 
partitionId);
          if (shuffleBufferManager.limitHugePartition(
              appId, shuffleId, partitionId, partitionUsedDataSize)) {
            String errorMessage =
                String.format(
                    "Huge partition is limited to writing. appId: %s, 
shuffleId: %s, partitionIds: %s, partitionUsedDataSize: %s",
                    appId, shuffleId, partitionIds, partitionUsedDataSize);
            LOG.error(errorMessage);
            throw new NoBufferForHugePartitionException(errorMessage);
          }
        }
        return requireBuffer(appId, requireSize);
      }
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested locally: started an RSS cluster using Netty, configured a small
    huge-partition size, and observed `NoBufferForHugePartitionException`.
---
 .../uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index a05d94b51..26e53851d 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -17,7 +17,7 @@
 
 package org.apache.uniffle.client.impl.grpc;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -148,11 +148,13 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
       int shuffleId = stb.getKey();
       int size = 0;
       int blockNum = 0;
+      List<Integer> partitionIds = new ArrayList<>();
       for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : 
stb.getValue().entrySet()) {
         for (ShuffleBlockInfo sbi : ptb.getValue()) {
           size += sbi.getSize();
           blockNum++;
         }
+        partitionIds.add(ptb.getKey());
       }
 
       SendShuffleDataRequest sendShuffleDataRequest =
@@ -173,8 +175,8 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
               long requireId =
                   requirePreAllocation(
                       request.getAppId(),
-                      0,
-                      Collections.emptyList(),
+                      shuffleId,
+                      partitionIds,
                       allocateSize,
                       request.getRetryMax(),
                       request.getRetryIntervalMax(),

Reply via email to