This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3a35b0f5e [#2207] feat(dashboard): Add the write information of appinfo in Shuffle Server heartbeat (#2208)
3a35b0f5e is described below
commit 3a35b0f5eedd811c66db64388c839bec691cab9a
Author: leewish <[email protected]>
AuthorDate: Mon Oct 21 19:35:27 2024 +0800
[#2207] feat(dashboard): Add the write information of appinfo in Shuffle Server heartbeat (#2208)
### What changes were proposed in this pull request?
Add per-application write information (appInfo) to the Shuffle Server heartbeat.
### Why are the changes needed?
Fix: #2207
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Locally
Co-authored-by: wenlongwlli <[email protected]>
---
.../coordinator/CoordinatorGrpcService.java | 3 ++-
.../org/apache/uniffle/coordinator/ServerNode.java | 13 +++++++++--
.../client/impl/grpc/CoordinatorGrpcClient.java | 7 ++++--
.../client/request/RssSendHeartBeatRequest.java | 11 ++++++++-
proto/src/main/proto/Rss.proto | 12 ++++++++++
.../apache/uniffle/server/RegisterHeartBeat.java | 11 ++++++---
.../org/apache/uniffle/server/ShuffleServer.java | 26 +++++++++++++++++++++-
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 4 ++++
8 files changed, 77 insertions(+), 10 deletions(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 33e08c474..e49430001 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -537,7 +537,8 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer
request.getServerId().getJettyPort(),
request.getStartTimeMs(),
request.getVersion(),
- request.getGitCommitId());
+ request.getGitCommitId(),
+ request.getApplicationInfoList());
}
/**
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index ad992f0bc..356c4bfe9 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -17,14 +17,18 @@
package org.apache.uniffle.coordinator;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
public class ServerNode implements Comparable<ServerNode> {
@@ -46,6 +50,7 @@ public class ServerNode implements Comparable<ServerNode> {
private long startTime = -1;
private String version;
private String gitCommitId;
+ Map<String, RssProtos.ApplicationInfo> appIdToInfos;
public ServerNode(String id) {
this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
@@ -181,7 +186,8 @@ public class ServerNode implements Comparable<ServerNode> {
jettyPort,
startTime,
"",
- "");
+ "",
+ Collections.EMPTY_LIST);
}
public ServerNode(
@@ -199,7 +205,8 @@ public class ServerNode implements Comparable<ServerNode> {
int jettyPort,
long startTime,
String version,
- String gitCommitId) {
+ String gitCommitId,
+ List<RssProtos.ApplicationInfo> appInfos) {
this.id = id;
this.ip = ip;
this.grpcPort = grpcPort;
@@ -221,6 +228,8 @@ public class ServerNode implements Comparable<ServerNode> {
this.startTime = startTime;
this.version = version;
this.gitCommitId = gitCommitId;
+ this.appIdToInfos = new ConcurrentHashMap<>();
+ appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo));
}
public ShuffleServerId convertToGrpcProto() {
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 8583e952e..fbfe57824 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -127,7 +127,8 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
Map<String, StorageInfo> storageInfo,
int nettyPort,
int jettyPort,
- long startTimeMs) {
+ long startTimeMs,
+ List<RssProtos.ApplicationInfo> appInfos) {
ShuffleServerId serverId =
ShuffleServerId.newBuilder()
.setId(id)
@@ -149,6 +150,7 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
.setStartTimeMs(startTimeMs)
.setVersion(Constants.VERSION)
.setGitCommitId(Constants.REVISION_SHORT)
+ .addAllApplicationInfo(appInfos)
.build();
RssProtos.StatusCode status;
@@ -225,7 +227,8 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
request.getStorageInfo(),
request.getNettyPort(),
request.getJettyPort(),
- request.getStartTimeMs());
+ request.getStartTimeMs(),
+ request.getAppInfos());
RssSendHeartBeatResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index a31164195..a4f23ba73 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -17,11 +17,13 @@
package org.apache.uniffle.client.request;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
+import org.apache.uniffle.proto.RssProtos;
public class RssSendHeartBeatRequest {
@@ -39,6 +41,7 @@ public class RssSendHeartBeatRequest {
private final int nettyPort;
private final int jettyPort;
private final long startTimeMs;
+ private final List<RssProtos.ApplicationInfo> appInfos;
public RssSendHeartBeatRequest(
String shuffleServerId,
@@ -54,7 +57,8 @@ public class RssSendHeartBeatRequest {
Map<String, StorageInfo> storageInfo,
int nettyPort,
int jettyPort,
- long startTimeMs) {
+ long startTimeMs,
+ List<RssProtos.ApplicationInfo> appInfos) {
this.shuffleServerId = shuffleServerId;
this.shuffleServerIp = shuffleServerIp;
this.shuffleServerPort = shuffleServerPort;
@@ -69,6 +73,7 @@ public class RssSendHeartBeatRequest {
this.nettyPort = nettyPort;
this.jettyPort = jettyPort;
this.startTimeMs = startTimeMs;
+ this.appInfos = appInfos;
}
public String getShuffleServerId() {
@@ -126,4 +131,8 @@ public class RssSendHeartBeatRequest {
public long getStartTimeMs() {
return startTimeMs;
}
+
+  /**
+   * Returns the per-application info entries that this heartbeat request carries.
+   *
+   * @return the exact list handed to the constructor; it is stored without a defensive copy
+   */
+  public List<RssProtos.ApplicationInfo> getAppInfos() {
+    return this.appInfos;
+  }
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 06d781e13..7e4b19696 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -273,6 +273,17 @@ enum ServerStatus {
// todo: more status, such as UPGRADING
}
+// Per-application write statistics that a shuffle server attaches to its
+// heartbeat (repeated as ShuffleServerHeartBeatRequest.applicationInfo).
+// NOTE(review): size fields are presumably in bytes -- confirm against ShuffleTaskInfo.
+message ApplicationInfo {
+ // Application id; the shuffle server uses the key of its shuffle task info map.
+ string appId = 1;
+ // Filled by the shuffle server from ShuffleTaskInfo.getPartitionNum().
+ int64 partitionNum = 2;
+ // Filled by the shuffle server from ShuffleTaskInfo.getInMemoryDataSize().
+ int64 memorySize = 3;
+ // NOTE(review): not set by ShuffleServer.getAppInfos in this change; receivers see the default.
+ int64 localFileNum = 4;
+ // Filled by the shuffle server from ShuffleTaskInfo.getOnLocalFileDataSize().
+ int64 localTotalSize = 5;
+ // NOTE(review): not set by ShuffleServer.getAppInfos in this change; receivers see the default.
+ int64 hadoopFileNum = 6;
+ // Filled by the shuffle server from ShuffleTaskInfo.getOnHadoopDataSize().
+ int64 hadoopTotalSize = 7;
+ // Filled by the shuffle server from ShuffleTaskInfo.getTotalDataSize().
+ int64 totalSize = 8;
+}
+
message ShuffleServerHeartBeatRequest {
ShuffleServerId serverId = 1;
int64 usedMemory = 2;
@@ -286,6 +297,7 @@ message ShuffleServerHeartBeatRequest {
optional string version = 22;
optional string gitCommitId = 23;
optional int64 startTimeMs = 24;
+ repeated ApplicationInfo applicationInfo = 25;
}
message ShuffleServerHeartBeatResponse {
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 4b2d4607a..5a44728b5 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
@@ -33,6 +34,7 @@ import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.proto.RssProtos;
public class RegisterHeartBeat {
@@ -84,7 +86,8 @@ public class RegisterHeartBeat {
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
shuffleServer.getJettyPort(),
- shuffleServer.getStartTimeMs());
+ shuffleServer.getStartTimeMs(),
+ shuffleServer.getAppInfos());
} catch (Exception e) {
LOG.warn("Error happened when send heart beat to coordinator");
}
@@ -107,7 +110,8 @@ public class RegisterHeartBeat {
Map<String, StorageInfo> localStorageInfo,
int nettyPort,
int jettyPort,
- long startTimeMs) {
+ long startTimeMs,
+ List<RssProtos.ApplicationInfo> appInfos) {
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request =
new RssSendHeartBeatRequest(
@@ -124,7 +128,8 @@ public class RegisterHeartBeat {
localStorageInfo,
nettyPort,
jettyPort,
- startTimeMs);
+ startTimeMs,
+ appInfos);
if (coordinatorClient.sendHeartBeat(request).getStatusCode() ==
StatusCode.SUCCESS) {
return true;
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 92fa6b36b..59f53c97a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -17,8 +17,10 @@
package org.apache.uniffle.server;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -57,6 +59,7 @@ import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
import org.apache.uniffle.common.web.JettyServer;
+import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.server.merge.ShuffleMergeManager;
@@ -584,6 +587,26 @@ public class ShuffleServer {
return startTimeMs;
}
+  /**
+   * Builds one {@link RssProtos.ApplicationInfo} for every application currently tracked by the
+   * shuffle task manager, to be attached to the heartbeat sent to the coordinator.
+   *
+   * <p>NOTE(review): {@code localFileNum} and {@code hadoopFileNum} are not populated here, so
+   * the receiver only sees their default value.
+   *
+   * @return a new mutable list with one entry per tracked application id
+   */
+  public List<RssProtos.ApplicationInfo> getAppInfos() {
+    Map<String, ShuffleTaskInfo> shuffleTaskInfos = getShuffleTaskManager().getShuffleTaskInfos();
+    List<RssProtos.ApplicationInfo> result = new ArrayList<>();
+    for (Map.Entry<String, ShuffleTaskInfo> entry : shuffleTaskInfos.entrySet()) {
+      ShuffleTaskInfo info = entry.getValue();
+      result.add(
+          RssProtos.ApplicationInfo.newBuilder()
+              .setAppId(entry.getKey())
+              .setPartitionNum(info.getPartitionNum())
+              .setMemorySize(info.getInMemoryDataSize())
+              .setLocalTotalSize(info.getOnLocalFileDataSize())
+              .setHadoopTotalSize(info.getOnHadoopDataSize())
+              .setTotalSize(info.getTotalDataSize())
+              .build());
+    }
+    return result;
+  }
+
@VisibleForTesting
public void sendHeartbeat() {
ShuffleServer shuffleServer = this;
@@ -600,7 +623,8 @@ public class ShuffleServer {
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
shuffleServer.getJettyPort(),
- shuffleServer.getStartTimeMs());
+ shuffleServer.getStartTimeMs(),
+ shuffleServer.getAppInfos());
}
public ShuffleMergeManager getShuffleMergeManager() {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index d4e6eeb32..94987d661 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -283,6 +283,10 @@ public class ShuffleTaskInfo {
return shuffleDetailInfos.get(shuffleId);
}
+ public long getPartitionNum() {
+ return partitionDataSizes.values().stream().mapToLong(Map::size).sum();
+ }
+
@Override
public String toString() {
return "ShuffleTaskInfo{"