This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a35b0f5e [#2207] feat(dashboard): Add the write information of 
appinfo in Shuffle Server heartbeat (#2208)
3a35b0f5e is described below

commit 3a35b0f5eedd811c66db64388c839bec691cab9a
Author: leewish <[email protected]>
AuthorDate: Mon Oct 21 19:35:27 2024 +0800

    [#2207] feat(dashboard): Add the write information of appinfo in Shuffle 
Server heartbeat (#2208)
    
    ### What changes were proposed in this pull request?
    
    Add the write information of appinfo in Shuffle Server heartbeat
    
    ### Why are the changes needed?
    
    Fix: #2207
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Locally
    Co-authored-by: wenlongwlli <[email protected]>
---
 .../coordinator/CoordinatorGrpcService.java        |  3 ++-
 .../org/apache/uniffle/coordinator/ServerNode.java | 13 +++++++++--
 .../client/impl/grpc/CoordinatorGrpcClient.java    |  7 ++++--
 .../client/request/RssSendHeartBeatRequest.java    | 11 ++++++++-
 proto/src/main/proto/Rss.proto                     | 12 ++++++++++
 .../apache/uniffle/server/RegisterHeartBeat.java   | 11 ++++++---
 .../org/apache/uniffle/server/ShuffleServer.java   | 26 +++++++++++++++++++++-
 .../org/apache/uniffle/server/ShuffleTaskInfo.java |  4 ++++
 8 files changed, 77 insertions(+), 10 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 33e08c474..e49430001 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -537,7 +537,8 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         request.getServerId().getJettyPort(),
         request.getStartTimeMs(),
         request.getVersion(),
-        request.getGitCommitId());
+        request.getGitCommitId(),
+        request.getApplicationInfoList());
   }
 
   /**
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index ad992f0bc..356c4bfe9 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -17,14 +17,18 @@
 
 package org.apache.uniffle.coordinator;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
 
 public class ServerNode implements Comparable<ServerNode> {
@@ -46,6 +50,7 @@ public class ServerNode implements Comparable<ServerNode> {
   private long startTime = -1;
   private String version;
   private String gitCommitId;
+  Map<String, RssProtos.ApplicationInfo> appIdToInfos;
 
   public ServerNode(String id) {
     this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
@@ -181,7 +186,8 @@ public class ServerNode implements Comparable<ServerNode> {
         jettyPort,
         startTime,
         "",
-        "");
+        "",
+        Collections.EMPTY_LIST);
   }
 
   public ServerNode(
@@ -199,7 +205,8 @@ public class ServerNode implements Comparable<ServerNode> {
       int jettyPort,
       long startTime,
       String version,
-      String gitCommitId) {
+      String gitCommitId,
+      List<RssProtos.ApplicationInfo> appInfos) {
     this.id = id;
     this.ip = ip;
     this.grpcPort = grpcPort;
@@ -221,6 +228,8 @@ public class ServerNode implements Comparable<ServerNode> {
     this.startTime = startTime;
     this.version = version;
     this.gitCommitId = gitCommitId;
+    this.appIdToInfos = new ConcurrentHashMap<>();
+    appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo));
   }
 
   public ShuffleServerId convertToGrpcProto() {
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 8583e952e..fbfe57824 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -127,7 +127,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       Map<String, StorageInfo> storageInfo,
       int nettyPort,
       int jettyPort,
-      long startTimeMs) {
+      long startTimeMs,
+      List<RssProtos.ApplicationInfo> appInfos) {
     ShuffleServerId serverId =
         ShuffleServerId.newBuilder()
             .setId(id)
@@ -149,6 +150,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             .setStartTimeMs(startTimeMs)
             .setVersion(Constants.VERSION)
             .setGitCommitId(Constants.REVISION_SHORT)
+            .addAllApplicationInfo(appInfos)
             .build();
 
     RssProtos.StatusCode status;
@@ -225,7 +227,8 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             request.getStorageInfo(),
             request.getNettyPort(),
             request.getJettyPort(),
-            request.getStartTimeMs());
+            request.getStartTimeMs(),
+            request.getAppInfos());
 
     RssSendHeartBeatResponse response;
     RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index a31164195..a4f23ba73 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -17,11 +17,13 @@
 
 package org.apache.uniffle.client.request;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.proto.RssProtos;
 
 public class RssSendHeartBeatRequest {
 
@@ -39,6 +41,7 @@ public class RssSendHeartBeatRequest {
   private final int nettyPort;
   private final int jettyPort;
   private final long startTimeMs;
+  private final List<RssProtos.ApplicationInfo> appInfos;
 
   public RssSendHeartBeatRequest(
       String shuffleServerId,
@@ -54,7 +57,8 @@ public class RssSendHeartBeatRequest {
       Map<String, StorageInfo> storageInfo,
       int nettyPort,
       int jettyPort,
-      long startTimeMs) {
+      long startTimeMs,
+      List<RssProtos.ApplicationInfo> appInfos) {
     this.shuffleServerId = shuffleServerId;
     this.shuffleServerIp = shuffleServerIp;
     this.shuffleServerPort = shuffleServerPort;
@@ -69,6 +73,7 @@ public class RssSendHeartBeatRequest {
     this.nettyPort = nettyPort;
     this.jettyPort = jettyPort;
     this.startTimeMs = startTimeMs;
+    this.appInfos = appInfos;
   }
 
   public String getShuffleServerId() {
@@ -126,4 +131,8 @@ public class RssSendHeartBeatRequest {
   public long getStartTimeMs() {
     return startTimeMs;
   }
+
+  public List<RssProtos.ApplicationInfo> getAppInfos() {
+    return appInfos;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 06d781e13..7e4b19696 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -273,6 +273,17 @@ enum ServerStatus {
   // todo: more status, such as UPGRADING
 }
 
+message ApplicationInfo {
+  string appId = 1;
+  int64 partitionNum = 2;
+  int64 memorySize = 3;
+  int64 localFileNum = 4;
+  int64 localTotalSize = 5;
+  int64 hadoopFileNum = 6;
+  int64 hadoopTotalSize = 7;
+  int64 totalSize = 8;
+}
+
 message ShuffleServerHeartBeatRequest {
   ShuffleServerId serverId = 1;
   int64 usedMemory = 2;
@@ -286,6 +297,7 @@ message ShuffleServerHeartBeatRequest {
   optional string version = 22;
   optional string gitCommitId = 23;
   optional int64 startTimeMs = 24;
+  repeated ApplicationInfo applicationInfo = 25;
 }
 
 message ShuffleServerHeartBeatResponse {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java 
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 4b2d4607a..5a44728b5 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
@@ -33,6 +34,7 @@ import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.proto.RssProtos;
 
 public class RegisterHeartBeat {
 
@@ -84,7 +86,8 @@ public class RegisterHeartBeat {
                 shuffleServer.getStorageManager().getStorageInfo(),
                 shuffleServer.getNettyPort(),
                 shuffleServer.getJettyPort(),
-                shuffleServer.getStartTimeMs());
+                shuffleServer.getStartTimeMs(),
+                shuffleServer.getAppInfos());
           } catch (Exception e) {
             LOG.warn("Error happened when send heart beat to coordinator");
           }
@@ -107,7 +110,8 @@ public class RegisterHeartBeat {
       Map<String, StorageInfo> localStorageInfo,
       int nettyPort,
       int jettyPort,
-      long startTimeMs) {
+      long startTimeMs,
+      List<RssProtos.ApplicationInfo> appInfos) {
     // use `rss.server.heartbeat.interval` as the timeout option
     RssSendHeartBeatRequest request =
         new RssSendHeartBeatRequest(
@@ -124,7 +128,8 @@ public class RegisterHeartBeat {
             localStorageInfo,
             nettyPort,
             jettyPort,
-            startTimeMs);
+            startTimeMs,
+            appInfos);
 
     if (coordinatorClient.sendHeartBeat(request).getStatusCode() == 
StatusCode.SUCCESS) {
       return true;
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 92fa6b36b..59f53c97a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -17,8 +17,10 @@
 
 package org.apache.uniffle.server;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -57,6 +59,7 @@ import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
 import org.apache.uniffle.common.web.JettyServer;
+import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.buffer.ShuffleBufferType;
 import org.apache.uniffle.server.merge.ShuffleMergeManager;
@@ -584,6 +587,26 @@ public class ShuffleServer {
     return startTimeMs;
   }
 
+  public List<RssProtos.ApplicationInfo> getAppInfos() {
+    List<RssProtos.ApplicationInfo> appInfos = new ArrayList<>();
+    Map<String, ShuffleTaskInfo> taskInfos = 
getShuffleTaskManager().getShuffleTaskInfos();
+    taskInfos.forEach(
+        (appId, taskInfo) -> {
+          RssProtos.ApplicationInfo applicationInfo =
+              RssProtos.ApplicationInfo.newBuilder()
+                  .setAppId(appId)
+                  .setPartitionNum(taskInfo.getPartitionNum())
+                  .setMemorySize(taskInfo.getInMemoryDataSize())
+                  .setLocalTotalSize(taskInfo.getOnLocalFileDataSize())
+                  .setHadoopTotalSize(taskInfo.getOnHadoopDataSize())
+                  .setTotalSize(taskInfo.getTotalDataSize())
+                  .build();
+
+          appInfos.add(applicationInfo);
+        });
+    return appInfos;
+  }
+
   @VisibleForTesting
   public void sendHeartbeat() {
     ShuffleServer shuffleServer = this;
@@ -600,7 +623,8 @@ public class ShuffleServer {
         shuffleServer.getStorageManager().getStorageInfo(),
         shuffleServer.getNettyPort(),
         shuffleServer.getJettyPort(),
-        shuffleServer.getStartTimeMs());
+        shuffleServer.getStartTimeMs(),
+        shuffleServer.getAppInfos());
   }
 
   public ShuffleMergeManager getShuffleMergeManager() {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index d4e6eeb32..94987d661 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -283,6 +283,10 @@ public class ShuffleTaskInfo {
     return shuffleDetailInfos.get(shuffleId);
   }
 
+  public long getPartitionNum() {
+    return partitionDataSizes.values().stream().mapToLong(Map::size).sum();
+  }
+
   @Override
   public String toString() {
     return "ShuffleTaskInfo{"

Reply via email to