This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6d27699 [MINOR] feat(server,coordinator): Heartbeat server 
startTimeMs to coordinator (#1975)
7a6d27699 is described below

commit 7a6d2769980a29bc0309b6bdb52d167eacd71b90
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Tue Jul 30 10:14:15 2024 +0800

    [MINOR] feat(server,coordinator): Heartbeat server startTimeMs to 
coordinator (#1975)
    
    ### What changes were proposed in this pull request?
    
    The server sends its start time to the coordinator through the heartbeat.
    
    ### Why are the changes needed?
    
    Expose each server's start time so it is easy to tell which servers have restarted.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Users can get the server start time from the REST API.
    
    ### How was this patch tested?
    
    curl http://<COORDINATOR_HOST>:<COORDINATOR_JETTY_PORT>/api/server/nodes
---
 .../apache/uniffle/coordinator/CoordinatorGrpcService.java  |  3 ++-
 .../java/org/apache/uniffle/coordinator/ServerNode.java     | 13 +++++++++++--
 .../uniffle/client/impl/grpc/CoordinatorGrpcClient.java     |  7 +++++--
 .../uniffle/client/request/RssSendHeartBeatRequest.java     |  9 ++++++++-
 proto/src/main/proto/Rss.proto                              |  1 +
 .../java/org/apache/uniffle/server/RegisterHeartBeat.java   |  9 ++++++---
 .../main/java/org/apache/uniffle/server/ShuffleServer.java  | 10 +++++++++-
 7 files changed, 42 insertions(+), 10 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index b3df63db5..2b6c1c428 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -434,6 +434,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         serverStatus,
         StorageInfoUtils.fromProto(request.getStorageInfoMap()),
         request.getServerId().getNettyPort(),
-        request.getServerId().getJettyPort());
+        request.getServerId().getJettyPort(),
+        request.getStartTimeMs());
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index a9723e00b..8b90fdafe 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -43,6 +43,7 @@ public class ServerNode implements Comparable<ServerNode> {
   private Map<String, StorageInfo> storageInfo;
   private int nettyPort = -1;
   private int jettyPort = -1;
+  private long startTimeMs = -1;
 
   public ServerNode(String id) {
     this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
@@ -117,6 +118,7 @@ public class ServerNode implements Comparable<ServerNode> {
         status,
         storageInfoMap,
         -1,
+        -1,
         -1);
   }
 
@@ -144,7 +146,8 @@ public class ServerNode implements Comparable<ServerNode> {
         status,
         storageInfoMap,
         nettyPort,
-        -1);
+        -1,
+        -1L);
   }
 
   public ServerNode(
@@ -159,7 +162,8 @@ public class ServerNode implements Comparable<ServerNode> {
       ServerStatus status,
       Map<String, StorageInfo> storageInfoMap,
       int nettyPort,
-      int jettyPort) {
+      int jettyPort,
+      long startTimeMs) {
     this.id = id;
     this.ip = ip;
     this.grpcPort = grpcPort;
@@ -178,6 +182,7 @@ public class ServerNode implements Comparable<ServerNode> {
     if (jettyPort > 0) {
       this.jettyPort = jettyPort;
     }
+    this.startTimeMs = startTimeMs;
   }
 
   public ShuffleServerId convertToGrpcProto() {
@@ -317,4 +322,8 @@ public class ServerNode implements Comparable<ServerNode> {
   public int getJettyPort() {
     return jettyPort;
   }
+
+  public long getStartTimeMs() {
+    return startTimeMs;
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 5900de6eb..3f7ff066c 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -125,7 +125,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       ServerStatus serverStatus,
       Map<String, StorageInfo> storageInfo,
       int nettyPort,
-      int jettyPort) {
+      int jettyPort,
+      long startTimeMs) {
     ShuffleServerId serverId =
         ShuffleServerId.newBuilder()
             .setId(id)
@@ -144,6 +145,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             .addAllTags(tags)
             .setStatusValue(serverStatus.ordinal())
             .putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
+            .setStartTimeMs(startTimeMs)
             .build();
 
     RssProtos.StatusCode status;
@@ -219,7 +221,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             request.getServerStatus(),
             request.getStorageInfo(),
             request.getNettyPort(),
-            request.getJettyPort());
+            request.getJettyPort(),
+            request.getStartTimeMs());
 
     RssSendHeartBeatResponse response;
     RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index 34d29d750..a31164195 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -38,6 +38,7 @@ public class RssSendHeartBeatRequest {
   private final Map<String, StorageInfo> storageInfo;
   private final int nettyPort;
   private final int jettyPort;
+  private final long startTimeMs;
 
   public RssSendHeartBeatRequest(
       String shuffleServerId,
@@ -52,7 +53,8 @@ public class RssSendHeartBeatRequest {
       ServerStatus serverStatus,
       Map<String, StorageInfo> storageInfo,
       int nettyPort,
-      int jettyPort) {
+      int jettyPort,
+      long startTimeMs) {
     this.shuffleServerId = shuffleServerId;
     this.shuffleServerIp = shuffleServerIp;
     this.shuffleServerPort = shuffleServerPort;
@@ -66,6 +68,7 @@ public class RssSendHeartBeatRequest {
     this.storageInfo = storageInfo;
     this.nettyPort = nettyPort;
     this.jettyPort = jettyPort;
+    this.startTimeMs = startTimeMs;
   }
 
   public String getShuffleServerId() {
@@ -119,4 +122,8 @@ public class RssSendHeartBeatRequest {
   public int getJettyPort() {
     return jettyPort;
   }
+
+  public long getStartTimeMs() {
+    return startTimeMs;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 861d73a3f..eb7a82220 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -275,6 +275,7 @@ message ShuffleServerHeartBeatRequest {
   google.protobuf.BoolValue isHealthy = 7;
   optional ServerStatus status = 8;
   map<string, StorageInfo> storageInfo = 21; // mount point to storage info 
mapping.
+  optional int64 startTimeMs = 24;
 }
 
 message ShuffleServerHeartBeatResponse {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java 
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index a8c8b5d76..736b05581 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -86,7 +86,8 @@ public class RegisterHeartBeat {
                 shuffleServer.getServerStatus(),
                 shuffleServer.getStorageManager().getStorageInfo(),
                 shuffleServer.getNettyPort(),
-                shuffleServer.getJettyPort());
+                shuffleServer.getJettyPort(),
+                shuffleServer.getStartTimeMs());
           } catch (Exception e) {
             LOG.warn("Error happened when send heart beat to coordinator");
           }
@@ -108,7 +109,8 @@ public class RegisterHeartBeat {
       ServerStatus serverStatus,
       Map<String, StorageInfo> localStorageInfo,
       int nettyPort,
-      int jettyPort) {
+      int jettyPort,
+      long startTimeMs) {
     AtomicBoolean sendSuccessfully = new AtomicBoolean(false);
     // use `rss.server.heartbeat.interval` as the timeout option
     RssSendHeartBeatRequest request =
@@ -125,7 +127,8 @@ public class RegisterHeartBeat {
             serverStatus,
             localStorageInfo,
             nettyPort,
-            jettyPort);
+            jettyPort,
+            startTimeMs);
 
     ThreadUtils.executeTasks(
         heartBeatExecutorService,
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 461fe2aab..88ddb04fe 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -104,8 +104,11 @@ public class ShuffleServer {
   private StreamServer streamServer;
   private JvmPauseMonitor jvmPauseMonitor;
 
+  private final long startTimeMs;
+
   public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
     this.shuffleServerConf = shuffleServerConf;
+    this.startTimeMs = System.currentTimeMillis();
     try {
       initialization();
     } catch (Exception e) {
@@ -542,6 +545,10 @@ public class ShuffleServer {
     return StringUtils.join(tags, ",");
   }
 
+  public long getStartTimeMs() {
+    return startTimeMs;
+  }
+
   @VisibleForTesting
   public void sendHeartbeat() {
     ShuffleServer shuffleServer = this;
@@ -557,6 +564,7 @@ public class ShuffleServer {
         shuffleServer.getServerStatus(),
         shuffleServer.getStorageManager().getStorageInfo(),
         shuffleServer.getNettyPort(),
-        shuffleServer.getJettyPort());
+        shuffleServer.getJettyPort(),
+        shuffleServer.getStartTimeMs());
   }
 }

Reply via email to