This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a498b1137 [CELEBORN-1984] Merge ResourceRequest to
transportMessageProtobuf
a498b1137 is described below
commit a498b1137f37db308aac3b73e35ae6159ed63237
Author: zhaohehuhu <[email protected]>
AuthorDate: Fri Aug 1 23:28:32 2025 +0800
[CELEBORN-1984] Merge ResourceRequest to transportMessageProtobuf
### What changes were proposed in this pull request?
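Move the master's Raft meta-request definitions from Resource.proto into TransportMessages.proto. This adds PbMetaRequest, PbMetaRequestType, PbMetaRequestStatus, PbMetaRequestResponse and the related request messages, plus conversion helpers in MetaUtil. It also switches HAHelper's request decoding, MetaHandler.handleWriteRequest and StateMachine.applyTransaction/runCommand to the new types.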
### Why are the changes needed?
This is a step toward merging Resource.proto into TransportMessages.proto, following the CIP-16 design document:
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-16+Merge+transport+proto+and+resource+proto+files
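### Does this PR introduce _any_ user-facing change?
No. The change is internal to the master's Raft meta-request handling; the only change to an existing message is the new field `heartbeatInfo = 11` on PbHeartbeatFromApplication.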
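### How was this patch tested?
- MasterStateMachineSuiteJ: `testRunCommandByTransportMessage` applies a RequestSlots PbMetaRequest through `StateMachine.runCommand`. It also checks that the serialized request still parses as a `ResourceProtos.ResourceRequest`.
- RatisMasterStatusSystemSuiteJ: added `testHandleWorkerEventByTransportMessage`.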
Closes #3231 from zhaohehuhu/dev-0425.
Lead-authored-by: zhaohehuhu <[email protected]>
Co-authored-by: mingji <[email protected]>
Signed-off-by: mingji <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 136 +++++++++++++++++++++
.../deploy/master/clustermeta/MetaUtil.java | 49 ++++++++
.../deploy/master/clustermeta/ha/HAHelper.java | 10 +-
.../deploy/master/clustermeta/ha/MetaHandler.java | 62 ++++++----
.../deploy/master/clustermeta/ha/StateMachine.java | 10 +-
master/src/main/proto/Resource.proto | 2 +-
.../clustermeta/ha/MasterStateMachineSuiteJ.java | 39 +++---
.../ha/RatisMasterStatusSystemSuiteJ.java | 80 ++++++++++++
8 files changed, 339 insertions(+), 49 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 6051b5901..76ec5b457 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -208,6 +208,18 @@ message PbRegisterWorker {
string networkLocation = 11;
}
+message PbMetaRegisterWorkerRequest {
+ string host = 1;
+ int32 rpcPort = 2;
+ int32 pushPort = 3;
+ int32 fetchPort = 4;
+ int32 replicatePort = 5;
+ map<string, PbDiskInfo> disks = 6;
+ map<string, PbResourceConsumption> userResourceConsumption = 7;
+ int32 internalPort = 8;
+ string networkLocation = 9;
+}
+
message PbHeartbeatFromWorker {
string host = 1;
int32 rpcPort = 2;
@@ -223,6 +235,20 @@ message PbHeartbeatFromWorker {
PbWorkerStatus workerStatus = 12;
}
+message PbMetaWorkerHeartbeatRequest {
+ string host = 1;
+ int32 rpcPort = 2;
+ int32 pushPort = 3;
+ int32 fetchPort = 4;
+ int32 replicatePort = 5;
+ map<string, PbDiskInfo> disks = 6;
+ int64 time = 7;
+ map<string, PbResourceConsumption> userResourceConsumption = 8;
+ map<string, int64> estimatedAppDiskUsage = 9; // deprecated
+ bool highWorkload = 10;
+ PbWorkerStatus workerStatus = 11;
+}
+
message PbWorkerStatus {
enum State {
Normal = 0;
@@ -437,12 +463,20 @@ message PbUnregisterShuffle {
string requestId = 3;
}
+message PbMetaUnregisterShuffle {
+ string shuffleKey = 1;
+}
+
message PbBatchUnregisterShuffles {
string appId = 1;
string requestId = 2;
repeated int32 shuffleIds = 3;
}
+message PbMetaBatchUnregisterShuffles {
+ repeated string shuffleKeys = 1;
+}
+
message PbUnregisterShuffleResponse {
int32 status = 1;
}
@@ -461,6 +495,14 @@ message PbApplicationLostResponse {
int32 status = 1;
}
+message PbHeartbeatInfo {
+ string appId = 1;
+ int64 totalWritten = 2;
+ int64 fileCount = 3;
+ int64 shuffleCount = 4;
+ map<string, int64> shuffleFallbackCounts = 5;
+}
+
message PbHeartbeatFromApplication {
string appId = 1;
int64 totalWritten = 2;
@@ -472,6 +514,7 @@ message PbHeartbeatFromApplication {
map<string, int64> shuffleFallbackCounts = 8;
int64 applicationCount = 9;
map<string, int64> applicationFallbackCounts = 10;
+ PbHeartbeatInfo heartbeatInfo = 11;
}
message PbHeartbeatFromApplicationResponse {
@@ -939,3 +982,96 @@ message PbReadReducerPartitionEndResponse {
int32 status = 1;
string errorMsg = 2;
}
+
+enum PbMetaRequestType {
+ Unknown = 0;
+
+ UnRegisterShuffle = 12;
+ RequestSlots = 13;
+ //deprecated
+ ReleaseSlots = 14;
+ AppHeartbeat = 15;
+ AppLost = 16;
+ WorkerLost = 17;
+ WorkerHeartbeat = 18;
+ RegisterWorker = 19;
+ ReportWorkerUnavailable = 20;
+ UpdatePartitionSize = 21;
+ WorkerRemove = 22;
+ RemoveWorkersUnavailableInfo = 23;
+ WorkerExclude = 24;
+ WorkerEvent = 25;
+ ApplicationMeta = 26;
+ ReportWorkerDecommission = 27;
+ BatchUnRegisterShuffle = 28;
+ ReviseLostShuffles = 29;
+}
+
+message PbMetaRequest {
+ PbMetaRequestType metaRequestType = 1;
+ string requestId = 2;
+
+ PbMetaRequestSlotsRequest requestSlotsRequest = 10;
+ // deprecated
+ PbReleaseSlots releaseSlotsRequest = 11;
+ PbMetaUnregisterShuffle unregisterShuffleRequest = 12;
+ PbMetaAppHeartbeatRequest appHeartbeatRequest = 13;
+ PbApplicationLost appLostRequest = 14;
+ PbWorkerAddress workerLostRequest = 15;
+ PbMetaWorkerHeartbeatRequest workerHeartbeatRequest = 16;
+ PbMetaRegisterWorkerRequest registerWorkerRequest = 17;
+ PbReportWorkerUnavailable reportWorkerUnavailableRequest = 18;
+ PbWorkerAddress workerRemoveRequest = 19;
+ PbRemoveWorkersUnavailableInfo removeWorkersUnavailableInfoRequest = 20;
+ PbMetaWorkerExcludeRequest workerExcludeRequest = 21;
+ PbWorkerEventRequest workerEventRequest = 22;
+ PbApplicationMeta applicationMetaRequest = 23;
+ PbReportWorkerDecommission reportWorkerDecommissionRequest = 24;
+ PbMetaBatchUnregisterShuffles batchUnregisterShuffleRequest = 25;
+ PbReviseLostShuffles reviseLostShufflesRequest = 102;
+}
+
+message PbMetaRequestSlotsRequest {
+ string shuffleKey = 1;
+ string hostName = 2;
+ map<string, PbSlotInfo> workerAllocations = 3;
+}
+
+message PbMetaAppHeartbeatRequest {
+ string appId = 1;
+ int64 time = 2;
+ int64 totalWritten = 3;
+ int64 fileCount = 4;
+ int64 shuffleCount = 5;
+ map<string, int64> shuffleFallbackCounts = 6;
+ PbHeartbeatInfo heartbeatInfo = 7;
+ int64 applicationCount = 8;
+ map<string, int64> applicationFallbackCounts = 9;
+}
+
+message PbMetaWorkerExcludeRequest {
+ repeated PbWorkerAddress workersToAdd = 1;
+ repeated PbWorkerAddress workersToRemove = 2;
+}
+
+message PbWorkerAddress {
+ string host = 1;
+ int32 rpcPort = 2;
+ int32 pushPort = 3;
+ int32 fetchPort = 4;
+ int32 replicatePort = 5;
+ int32 internalPort = 6;
+}
+
+enum PbMetaRequestStatus {
+ UNKNOWN = 0;
+ OK = 1;
+ INTERNAL_ERROR= 2;
+}
+
+message PbMetaRequestResponse {
+ PbMetaRequestType metaRequestType = 1;
+ bool success = 2;
+ string message = 3;
+ PbMetaRequestStatus status = 4;
+}
\ No newline at end of file
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
index 1937dec25..fb26a4e6f 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
@@ -26,6 +26,10 @@ import org.apache.celeborn.common.identity.UserIdentifier$;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
+import org.apache.celeborn.common.protocol.PbDiskInfo;
+import org.apache.celeborn.common.protocol.PbWorkerAddress;
+import org.apache.celeborn.common.protocol.PbWorkerInfo;
+import org.apache.celeborn.common.protocol.PbWorkerStatus;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.util.CollectionUtils;
@@ -44,6 +48,26 @@ public class MetaUtil {
address.getInternalPort());
}
+ public static WorkerInfo addrToInfo(PbWorkerAddress address) {
+ return new WorkerInfo(
+ address.getHost(),
+ address.getRpcPort(),
+ address.getPushPort(),
+ address.getFetchPort(),
+ address.getReplicatePort(),
+ address.getInternalPort());
+ }
+
+ public static WorkerInfo pbInfoToInfo(PbWorkerInfo pbWorkerInfo) {
+ return new WorkerInfo(
+ pbWorkerInfo.getHost(),
+ pbWorkerInfo.getRpcPort(),
+ pbWorkerInfo.getPushPort(),
+ pbWorkerInfo.getFetchPort(),
+ pbWorkerInfo.getReplicatePort(),
+ pbWorkerInfo.getInternalPort());
+ }
+
public static ResourceProtos.WorkerAddress infoToAddr(WorkerInfo info) {
return ResourceProtos.WorkerAddress.newBuilder()
.setHost(info.host())
@@ -76,6 +100,26 @@ public class MetaUtil {
return map;
}
+ public static Map<String, DiskInfo> fromPbDiskInfoMap(Map<String,
PbDiskInfo> pbDiskInfoMap) {
+ Map<String, DiskInfo> map = new HashMap<>();
+
+ pbDiskInfoMap.forEach(
+ (k, v) -> {
+ DiskInfo diskInfo =
+ new DiskInfo(
+ v.getMountPoint(),
+ v.getUsableSpace(),
+ v.getAvgFlushTime(),
+ v.getAvgFetchTime(),
+ v.getUsedSlots(),
+ StorageInfo.typesMap.get(v.getStorageType()))
+ .setStatus(Utils.toDiskStatus(v.getStatus()))
+ .setTotalSpace(v.getTotalSpace());
+ map.put(k, diskInfo);
+ });
+ return map;
+ }
+
public static Map<String, ResourceProtos.DiskInfo> toPbDiskInfos(
Map<String, DiskInfo> diskInfos) {
Map<String, ResourceProtos.DiskInfo> map = new HashMap<>();
@@ -167,4 +211,9 @@ public class MetaUtil {
public static WorkerStatus fromPbWorkerStatus(ResourceProtos.WorkerStatus
workerStatus) {
return new WorkerStatus(workerStatus.getState().getNumber(),
workerStatus.getStateStartTime());
}
+
+ public static WorkerStatus fromPbWorkerStatus(PbWorkerStatus pbWorkerStatus)
{
+ return new WorkerStatus(
+ pbWorkerStatus.getState().getNumber(),
pbWorkerStatus.getStateStartTime());
+ }
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
index 71bcaf064..71d6cd1e1 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java
@@ -28,6 +28,7 @@ import
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.celeborn.common.client.MasterNotLeaderException;
import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.protocol.PbMetaRequest;
import org.apache.celeborn.common.rpc.RpcCallContext;
import
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
@@ -106,10 +107,15 @@ public class HAHelper {
return ByteString.copyFrom(requestBytes);
}
- public static ResourceProtos.ResourceRequest
convertByteStringToRequest(ByteString byteString)
+ public static ByteString convertRequestToByteString(PbMetaRequest request) {
+ byte[] requestBytes = request.toByteArray();
+ return ByteString.copyFrom(requestBytes);
+ }
+
+ public static PbMetaRequest convertByteStringToRequest(ByteString byteString)
throws InvalidProtocolBufferException {
byte[] bytes = byteString.toByteArray();
- return ResourceProtos.ResourceRequest.parseFrom(bytes);
+ return PbMetaRequest.parseFrom(bytes);
}
public static Message
convertResponseToMessage(ResourceProtos.ResourceResponse response) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index abe21a4de..8cbf4de95 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -32,6 +32,7 @@ import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
+import org.apache.celeborn.common.protocol.*;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.util.CollectionUtils;
import org.apache.celeborn.service.deploy.master.clustermeta.MetaUtil;
@@ -69,6 +70,14 @@ public class MetaHandler {
.setSuccess(true);
}
+ public static
org.apache.celeborn.common.protocol.PbMetaRequestResponse.Builder
+ getMasterMetaResponseBuilder(PbMetaRequest request) {
+ return
org.apache.celeborn.common.protocol.PbMetaRequestResponse.newBuilder()
+ .setMetaRequestType(request.getMetaRequestType())
+ .setStatus(PbMetaRequestStatus.OK)
+ .setSuccess(true);
+ }
+
public ResourceResponse handleReadRequest(ResourceProtos.ResourceRequest
request) {
ResourceProtos.Type cmdType = request.getCmdType();
ResourceResponse.Builder responseBuilder =
getMasterMetaResponseBuilder(request);
@@ -88,9 +97,11 @@ public class MetaHandler {
return responseBuilder.build();
}
- public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest
request) {
- ResourceProtos.Type cmdType = request.getCmdType();
- ResourceResponse.Builder responseBuilder =
getMasterMetaResponseBuilder(request);
+ public org.apache.celeborn.common.protocol.PbMetaRequestResponse
handleWriteRequest(
+ PbMetaRequest request) {
+ PbMetaRequestType metaRequestType = request.getMetaRequestType();
+ org.apache.celeborn.common.protocol.PbMetaRequestResponse.Builder
responseBuilder =
+ getMasterMetaResponseBuilder(request);
try {
String shuffleKey;
String appId;
@@ -103,7 +114,7 @@ public class MetaHandler {
Map<UserIdentifier, ResourceConsumption> userResourceConsumption;
WorkerStatus workerStatus;
List<Integer> lostShuffles;
- switch (cmdType) {
+ switch (metaRequestType) {
case ReviseLostShuffles:
appId = request.getReviseLostShufflesRequest().getAppId();
lostShuffles =
request.getReviseLostShufflesRequest().getLostShufflesList();
@@ -142,11 +153,7 @@ public class MetaHandler {
long fileCount = request.getAppHeartbeatRequest().getFileCount();
long shuffleCount =
request.getAppHeartbeatRequest().getShuffleCount();
long applicationCount =
request.getAppHeartbeatRequest().getApplicationCount();
- LOG.debug(
- "Handle app heartbeat for {} with shuffle count {} and
application count {}",
- appId,
- shuffleCount,
- applicationCount);
+ LOG.debug("Handle app heartbeat for {} with shuffle count {}",
appId, shuffleCount);
Map<String, Long> shuffleFallbackCounts =
request.getAppHeartbeatRequest().getShuffleFallbackCountsMap();
if (CollectionUtils.isNotEmpty(shuffleFallbackCounts)) {
@@ -175,9 +182,9 @@ public class MetaHandler {
break;
case WorkerExclude:
- List<ResourceProtos.WorkerAddress> addAddresses =
+ List<PbWorkerAddress> addAddresses =
request.getWorkerExcludeRequest().getWorkersToAddList();
- List<ResourceProtos.WorkerAddress> removeAddresses =
+ List<PbWorkerAddress> removeAddresses =
request.getWorkerExcludeRequest().getWorkersToRemoveList();
List<WorkerInfo> workersToAdd =
addAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
@@ -212,7 +219,8 @@ public class MetaHandler {
rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
pushPort = request.getWorkerHeartbeatRequest().getPushPort();
fetchPort = request.getWorkerHeartbeatRequest().getFetchPort();
- diskInfos =
MetaUtil.fromPbDiskInfos(request.getWorkerHeartbeatRequest().getDisksMap());
+ Map<String, PbDiskInfo> pbDiskInfoMap =
request.getWorkerHeartbeatRequest().getDisksMap();
+ diskInfos = MetaUtil.fromPbDiskInfoMap(pbDiskInfoMap);
replicatePort =
request.getWorkerHeartbeatRequest().getReplicatePort();
boolean highWorkload =
request.getWorkerHeartbeatRequest().getHighWorkload();
if (request.getWorkerHeartbeatRequest().hasWorkerStatus()) {
@@ -237,7 +245,7 @@ public class MetaHandler {
fetchPort,
replicatePort,
diskInfos,
- request.getWorkerHeartbeatRequest().getTime(),
+
request.getWorkerHeartbeatRequest().getWorkerStatus().getStateStartTime(),
workerStatus,
highWorkload);
break;
@@ -250,7 +258,8 @@ public class MetaHandler {
replicatePort =
request.getRegisterWorkerRequest().getReplicatePort();
String networkLocation =
request.getRegisterWorkerRequest().getNetworkLocation();
int internalPort =
request.getRegisterWorkerRequest().getInternalPort();
- diskInfos =
MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
+ Map<String, PbDiskInfo> pbDiskInfo =
request.getRegisterWorkerRequest().getDisksMap();
+ diskInfos = MetaUtil.fromPbDiskInfoMap(pbDiskInfo);
LOG.debug(
"Handle worker register for {} {} {} {} {} {} {}",
host,
@@ -272,10 +281,10 @@ public class MetaHandler {
break;
case ReportWorkerUnavailable:
- List<ResourceProtos.WorkerAddress> failedAddress =
+ List<PbWorkerInfo> failedAddress =
request.getReportWorkerUnavailableRequest().getUnavailableList();
List<WorkerInfo> failedWorkers =
-
failedAddress.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+
failedAddress.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
metaSystem.updateMetaByReportWorkerUnavailable(failedWorkers);
break;
@@ -284,18 +293,17 @@ public class MetaHandler {
break;
case RemoveWorkersUnavailableInfo:
- List<ResourceProtos.WorkerAddress> unavailableList =
-
request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList();
+ List<PbWorkerInfo> unavailableList =
+
request.getRemoveWorkersUnavailableInfoRequest().getWorkerInfoList();
List<WorkerInfo> unavailableWorkers =
-
unavailableList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+
unavailableList.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
metaSystem.removeWorkersUnavailableInfoMeta(unavailableWorkers);
break;
case WorkerEvent:
- List<ResourceProtos.WorkerAddress> workerAddresses =
- request.getWorkerEventRequest().getWorkerAddressList();
+ List<PbWorkerInfo> workerAddresses =
request.getWorkerEventRequest().getWorkersList();
List<WorkerInfo> workerInfoList =
-
workerAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+
workerAddresses.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
metaSystem.updateWorkerEventMeta(
request.getWorkerEventRequest().getWorkerEventType().getNumber(),
workerInfoList);
break;
@@ -307,21 +315,21 @@ public class MetaHandler {
break;
case ReportWorkerDecommission:
- List<ResourceProtos.WorkerAddress> decommissionList =
+ List<PbWorkerInfo> decommissionList =
request.getReportWorkerDecommissionRequest().getWorkersList();
List<WorkerInfo> decommissionWorkers =
-
decommissionList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+
decommissionList.stream().map(MetaUtil::pbInfoToInfo).collect(Collectors.toList());
metaSystem.updateMetaByReportWorkerDecommission(decommissionWorkers);
break;
default:
throw new IOException("Can not parse this command!" + request);
}
- responseBuilder.setStatus(ResourceProtos.Status.OK);
+ responseBuilder.setStatus(PbMetaRequestStatus.OK);
} catch (IOException e) {
- LOG.warn("Handle meta write request {} failed!", cmdType, e);
+ LOG.warn("Handle meta write request {} failed!", metaRequestType, e);
responseBuilder.setSuccess(false);
- responseBuilder.setStatus(ResourceProtos.Status.INTERNAL_ERROR);
+ responseBuilder.setStatus(PbMetaRequestStatus.INTERNAL_ERROR);
if (e.getMessage() != null) {
responseBuilder.setMessage(e.getMessage());
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index 40d23c59f..65ffa1f4a 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -54,6 +54,8 @@ import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.celeborn.common.protocol.PbMetaRequest;
+import org.apache.celeborn.common.protocol.PbMetaRequestResponse;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
@@ -174,7 +176,7 @@ public class StateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try {
- ResourceProtos.ResourceRequest request =
+ PbMetaRequest request =
HAHelper.convertByteStringToRequest(trx.getStateMachineLogEntry().getLogData());
long trxLogIndex = trx.getLogEntry().getIndex();
// In the current approach we have one single global thread executor.
@@ -184,14 +186,14 @@ public class StateMachine extends BaseStateMachine {
// chance that Master replica can be out of sync.
// Ref: from Ozone project (OzoneManagerStateMachine)
CompletableFuture<Message> ratisFuture = new CompletableFuture<>();
- CompletableFuture<ResourceResponse> future =
+ CompletableFuture<PbMetaRequestResponse> future =
CompletableFuture.supplyAsync(() -> runCommand(request,
trxLogIndex), executorService);
future.thenApply(
response -> {
if (!response.getSuccess()) {
LOG.warn(
"Failed to apply log {} for this raft group {}!",
- request.getCmdType(),
+ request.getMetaRequestType(),
this.raftGroupId);
}
@@ -212,7 +214,7 @@ public class StateMachine extends BaseStateMachine {
* @return response from meta system
*/
@VisibleForTesting
- protected ResourceResponse runCommand(ResourceProtos.ResourceRequest
request, long trxLogIndex) {
+ protected PbMetaRequestResponse runCommand(PbMetaRequest request, long
trxLogIndex) {
try {
return metaHandler.handleWriteRequest(request);
} catch (Throwable e) {
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index c8e9c2aee..e3c688aa1 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -88,7 +88,7 @@ message DiskInfo {
int64 usedSlots = 4;
int32 status = 5;
int64 avgFetchTime = 6;
- int32 storageType =7;
+ int32 storageType = 7;
int64 totalSpace = 8;
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 2b03afd56..46a9c68e6 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -29,6 +29,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.junit.Assert;
@@ -40,55 +42,62 @@ import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.common.protocol.PbMetaRequest;
+import org.apache.celeborn.common.protocol.PbMetaRequestResponse;
+import org.apache.celeborn.common.protocol.PbMetaRequestSlotsRequest;
+import org.apache.celeborn.common.protocol.PbSlotInfo;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
-import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest;
-import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest;
-import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;
-import
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Type;
public class MasterStateMachineSuiteJ extends RatisBaseSuiteJ {
private final AtomicLong callerId = new AtomicLong();
@Test
- public void testRunCommand() {
+ public void testRunCommandByTransportMessage() throws
InvalidProtocolBufferException {
StateMachine stateMachine = ratisServer.getMasterStateMachine();
Map<String, Integer> allocations = new HashMap<>();
allocations.put("disk1", 15);
allocations.put("disk2", 20);
- Map<String, ResourceProtos.SlotInfo> workerAllocations = new HashMap<>();
+ Map<String, PbSlotInfo> workerAllocations = new HashMap<>();
workerAllocations.put(
new WorkerInfo("host1", 1, 2, 3, 10).toUniqueId(),
- ResourceProtos.SlotInfo.newBuilder().putAllSlot(allocations).build());
+ PbSlotInfo.newBuilder().putAllSlot(allocations).build());
workerAllocations.put(
new WorkerInfo("host2", 2, 3, 4, 11).toUniqueId(),
- ResourceProtos.SlotInfo.newBuilder().putAllSlot(allocations).build());
+ PbSlotInfo.newBuilder().putAllSlot(allocations).build());
workerAllocations.put(
new WorkerInfo("host3", 3, 4, 5, 12).toUniqueId(),
- ResourceProtos.SlotInfo.newBuilder().putAllSlot(allocations).build());
+ PbSlotInfo.newBuilder().putAllSlot(allocations).build());
- RequestSlotsRequest requestSlots =
- RequestSlotsRequest.newBuilder()
+ PbMetaRequestSlotsRequest requestSlots =
+ PbMetaRequestSlotsRequest.newBuilder()
.setShuffleKey("appId-1-1")
.setHostName("hostname")
.putAllWorkerAllocations(workerAllocations)
.build();
- ResourceRequest request =
- ResourceRequest.newBuilder()
+ PbMetaRequest request =
+ PbMetaRequest.newBuilder()
.setRequestSlotsRequest(requestSlots)
- .setCmdType(Type.RequestSlots)
+
.setMetaRequestType(org.apache.celeborn.common.protocol.PbMetaRequestType.RequestSlots)
.setRequestId(UUID.randomUUID().toString())
.build();
- ResourceResponse response = stateMachine.runCommand(request, -1);
+ PbMetaRequestResponse response = stateMachine.runCommand(request, -1);
Assert.assertTrue(response.getSuccess());
+
+ ByteString byteString = request.toByteString();
+ ResourceProtos.ResourceRequest resourceRequest =
+ ResourceProtos.ResourceRequest.parseFrom(byteString);
+ Assert.assertEquals("appId-1-1",
resourceRequest.getRequestSlotsRequest().getShuffleKey());
+ Assert.assertEquals("hostname",
resourceRequest.getRequestSlotsRequest().getHostName());
+ Assert.assertEquals(3,
resourceRequest.getRequestSlotsRequest().getWorkerAllocations().size());
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index be644869f..7c66cccec 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1497,6 +1497,86 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
}
+ @Test
+ public void testHandleWorkerEventByTransportMessage() throws
InterruptedException {
+ AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+ Assert.assertNotNull(statusSystem);
+
+ statusSystem.handleRegisterWorker(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ INTERNALPORT1,
+ NETWORK_LOCATION1,
+ disks1,
+ userResourceConsumption1,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME2,
+ RPCPORT2,
+ PUSHPORT2,
+ FETCHPORT2,
+ REPLICATEPORT2,
+ INTERNALPORT2,
+ NETWORK_LOCATION2,
+ disks2,
+ userResourceConsumption2,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME3,
+ RPCPORT3,
+ PUSHPORT3,
+ FETCHPORT3,
+ REPLICATEPORT3,
+ INTERNALPORT3,
+ NETWORK_LOCATION3,
+ disks3,
+ userResourceConsumption3,
+ getNewReqeustId());
+
+ WorkerInfo workerInfo1 =
+ WorkerInfo.fromUniqueId(
+ HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 +
":" + REPLICATEPORT1);
+ WorkerInfo workerInfo2 =
+ WorkerInfo.fromUniqueId(
+ HOSTNAME2 + ":" + RPCPORT2 + ":" + PUSHPORT2 + ":" + FETCHPORT2 +
":" + REPLICATEPORT2);
+ statusSystem.handleWorkerEvent(
+ WorkerEventType.Decommission_VALUE,
+ Lists.newArrayList(workerInfo1, workerInfo2),
+ getNewReqeustId());
+
+ Thread.sleep(3000L);
+ Assert.assertEquals(2, STATUSSYSTEM1.workerEventInfos.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.workerEventInfos.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.workerEventInfos.size());
+
+ Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+
+ Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo1));
+ Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+
+ Assert.assertEquals(
+ WorkerEventType.Decommission,
+ STATUSSYSTEM1.workerEventInfos.get(workerInfo1).getEventType());
+
+ statusSystem.handleWorkerEvent(
+ WorkerEventType.None_VALUE, Lists.newArrayList(workerInfo1),
getNewReqeustId());
+ Thread.sleep(3000L);
+ Assert.assertEquals(1, STATUSSYSTEM1.workerEventInfos.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.workerEventInfos.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.workerEventInfos.size());
+ Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
+ Assert.assertEquals(
+ WorkerEventType.Decommission,
+ STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
+
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ }
+
@Test
public void testReviseShuffles() throws InterruptedException {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();