This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 96e96f8e5 [#2626] feat(spark): Respect rss.client.rpc.maxAttempts in
ShuffleManagerClient (#2627)
96e96f8e5 is described below
commit 96e96f8e5c2ae2c38d73313beee6b4c9cb26cbcc
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Sep 25 10:30:13 2025 +0800
[#2626] feat(spark): Respect rss.client.rpc.maxAttempts in
ShuffleManagerClient (#2627)
### What changes were proposed in this pull request?
Respect `rss.client.rpc.maxAttempts` in ShuffleManagerClient
### Why are the changes needed?
#2626
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
Co-authored-by: Junfan Zhang <[email protected]>
---
.../apache/uniffle/shuffle/manager/RssShuffleManagerBase.java | 4 +++-
.../uniffle/client/factory/ShuffleManagerClientFactory.java | 9 +++++++--
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 45615898e..ad1c4dd9a 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -957,11 +957,13 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
long rpcTimeout = rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS);
+ int maxAttempts = rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS);
this.managerClientSupplier =
ExpiringCloseableSupplier.of(
() ->
ShuffleManagerClientFactory.getInstance()
- .createShuffleManagerClient(ClientType.GRPC, driver,
port, rpcTimeout));
+ .createShuffleManagerClient(
+ ClientType.GRPC, driver, port, rpcTimeout,
maxAttempts));
}
return managerClientSupplier;
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
index 66b4a2a9e..68c736657 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java
@@ -33,11 +33,16 @@ public class ShuffleManagerClientFactory {
private ShuffleManagerClientFactory() {}
public ShuffleManagerGrpcClient createShuffleManagerClient(
- ClientType clientType, String host, int port, long rpcTimeout) {
+ ClientType clientType, String host, int port, long rpcTimeout, int
maxAttempts) {
if (ClientType.GRPC.equals(clientType)) {
- return new ShuffleManagerGrpcClient(host, port, rpcTimeout);
+ return new ShuffleManagerGrpcClient(host, port, rpcTimeout, maxAttempts);
} else {
throw new UnsupportedOperationException("Unsupported client type " +
clientType);
}
}
+
+ public ShuffleManagerGrpcClient createShuffleManagerClient(
+ ClientType clientType, String host, int port, long rpcTimeout) {
+ return createShuffleManagerClient(clientType, host, port, rpcTimeout, 3);
+ }
}