This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a498b1137 [CELEBORN-1984] Merge ResourceRequest to 
transportMessageProtobuf
a498b1137 is described below

commit a498b1137f37db308aac3b73e35ae6159ed63237
Author: zhaohehuhu <[email protected]>
AuthorDate: Fri Aug 1 23:28:32 2025 +0800

    [CELEBORN-1984] Merge ResourceRequest to transportMessageProtobuf
    
    ### What changes were proposed in this pull request?
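    Add `PbMetaRequest`, `PbMetaRequestResponse` and the related `PbMeta*`
    messages to `TransportMessages.proto`, and switch the master's Ratis write
    path (`HAHelper`, `MetaHandler.handleWriteRequest`, `StateMachine`) from
    `ResourceProtos.ResourceRequest` to these new types.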
    
    ### Why are the changes needed?
    
    Merge Resource.proto into TransportMessages.proto, as described in the design document below:
    
    
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-16+Merge+transport+proto+and+resource+proto+files
    
    ### Does this PR introduce _any_ user-facing change?
    
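    ### How was this patch tested?
    
    Updated `MasterStateMachineSuiteJ` (`testRunCommandByTransportMessage`) and added
    `testHandleWorkerEventByTransportMessage` to `RatisMasterStatusSystemSuiteJ`.
    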
    Closes #3231 from zhaohehuhu/dev-0425.
    
    Lead-authored-by: zhaohehuhu <[email protected]>
    Co-authored-by: mingji <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 common/src/main/proto/TransportMessages.proto      | 136 +++++++++++++++++++++
 .../deploy/master/clustermeta/MetaUtil.java        |  49 ++++++++
 .../deploy/master/clustermeta/ha/HAHelper.java     |  10 +-
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  62 ++++++----
 .../deploy/master/clustermeta/ha/StateMachine.java |  10 +-
 master/src/main/proto/Resource.proto               |   2 +-
 .../clustermeta/ha/MasterStateMachineSuiteJ.java   |  39 +++---
 .../ha/RatisMasterStatusSystemSuiteJ.java          |  80 ++++++++++++
 8 files changed, 339 insertions(+), 49 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 6051b5901..76ec5b457 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -208,6 +208,18 @@ message PbRegisterWorker {
   string networkLocation = 11;
 }
 
+message PbMetaRegisterWorkerRequest { // payload of PbMetaRequest.registerWorkerRequest (field 17)
+  string host = 1;
+  int32 rpcPort = 2;
+  int32 pushPort = 3;
+  int32 fetchPort = 4;
+  int32 replicatePort = 5;
+  map<string, PbDiskInfo> disks = 6; // MetaHandler converts this with MetaUtil.fromPbDiskInfoMap
+  map<string, PbResourceConsumption> userResourceConsumption = 7;
+  int32 internalPort = 8;
+  string networkLocation = 9;
+}
+
 message PbHeartbeatFromWorker {
   string host = 1;
   int32 rpcPort = 2;
@@ -223,6 +235,20 @@ message PbHeartbeatFromWorker {
   PbWorkerStatus workerStatus = 12;
 }
 
+message PbMetaWorkerHeartbeatRequest { // payload of PbMetaRequest.workerHeartbeatRequest (field 16)
+  string host = 1;
+  int32 rpcPort = 2;
+  int32 pushPort = 3;
+  int32 fetchPort = 4;
+  int32 replicatePort = 5;
+  map<string, PbDiskInfo> disks = 6; // MetaHandler converts this with MetaUtil.fromPbDiskInfoMap
+  int64 time = 7; // NOTE(review): unit is not stated in this file - presumably epoch millis; confirm
+  map<string, PbResourceConsumption> userResourceConsumption = 8;
+  map<string, int64> estimatedAppDiskUsage = 9; // deprecated
+  bool highWorkload = 10;
+  PbWorkerStatus workerStatus = 11; // may be absent: MetaHandler tests hasWorkerStatus() first
+}
+
 message PbWorkerStatus {
   enum State {
     Normal = 0;
@@ -437,12 +463,20 @@ message PbUnregisterShuffle {
   string requestId = 3;
 }
 
+message PbMetaUnregisterShuffle { // payload of PbMetaRequest.unregisterShuffleRequest (field 12)
+  string shuffleKey = 1;
+}
+
 message PbBatchUnregisterShuffles {
   string appId = 1;
   string requestId = 2;
   repeated int32 shuffleIds = 3;
 }
 
+message PbMetaBatchUnregisterShuffles { // payload of PbMetaRequest.batchUnregisterShuffleRequest (field 25)
+  repeated string shuffleKeys = 1;
+}
+
 message PbUnregisterShuffleResponse {
   int32 status = 1;
 }
@@ -461,6 +495,14 @@ message PbApplicationLostResponse {
   int32 status = 1;
 }
 
+message PbHeartbeatInfo { // embedded as heartbeatInfo in PbHeartbeatFromApplication and PbMetaAppHeartbeatRequest
+  string appId = 1;
+  int64 totalWritten = 2; // NOTE(review): unit not stated here - presumably bytes; confirm
+  int64 fileCount = 3;
+  int64 shuffleCount = 4;
+  map<string, int64> shuffleFallbackCounts = 5;
+}
+
 message PbHeartbeatFromApplication {
   string appId = 1;
   int64 totalWritten = 2;
@@ -472,6 +514,7 @@ message PbHeartbeatFromApplication {
   map<string, int64> shuffleFallbackCounts = 8;
   int64 applicationCount = 9;
   map<string, int64> applicationFallbackCounts = 10;
+  PbHeartbeatInfo heartbeatInfo = 11;
 }
 
 message PbHeartbeatFromApplicationResponse {
@@ -939,3 +982,96 @@ message PbReadReducerPartitionEndResponse {
   int32 status = 1;
   string errorMsg = 2;
 }
+
+enum PbMetaRequestType { // MetaHandler.handleWriteRequest switches on this value
+  Unknown = 0;
+
+  UnRegisterShuffle = 12; // NOTE(review): numbering starts at 12, presumably to mirror ResourceProtos.Type - confirm before renumbering
+  RequestSlots = 13;
+  // deprecated
+  ReleaseSlots = 14;
+  AppHeartbeat = 15;
+  AppLost = 16;
+  WorkerLost = 17;
+  WorkerHeartbeat = 18;
+  RegisterWorker = 19;
+  ReportWorkerUnavailable = 20;
+  UpdatePartitionSize = 21;
+  WorkerRemove = 22;
+  RemoveWorkersUnavailableInfo = 23;
+  WorkerExclude = 24;
+  WorkerEvent = 25;
+  ApplicationMeta = 26;
+  ReportWorkerDecommission = 27;
+  BatchUnRegisterShuffle = 28;
+  ReviseLostShuffles = 29;
+}
+
+message PbMetaRequest { // envelope parsed from the Ratis log entry by HAHelper.convertByteStringToRequest
+  PbMetaRequestType metaRequestType = 1; // selects which *Request field below MetaHandler reads
+  string requestId = 2;
+
+  PbMetaRequestSlotsRequest requestSlotsRequest = 10; // MasterStateMachineSuiteJ re-parses these bytes as ResourceProtos.ResourceRequest; keep the number
+  // deprecated
+  PbReleaseSlots releaseSlotsRequest = 11;
+  PbMetaUnregisterShuffle unregisterShuffleRequest = 12;
+  PbMetaAppHeartbeatRequest appHeartbeatRequest = 13;
+  PbApplicationLost appLostRequest = 14;
+  PbWorkerAddress workerLostRequest = 15;
+  PbMetaWorkerHeartbeatRequest workerHeartbeatRequest = 16;
+  PbMetaRegisterWorkerRequest registerWorkerRequest = 17;
+  PbReportWorkerUnavailable reportWorkerUnavailableRequest = 18;
+  PbWorkerAddress workerRemoveRequest = 19;
+  PbRemoveWorkersUnavailableInfo removeWorkersUnavailableInfoRequest = 20;
+  PbMetaWorkerExcludeRequest workerExcludeRequest = 21;
+  PbWorkerEventRequest workerEventRequest = 22;
+  PbApplicationMeta applicationMetaRequest = 23;
+  PbReportWorkerDecommission reportWorkerDecommissionRequest = 24;
+  PbMetaBatchUnregisterShuffles batchUnregisterShuffleRequest = 25;
+  PbReviseLostShuffles reviseLostShufflesRequest = 102; // NOTE(review): jump to 102 presumably mirrors Resource.proto - confirm
+}
+
+message PbMetaRequestSlotsRequest { // payload of PbMetaRequest.requestSlotsRequest (field 10)
+  string shuffleKey = 1;
+  string hostName = 2;
+  map<string, PbSlotInfo> workerAllocations = 3; // MasterStateMachineSuiteJ keys this by WorkerInfo.toUniqueId()
+}
+
+message PbMetaAppHeartbeatRequest { // payload of PbMetaRequest.appHeartbeatRequest (field 13)
+  string appId = 1;
+  int64 time = 2;
+  int64 totalWritten = 3;
+  int64 fileCount = 4;
+  int64 shuffleCount = 5;
+  map<string, int64> shuffleFallbackCounts = 6;
+  PbHeartbeatInfo heartbeatInfo = 7; // repeats fields 1 and 3-6; NOTE(review): which copy is authoritative is not visible here
+  int64 applicationCount = 8;
+  map<string, int64> applicationFallbackCounts = 9;
+}
+
+message PbMetaWorkerExcludeRequest { // payload of PbMetaRequest.workerExcludeRequest (field 21)
+  repeated PbWorkerAddress workersToAdd = 1; // MetaHandler maps each entry with MetaUtil.addrToInfo
+  repeated PbWorkerAddress workersToRemove = 2;
+}
+
+message PbWorkerAddress { // MetaUtil.addrToInfo builds a WorkerInfo from all six fields
+  string host = 1;
+  int32 rpcPort = 2;
+  int32 pushPort = 3;
+  int32 fetchPort = 4;
+  int32 replicatePort = 5;
+  int32 internalPort = 6;
+}
+
+enum PbMetaRequestStatus { // outcome code carried in PbMetaRequestResponse.status
+  UNKNOWN = 0;
+  OK = 1; // set by MetaHandler.getMasterMetaResponseBuilder and after a successful write
+  INTERNAL_ERROR = 2; // set when MetaHandler.handleWriteRequest catches an IOException
+}
+
+message PbMetaRequestResponse { // result returned by MetaHandler.handleWriteRequest
+  PbMetaRequestType metaRequestType = 1; // copied from the request's metaRequestType
+  bool success = 2; // set to false when handleWriteRequest catches an IOException
+  string message = 3; // the exception message, when it is non-null
+  PbMetaRequestStatus status = 4;
+}
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
index 1937dec25..fb26a4e6f 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
@@ -26,6 +26,10 @@ import org.apache.celeborn.common.identity.UserIdentifier$;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.meta.WorkerStatus;
+import org.apache.celeborn.common.protocol.PbDiskInfo;
+import org.apache.celeborn.common.protocol.PbWorkerAddress;
+import org.apache.celeborn.common.protocol.PbWorkerInfo;
+import org.apache.celeborn.common.protocol.PbWorkerStatus;
 import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 import org.apache.celeborn.common.util.CollectionUtils;
@@ -44,6 +48,26 @@ public class MetaUtil {
         address.getInternalPort());
   }
 
+  public static WorkerInfo addrToInfo(PbWorkerAddress address) { // overload for the TransportMessages type
+    return new WorkerInfo(
+        address.getHost(),
+        address.getRpcPort(),
+        address.getPushPort(),
+        address.getFetchPort(),
+        address.getReplicatePort(),
+        address.getInternalPort()); // only the host and the five ports are passed on
+  }
+
+  public static WorkerInfo pbInfoToInfo(PbWorkerInfo pbWorkerInfo) { // address-only conversion
+    return new WorkerInfo(
+        pbWorkerInfo.getHost(),
+        pbWorkerInfo.getRpcPort(),
+        pbWorkerInfo.getPushPort(),
+        pbWorkerInfo.getFetchPort(),
+        pbWorkerInfo.getReplicatePort(),
+        pbWorkerInfo.getInternalPort()); // no other PbWorkerInfo field is read here
+  }
+
   public static ResourceProtos.WorkerAddress infoToAddr(WorkerInfo info) {
     return ResourceProtos.WorkerAddress.newBuilder()
         .setHost(info.host())
@@ -76,6 +100,26 @@ public class MetaUtil {
     return map;
   }
 
+  public static Map<String, DiskInfo> fromPbDiskInfoMap(Map<String, 
PbDiskInfo> pbDiskInfoMap) {
+    Map<String, DiskInfo> map = new HashMap<>(); // result reuses the keys of the input map
+
+    pbDiskInfoMap.forEach(
+        (k, v) -> {
+          DiskInfo diskInfo =
+              new DiskInfo(
+                      v.getMountPoint(),
+                      v.getUsableSpace(),
+                      v.getAvgFlushTime(),
+                      v.getAvgFetchTime(),
+                      v.getUsedSlots(),
+                      StorageInfo.typesMap.get(v.getStorageType())) // NOTE(review): presumably null for an unmapped type - confirm
+                  .setStatus(Utils.toDiskStatus(v.getStatus()))
+                  .setTotalSpace(v.getTotalSpace()); // status and total space are applied after construction
+          map.put(k, diskInfo);
+        });
+    return map;
+  }
+
   public static Map<String, ResourceProtos.DiskInfo> toPbDiskInfos(
       Map<String, DiskInfo> diskInfos) {
     Map<String, ResourceProtos.DiskInfo> map = new HashMap<>();
@@ -167,4 +211,9 @@ public class MetaUtil {
   public static WorkerStatus fromPbWorkerStatus(ResourceProtos.WorkerStatus 
workerStatus) {
     return new WorkerStatus(workerStatus.getState().getNumber(), 
workerStatus.getStateStartTime());
   }
+
+  public static WorkerStatus fromPbWorkerStatus(PbWorkerStatus pbWorkerStatus) 
{
+    return new WorkerStatus( // the state is passed as its enum number
+        pbWorkerStatus.getState().getNumber(), 
pbWorkerStatus.getStateStartTime());
+  }
 }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 71bcaf064..71d6cd1e1 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -28,6 +28,7 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import org.apache.celeborn.common.client.MasterNotLeaderException;
 import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.protocol.PbMetaRequest;
 import org.apache.celeborn.common.rpc.RpcCallContext;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
@@ -106,10 +107,15 @@ public class HAHelper {
     return ByteString.copyFrom(requestBytes);
   }
 
-  public static ResourceProtos.ResourceRequest 
convertByteStringToRequest(ByteString byteString)
+  public static ByteString convertRequestToByteString(PbMetaRequest request) { // PbMetaRequest overload
+    byte[] requestBytes = request.toByteArray(); // serialized protobuf bytes
+    return ByteString.copyFrom(requestBytes); // inverse of convertByteStringToRequest
+  }
+
+  public static PbMetaRequest convertByteStringToRequest(ByteString byteString)
       throws InvalidProtocolBufferException {
     byte[] bytes = byteString.toByteArray();
-    return ResourceProtos.ResourceRequest.parseFrom(bytes);
+    return PbMetaRequest.parseFrom(bytes);
   }
 
   public static Message 
convertResponseToMessage(ResourceProtos.ResourceResponse response) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index abe21a4de..8cbf4de95 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -32,6 +32,7 @@ import org.apache.celeborn.common.meta.ApplicationMeta;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.meta.WorkerStatus;
+import org.apache.celeborn.common.protocol.*;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 import org.apache.celeborn.common.util.CollectionUtils;
 import org.apache.celeborn.service.deploy.master.clustermeta.MetaUtil;
@@ -69,6 +70,14 @@ public class MetaHandler {
         .setSuccess(true);
   }
 
+  public static 
org.apache.celeborn.common.protocol.PbMetaRequestResponse.Builder
+      getMasterMetaResponseBuilder(PbMetaRequest request) { // overload for PbMetaRequest
+    return 
org.apache.celeborn.common.protocol.PbMetaRequestResponse.newBuilder()
+        .setMetaRequestType(request.getMetaRequestType()) // echo the request type
+        .setStatus(PbMetaRequestStatus.OK) // starts as OK/success; handleWriteRequest overrides on IOException
+        .setSuccess(true);
+  }
+
   public ResourceResponse handleReadRequest(ResourceProtos.ResourceRequest 
request) {
     ResourceProtos.Type cmdType = request.getCmdType();
     ResourceResponse.Builder responseBuilder = 
getMasterMetaResponseBuilder(request);
@@ -88,9 +97,11 @@ public class MetaHandler {
     return responseBuilder.build();
   }
 
-  public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest 
request) {
-    ResourceProtos.Type cmdType = request.getCmdType();
-    ResourceResponse.Builder responseBuilder = 
getMasterMetaResponseBuilder(request);
+  public org.apache.celeborn.common.protocol.PbMetaRequestResponse 
handleWriteRequest(
+      PbMetaRequest request) {
+    PbMetaRequestType metaRequestType = request.getMetaRequestType();
+    org.apache.celeborn.common.protocol.PbMetaRequestResponse.Builder 
responseBuilder =
+        getMasterMetaResponseBuilder(request);
     try {
       String shuffleKey;
       String appId;
@@ -103,7 +114,7 @@ public class MetaHandler {
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption;
       WorkerStatus workerStatus;
       List<Integer> lostShuffles;
-      switch (cmdType) {
+      switch (metaRequestType) {
         case ReviseLostShuffles:
           appId = request.getReviseLostShufflesRequest().getAppId();
           lostShuffles = 
request.getReviseLostShufflesRequest().getLostShufflesList();
@@ -142,11 +153,7 @@ public class MetaHandler {
           long fileCount = request.getAppHeartbeatRequest().getFileCount();
           long shuffleCount = 
request.getAppHeartbeatRequest().getShuffleCount();
           long applicationCount = 
request.getAppHeartbeatRequest().getApplicationCount();
-          LOG.debug(
-              "Handle app heartbeat for {} with shuffle count {} and 
application count {}",
-              appId,
-              shuffleCount,
-              applicationCount);
+          LOG.debug("Handle app heartbeat for {} with shuffle count {}", 
appId, shuffleCount);
           Map<String, Long> shuffleFallbackCounts =
               request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
           if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
@@ -175,9 +182,9 @@ public class MetaHandler {
           break;
 
         case WorkerExclude:
-          List<ResourceProtos.WorkerAddress> addAddresses =
+          List<PbWorkerAddress> addAddresses =
               request.getWorkerExcludeRequest().getWorkersToAddList();
-          List<ResourceProtos.WorkerAddress> removeAddresses =
+          List<PbWorkerAddress> removeAddresses =
               request.getWorkerExcludeRequest().getWorkersToRemoveList();
           List<WorkerInfo> workersToAdd =
               
addAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
@@ -212,7 +219,8 @@ public class MetaHandler {
           rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
           pushPort = request.getWorkerHeartbeatRequest().getPushPort();
           fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
-          diskInfos = 
MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
+          Map<String, PbDiskInfo> pbDiskInfoMap = 
request.getWorkerHeartbeatRequest().getDisksMap();
+          diskInfos = MetaUtil.fromPbDiskInfoMap(pbDiskInfoMap);
           replicatePort = 
request.getWorkerHeartbeatRequest().getReplicatePort();
           boolean highWorkload = 
request.getWorkerHeartbeatRequest().getHighWorkload();
           if (request.getWorkerHeartbeatRequest().hasWorkerStatus()) {
@@ -237,7 +245,7 @@ public class MetaHandler {
               fetchPort,
               replicatePort,
               diskInfos,
-              request.getWorkerHeartbeatRequest().getTime(),
+              request.getWorkerHeartbeatRequest().getTime(), // heartbeat time, not state start
               workerStatus,
               highWorkload);
           break;
@@ -250,7 +258,8 @@ public class MetaHandler {
           replicatePort = 
request.getRegisterWorkerRequest().getReplicatePort();
           String networkLocation = 
request.getRegisterWorkerRequest().getNetworkLocation();
           int internalPort = 
request.getRegisterWorkerRequest().getInternalPort();
-          diskInfos = 
MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
+          Map<String, PbDiskInfo> pbDiskInfo = 
request.getRegisterWorkerRequest().getDisksMap();
+          diskInfos = MetaUtil.fromPbDiskInfoMap(pbDiskInfo);
           LOG.debug(
               "Handle worker register for {} {} {} {} {} {} {}",
               host,
@@ -272,10 +281,10 @@ public class MetaHandler {
           break;
 
         case ReportWorkerUnavailable:
-          List<ResourceProtos.WorkerAddress> failedAddress =
+          List<PbWorkerInfo> failedAddress =
               request.getReportWorkerUnavailableRequest().getUnavailableList();
           List<WorkerInfo> failedWorkers =
-              
failedAddress.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+              
failedAddress.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
           metaSystem.updateMetaByReportWorkerUnavailable(failedWorkers);
           break;
 
@@ -284,18 +293,17 @@ public class MetaHandler {
           break;
 
         case RemoveWorkersUnavailableInfo:
-          List<ResourceProtos.WorkerAddress> unavailableList =
-              
request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList();
+          List<PbWorkerInfo> unavailableList =
+              
request.getRemoveWorkersUnavailableInfoRequest().getWorkerInfoList();
           List<WorkerInfo> unavailableWorkers =
-              
unavailableList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+              
unavailableList.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
           metaSystem.removeWorkersUnavailableInfoMeta(unavailableWorkers);
           break;
 
         case WorkerEvent:
-          List<ResourceProtos.WorkerAddress> workerAddresses =
-              request.getWorkerEventRequest().getWorkerAddressList();
+          List<PbWorkerInfo> workerAddresses = 
request.getWorkerEventRequest().getWorkersList();
           List<WorkerInfo> workerInfoList =
-              
workerAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+              
workerAddresses.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
           metaSystem.updateWorkerEventMeta(
               
request.getWorkerEventRequest().getWorkerEventType().getNumber(), 
workerInfoList);
           break;
@@ -307,21 +315,21 @@ public class MetaHandler {
           break;
 
         case ReportWorkerDecommission:
-          List<ResourceProtos.WorkerAddress> decommissionList =
+          List<PbWorkerInfo> decommissionList =
               request.getReportWorkerDecommissionRequest().getWorkersList();
           List<WorkerInfo> decommissionWorkers =
-              
decommissionList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+              
decommissionList.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
           metaSystem.updateMetaByReportWorkerDecommission(decommissionWorkers);
           break;
 
         default:
           throw new IOException("Can not parse this command!" + request);
       }
-      responseBuilder.setStatus(ResourceProtos.Status.OK);
+      responseBuilder.setStatus(PbMetaRequestStatus.OK);
     } catch (IOException e) {
-      LOG.warn("Handle meta write request {} failed!", cmdType, e);
+      LOG.warn("Handle meta write request {} failed!", metaRequestType, e);
       responseBuilder.setSuccess(false);
-      responseBuilder.setStatus(ResourceProtos.Status.INTERNAL_ERROR);
+      responseBuilder.setStatus(PbMetaRequestStatus.INTERNAL_ERROR);
       if (e.getMessage() != null) {
         responseBuilder.setMessage(e.getMessage());
       }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index 40d23c59f..65ffa1f4a 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -54,6 +54,8 @@ import org.apache.ratis.util.MD5FileUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.celeborn.common.protocol.PbMetaRequest;
+import org.apache.celeborn.common.protocol.PbMetaRequestResponse;
 import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -174,7 +176,7 @@ public class StateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     try {
-      ResourceProtos.ResourceRequest request =
+      PbMetaRequest request =
           
HAHelper.convertByteStringToRequest(trx.getStateMachineLogEntry().getLogData());
       long trxLogIndex = trx.getLogEntry().getIndex();
       // In the current approach we have one single global thread executor.
@@ -184,14 +186,14 @@ public class StateMachine extends BaseStateMachine {
       // chance that Master replica can be out of sync.
       // Ref: from Ozone project (OzoneManagerStateMachine)
       CompletableFuture<Message> ratisFuture = new CompletableFuture<>();
-      CompletableFuture<ResourceResponse> future =
+      CompletableFuture<PbMetaRequestResponse> future =
           CompletableFuture.supplyAsync(() -> runCommand(request, 
trxLogIndex), executorService);
       future.thenApply(
           response -> {
             if (!response.getSuccess()) {
               LOG.warn(
                   "Failed to apply log {} for this raft group {}!",
-                  request.getCmdType(),
+                  request.getMetaRequestType(),
                   this.raftGroupId);
             }
 
@@ -212,7 +214,7 @@ public class StateMachine extends BaseStateMachine {
    * @return response from meta system
    */
   @VisibleForTesting
-  protected ResourceResponse runCommand(ResourceProtos.ResourceRequest 
request, long trxLogIndex) {
+  protected PbMetaRequestResponse runCommand(PbMetaRequest request, long 
trxLogIndex) {
     try {
       return metaHandler.handleWriteRequest(request);
     } catch (Throwable e) {
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index c8e9c2aee..e3c688aa1 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -88,7 +88,7 @@ message DiskInfo {
   int64 usedSlots = 4;
   int32 status = 5;
   int64 avgFetchTime = 6;
-  int32 storageType =7;
+  int32 storageType = 7;
   int64 totalSpace = 8;
 }
 
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 2b03afd56..46a9c68e6 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.junit.Assert;
@@ -40,55 +42,62 @@ import org.apache.celeborn.common.client.MasterClient;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.common.protocol.PbMetaRequest;
+import org.apache.celeborn.common.protocol.PbMetaRequestResponse;
+import org.apache.celeborn.common.protocol.PbMetaRequestSlotsRequest;
+import org.apache.celeborn.common.protocol.PbSlotInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 import org.apache.celeborn.common.rpc.RpcEnv;
 import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
-import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest;
-import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest;
-import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
-import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Type;
 
 public class MasterStateMachineSuiteJ extends RatisBaseSuiteJ {
 
   private final AtomicLong callerId = new AtomicLong();
 
   @Test
-  public void testRunCommand() {
+  public void testRunCommandByTransportMessage() throws 
InvalidProtocolBufferException {
     StateMachine stateMachine = ratisServer.getMasterStateMachine();
 
     Map<String, Integer> allocations = new HashMap<>();
     allocations.put("disk1", 15);
     allocations.put("disk2", 20);
 
-    Map<String, ResourceProtos.SlotInfo> workerAllocations = new HashMap<>();
+    Map<String, PbSlotInfo> workerAllocations = new HashMap<>();
     workerAllocations.put(
         new WorkerInfo("host1", 1, 2, 3, 10).toUniqueId(),
-        ResourceProtos.SlotInfo.newBuilder().putAllSlot(allocations).build());
+        PbSlotInfo.newBuilder().putAllSlot(allocations).build());
     workerAllocations.put(
         new WorkerInfo("host2", 2, 3, 4, 11).toUniqueId(),
-        ResourceProtos.SlotInfo.newBuilder().putAllSlot(allocations).build());
+        PbSlotInfo.newBuilder().putAllSlot(allocations).build());
     workerAllocations.put(
         new WorkerInfo("host3", 3, 4, 5, 12).toUniqueId(),
-        ResourceProtos.SlotInfo.newBuilder().putAllSlot(allocations).build());
+        PbSlotInfo.newBuilder().putAllSlot(allocations).build());
 
-    RequestSlotsRequest requestSlots =
-        RequestSlotsRequest.newBuilder()
+    PbMetaRequestSlotsRequest requestSlots =
+        PbMetaRequestSlotsRequest.newBuilder()
             .setShuffleKey("appId-1-1")
             .setHostName("hostname")
             .putAllWorkerAllocations(workerAllocations)
             .build();
 
-    ResourceRequest request =
-        ResourceRequest.newBuilder()
+    PbMetaRequest request =
+        PbMetaRequest.newBuilder()
             .setRequestSlotsRequest(requestSlots)
-            .setCmdType(Type.RequestSlots)
+            
.setMetaRequestType(org.apache.celeborn.common.protocol.PbMetaRequestType.RequestSlots)
             .setRequestId(UUID.randomUUID().toString())
             .build();
 
-    ResourceResponse response = stateMachine.runCommand(request, -1);
+    PbMetaRequestResponse response = stateMachine.runCommand(request, -1);
     Assert.assertTrue(response.getSuccess());
+
+    ByteString byteString = request.toByteString();
+    ResourceProtos.ResourceRequest resourceRequest =
+        ResourceProtos.ResourceRequest.parseFrom(byteString);
+    Assert.assertEquals("appId-1-1", 
resourceRequest.getRequestSlotsRequest().getShuffleKey());
+    Assert.assertEquals("hostname", 
resourceRequest.getRequestSlotsRequest().getHostName());
+    Assert.assertEquals(3, 
resourceRequest.getRequestSlotsRequest().getWorkerAllocations().size());
   }
 
   @Test
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index be644869f..7c66cccec 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1497,6 +1497,86 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
   }
 
+  @Test
+  public void testHandleWorkerEventByTransportMessage() throws 
InterruptedException {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+
+    statusSystem.handleRegisterWorker( // register worker 1
+        HOSTNAME1,
+        RPCPORT1,
+        PUSHPORT1,
+        FETCHPORT1,
+        REPLICATEPORT1,
+        INTERNALPORT1,
+        NETWORK_LOCATION1,
+        disks1,
+        userResourceConsumption1,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker( // register worker 2
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        INTERNALPORT2,
+        NETWORK_LOCATION2,
+        disks2,
+        userResourceConsumption2,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker( // register worker 3
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        INTERNALPORT3,
+        NETWORK_LOCATION3,
+        disks3,
+        userResourceConsumption3,
+        getNewReqeustId());
+
+    WorkerInfo workerInfo1 =
+        WorkerInfo.fromUniqueId(
+            HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 + 
":" + REPLICATEPORT1);
+    WorkerInfo workerInfo2 =
+        WorkerInfo.fromUniqueId(
+            HOSTNAME2 + ":" + RPCPORT2 + ":" + PUSHPORT2 + ":" + FETCHPORT2 + 
":" + REPLICATEPORT2);
+    statusSystem.handleWorkerEvent( // send a Decommission event for workers 1 and 2
+        WorkerEventType.Decommission_VALUE,
+        Lists.newArrayList(workerInfo1, workerInfo2),
+        getNewReqeustId());
+
+    Thread.sleep(3000L); // fixed 3 s wait before checking all three status systems
+    Assert.assertEquals(2, STATUSSYSTEM1.workerEventInfos.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.workerEventInfos.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.workerEventInfos.size());
+
+    Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size()); // 3 registered, 2 with an event
+    Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+
+    Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo1));
+    Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+
+    Assert.assertEquals(
+        WorkerEventType.Decommission,
+        STATUSSYSTEM1.workerEventInfos.get(workerInfo1).getEventType());
+
+    statusSystem.handleWorkerEvent( // send a None event for worker 1 only
+        WorkerEventType.None_VALUE, Lists.newArrayList(workerInfo1), 
getNewReqeustId());
+    Thread.sleep(3000L); // same fixed wait as above
+    Assert.assertEquals(1, STATUSSYSTEM1.workerEventInfos.size()); // only worker 2's event is expected to remain
+    Assert.assertEquals(1, STATUSSYSTEM2.workerEventInfos.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.workerEventInfos.size());
+    Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+    Assert.assertEquals(
+        WorkerEventType.Decommission,
+        STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
+
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size()); // one more available than before the None event
+  }
+
   @Test
   public void testReviseShuffles() throws InterruptedException {
     AbstractMetaManager statusSystem = pickLeaderStatusSystem();

Reply via email to