This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 96e96f8e5 [#2626] feat(spark): Respect rss.client.rpc.maxAttempts in ShuffleManagerClient (#2627)
96e96f8e5 is described below

commit 96e96f8e5c2ae2c38d73313beee6b4c9cb26cbcc
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Sep 25 10:30:13 2025 +0800

    [#2626] feat(spark): Respect rss.client.rpc.maxAttempts in ShuffleManagerClient (#2627)
    
    ### What changes were proposed in this pull request?
    
    Respect `rss.client.rpc.maxAttempts` in ShuffleManagerClient
    
    ### Why are the changes needed?
    
    #2626
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../apache/uniffle/shuffle/manager/RssShuffleManagerBase.java    | 4 +++-
 .../uniffle/client/factory/ShuffleManagerClientFactory.java      | 9 +++++++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 45615898e..ad1c4dd9a 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -957,11 +957,13 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
       String driver = rssConf.getString("driver.host", "");
       int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
       long rpcTimeout = rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS);
+      int maxAttempts = rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS);
       this.managerClientSupplier =
           ExpiringCloseableSupplier.of(
               () ->
                   ShuffleManagerClientFactory.getInstance()
-                      .createShuffleManagerClient(ClientType.GRPC, driver, port, rpcTimeout));
+                      .createShuffleManagerClient(
+                          ClientType.GRPC, driver, port, rpcTimeout, maxAttempts));
     }
     return managerClientSupplier;
   }
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
index 66b4a2a9e..68c736657 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
@@ -33,11 +33,16 @@ public class ShuffleManagerClientFactory {
   private ShuffleManagerClientFactory() {}
 
   public ShuffleManagerGrpcClient createShuffleManagerClient(
-      ClientType clientType, String host, int port, long rpcTimeout) {
+      ClientType clientType, String host, int port, long rpcTimeout, int maxAttempts) {
     if (ClientType.GRPC.equals(clientType)) {
-      return new ShuffleManagerGrpcClient(host, port, rpcTimeout);
+      return new ShuffleManagerGrpcClient(host, port, rpcTimeout, maxAttempts);
     } else {
       throw new UnsupportedOperationException("Unsupported client type " + clientType);
     }
   }
+
+  public ShuffleManagerGrpcClient createShuffleManagerClient(
+      ClientType clientType, String host, int port, long rpcTimeout) {
+    return createShuffleManagerClient(clientType, host, port, rpcTimeout, 3);
+  }
 }

Reply via email to