This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f194cc57f [#2529] fix(spark3): Incorrect clientInfo without nettyPort
if netty is enabled (#2530)
f194cc57f is described below
commit f194cc57f386da3aaf9bb94e68070946bd189caf
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jun 27 19:08:24 2025 +0800
[#2529] fix(spark3): Incorrect clientInfo without nettyPort if netty is
enabled (#2530)
### What changes were proposed in this pull request?
If the shuffle server enables Netty RPC, clients collect push cost metrics
along with client information. However, the collected shuffle server
information lacks the Netty port, which causes subsequent analysis to be
inaccurate or invalid.
### Why are the changes needed?
for #2529

### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Yes. Test in the internal cluster spark jobs and existing UTs
---
.../uniffle/client/factory/ShuffleServerClientFactory.java | 3 +--
.../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java | 10 +++++++++-
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
index 7c419e25f..79d4ba67e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
@@ -46,8 +46,7 @@ public class ShuffleServerClientFactory {
private ShuffleServerClient createShuffleServerClient(
String clientType, ShuffleServerInfo shuffleServerInfo, RssConf rssConf)
{
if (clientType.equalsIgnoreCase(ClientType.GRPC.name())) {
- return new ShuffleServerGrpcClient(
- rssConf, shuffleServerInfo.getHost(),
shuffleServerInfo.getGrpcPort());
+ return new ShuffleServerGrpcClient(rssConf, shuffleServerInfo);
} else if (clientType.equalsIgnoreCase(ClientType.GRPC_NETTY.name())) {
return new ShuffleServerGrpcNettyClient(
rssConf,
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index c7f8563de..1d9d7b0d0 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -152,6 +152,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
StatusCode.INTERNAL_NOT_RETRY_ERROR,
StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT);
+ private ShuffleServerInfo serverInfo;
+
@VisibleForTesting
public ShuffleServerGrpcClient(String host, int port) {
this(
@@ -166,6 +168,11 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
-1);
}
+ public ShuffleServerGrpcClient(RssConf rssConf, ShuffleServerInfo
rssServerInfo) {
+ this(rssConf, rssServerInfo.getHost(), rssServerInfo.getGrpcPort());
+ this.serverInfo = rssServerInfo;
+ }
+
public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
this(
host,
@@ -1271,7 +1278,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
@Override
public ClientInfo getClientInfo() {
- return new ClientInfo(ClientType.GRPC, new ShuffleServerInfo(host, port));
+ return new ClientInfo(
+ ClientType.GRPC, serverInfo == null ? new ShuffleServerInfo(host,
port) : serverInfo);
}
protected void waitOrThrow(