This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 7a6d27699 [MINOR] feat(server,coordinator): Heartbeat server startTimeMs to coordinator (#1975) 7a6d27699 is described below commit 7a6d2769980a29bc0309b6bdb52d167eacd71b90 Author: maobaolong <baoloong...@tencent.com> AuthorDate: Tue Jul 30 10:14:15 2024 +0800 [MINOR] feat(server,coordinator): Heartbeat server startTimeMs to coordinator (#1975) ### What changes were proposed in this pull request? The server sends its start time to the coordinator through the heartbeat. ### Why are the changes needed? Knowing each server's start time makes it easy to tell which servers have restarted. ### Does this PR introduce _any_ user-facing change? Users can get the server start time from the REST API. ### How was this patch tested? curl http://<COORDINATOR_HOST>:<COORDINATOR_JETTY_PORT>/api/server/nodes --- .../apache/uniffle/coordinator/CoordinatorGrpcService.java | 3 ++- .../java/org/apache/uniffle/coordinator/ServerNode.java | 13 +++++++++++-- .../uniffle/client/impl/grpc/CoordinatorGrpcClient.java | 7 +++++-- .../uniffle/client/request/RssSendHeartBeatRequest.java | 9 ++++++++- proto/src/main/proto/Rss.proto | 1 + .../java/org/apache/uniffle/server/RegisterHeartBeat.java | 9 ++++++--- .../main/java/org/apache/uniffle/server/ShuffleServer.java | 10 +++++++++- 7 files changed, 42 insertions(+), 10 deletions(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java index b3df63db5..2b6c1c428 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java @@ -434,6 +434,7 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer serverStatus, StorageInfoUtils.fromProto(request.getStorageInfoMap()), request.getServerId().getNettyPort(), - 
request.getServerId().getJettyPort()); + request.getServerId().getJettyPort(), + request.getStartTimeMs()); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java index a9723e00b..8b90fdafe 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java @@ -43,6 +43,7 @@ public class ServerNode implements Comparable<ServerNode> { private Map<String, StorageInfo> storageInfo; private int nettyPort = -1; private int jettyPort = -1; + private long startTimeMs = -1; public ServerNode(String id) { this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED); @@ -117,6 +118,7 @@ public class ServerNode implements Comparable<ServerNode> { status, storageInfoMap, -1, + -1, -1); } @@ -144,7 +146,8 @@ public class ServerNode implements Comparable<ServerNode> { status, storageInfoMap, nettyPort, - -1); + -1, + -1L); } public ServerNode( @@ -159,7 +162,8 @@ public class ServerNode implements Comparable<ServerNode> { ServerStatus status, Map<String, StorageInfo> storageInfoMap, int nettyPort, - int jettyPort) { + int jettyPort, + long startTimeMs) { this.id = id; this.ip = ip; this.grpcPort = grpcPort; @@ -178,6 +182,7 @@ public class ServerNode implements Comparable<ServerNode> { if (jettyPort > 0) { this.jettyPort = jettyPort; } + this.startTimeMs = startTimeMs; } public ShuffleServerId convertToGrpcProto() { @@ -317,4 +322,8 @@ public class ServerNode implements Comparable<ServerNode> { public int getJettyPort() { return jettyPort; } + + public long getStartTimeMs() { + return startTimeMs; + } } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java index 5900de6eb..3f7ff066c 100644 --- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java @@ -125,7 +125,8 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie ServerStatus serverStatus, Map<String, StorageInfo> storageInfo, int nettyPort, - int jettyPort) { + int jettyPort, + long startTimeMs) { ShuffleServerId serverId = ShuffleServerId.newBuilder() .setId(id) @@ -144,6 +145,7 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie .addAllTags(tags) .setStatusValue(serverStatus.ordinal()) .putAllStorageInfo(StorageInfoUtils.toProto(storageInfo)) + .setStartTimeMs(startTimeMs) .build(); RssProtos.StatusCode status; @@ -219,7 +221,8 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie request.getServerStatus(), request.getStorageInfo(), request.getNettyPort(), - request.getJettyPort()); + request.getJettyPort(), + request.getStartTimeMs()); RssSendHeartBeatResponse response; RssProtos.StatusCode statusCode = rpcResponse.getStatus(); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java index 34d29d750..a31164195 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java @@ -38,6 +38,7 @@ public class RssSendHeartBeatRequest { private final Map<String, StorageInfo> storageInfo; private final int nettyPort; private final int jettyPort; + private final long startTimeMs; public RssSendHeartBeatRequest( String shuffleServerId, @@ -52,7 +53,8 @@ public class RssSendHeartBeatRequest { ServerStatus serverStatus, Map<String, StorageInfo> storageInfo, int nettyPort, - int jettyPort) { + int 
jettyPort, + long startTimeMs) { this.shuffleServerId = shuffleServerId; this.shuffleServerIp = shuffleServerIp; this.shuffleServerPort = shuffleServerPort; @@ -66,6 +68,7 @@ public class RssSendHeartBeatRequest { this.storageInfo = storageInfo; this.nettyPort = nettyPort; this.jettyPort = jettyPort; + this.startTimeMs = startTimeMs; } public String getShuffleServerId() { @@ -119,4 +122,8 @@ public class RssSendHeartBeatRequest { public int getJettyPort() { return jettyPort; } + + public long getStartTimeMs() { + return startTimeMs; + } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 861d73a3f..eb7a82220 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -275,6 +275,7 @@ message ShuffleServerHeartBeatRequest { google.protobuf.BoolValue isHealthy = 7; optional ServerStatus status = 8; map<string, StorageInfo> storageInfo = 21; // mount point to storage info mapping. + optional int64 startTimeMs = 24; } message ShuffleServerHeartBeatResponse { diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java index a8c8b5d76..736b05581 100644 --- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java +++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java @@ -86,7 +86,8 @@ public class RegisterHeartBeat { shuffleServer.getServerStatus(), shuffleServer.getStorageManager().getStorageInfo(), shuffleServer.getNettyPort(), - shuffleServer.getJettyPort()); + shuffleServer.getJettyPort(), + shuffleServer.getStartTimeMs()); } catch (Exception e) { LOG.warn("Error happened when send heart beat to coordinator"); } @@ -108,7 +109,8 @@ public class RegisterHeartBeat { ServerStatus serverStatus, Map<String, StorageInfo> localStorageInfo, int nettyPort, - int jettyPort) { + int jettyPort, + long startTimeMs) { AtomicBoolean sendSuccessfully = new AtomicBoolean(false); // use 
`rss.server.heartbeat.interval` as the timeout option RssSendHeartBeatRequest request = @@ -125,7 +127,8 @@ public class RegisterHeartBeat { serverStatus, localStorageInfo, nettyPort, - jettyPort); + jettyPort, + startTimeMs); ThreadUtils.executeTasks( heartBeatExecutorService, diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 461fe2aab..88ddb04fe 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -104,8 +104,11 @@ public class ShuffleServer { private StreamServer streamServer; private JvmPauseMonitor jvmPauseMonitor; + private final long startTimeMs; + public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception { this.shuffleServerConf = shuffleServerConf; + this.startTimeMs = System.currentTimeMillis(); try { initialization(); } catch (Exception e) { @@ -542,6 +545,10 @@ public class ShuffleServer { return StringUtils.join(tags, ","); } + public long getStartTimeMs() { + return startTimeMs; + } + @VisibleForTesting public void sendHeartbeat() { ShuffleServer shuffleServer = this; @@ -557,6 +564,7 @@ public class ShuffleServer { shuffleServer.getServerStatus(), shuffleServer.getStorageManager().getStorageInfo(), shuffleServer.getNettyPort(), - shuffleServer.getJettyPort()); + shuffleServer.getJettyPort(), + shuffleServer.getStartTimeMs()); } }