This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2011c448b [CELEBORN-935][FOLLOWUP] Remove WorkerRemove in MetaHandler
2011c448b is described below
commit 2011c448b7a4c34a1c0b8ebd5d2ffbcb4a312929
Author: xxx <[email protected]>
AuthorDate: Thu Sep 11 17:14:52 2025 +0800
[CELEBORN-935][FOLLOWUP] Remove WorkerRemove in MetaHandler
### What changes were proposed in this pull request?
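Remove the `WorkerRemove` meta request. This drops its handling in `MetaHandler` and the `AbstractMetaManager.updateWorkerRemoveMeta` method. It also removes the `WorkerRemove` enum values and `workerRemoveRequest` fields from `TransportMessages.proto` and `Resource.proto`, and the `WorkerRemoveRequest` message from `Resource.proto`.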
### Why are the changes needed?
`WorkerRemove` was kept only to guarantee upgrade compatibility. The TODO in `MetaHandler` scheduled its removal for the 0.7.x version, and this change carries that out.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3474 from xy2953396112/CELEBORN-935.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 2 --
docs/migration.md | 4 ++++
.../deploy/master/clustermeta/AbstractMetaManager.java | 12 ------------
.../service/deploy/master/clustermeta/ha/MetaHandler.java | 11 -----------
master/src/main/proto/Resource.proto | 10 ----------
5 files changed, 4 insertions(+), 35 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index be8af738a..e47857af4 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -1006,7 +1006,6 @@ enum PbMetaRequestType {
RegisterWorker = 19;
ReportWorkerUnavailable = 20;
UpdatePartitionSize = 21;
- WorkerRemove = 22;
RemoveWorkersUnavailableInfo = 23;
WorkerExclude = 24;
WorkerEvent = 25;
@@ -1031,7 +1030,6 @@ message PbMetaRequest {
PbMetaWorkerHeartbeatRequest workerHeartbeatRequest = 16;
PbMetaRegisterWorkerRequest registerWorkerRequest = 17;
PbReportWorkerUnavailable reportWorkerUnavailableRequest = 18;
- PbWorkerAddress workerRemoveRequest = 19;
PbRemoveWorkersUnavailableInfo removeWorkersUnavailableInfoRequest = 20;
PbMetaWorkerExcludeRequest workerExcludeRequest = 21;
PbWorkerEventRequest workerEventRequest = 22;
diff --git a/docs/migration.md b/docs/migration.md
index 748fc884a..da696d62f 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -21,6 +21,10 @@ license: |
# Migration Guide
+# Upgrading from 0.6 to 0.7
+
+- Since 0.7.0, Celeborn has removed the `WorkerRemove` master meta request type.
+
# Upgrading from 0.5 to 0.6
- Since 0.6.0, Celeborn deprecates
`celeborn.client.spark.fetch.throwsFetchFailure`. Please use
`celeborn.client.spark.stageRerun.enabled` instead.
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index ae639d2e5..fc6627fd2 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -238,18 +238,6 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
workerLostEvents.remove(worker);
}
- public void updateWorkerRemoveMeta(
- String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort) {
- WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort);
- // remove worker from workers
- synchronized (workersMap) {
- workersMap.remove(worker.toUniqueId());
- lostWorkers.put(worker, System.currentTimeMillis());
- availableWorkers.remove(worker);
- }
- excludedWorkers.remove(worker);
- }
-
public void removeWorkersUnavailableInfoMeta(List<WorkerInfo>
unavailableWorkers) {
synchronized (workersMap) {
for (WorkerInfo workerInfo : unavailableWorkers) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index fa18ce925..312f67df7 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -215,17 +215,6 @@ public class MetaHandler {
metaSystem.updateWorkerLostMeta(host, rpcPort, pushPort, fetchPort,
replicatePort);
break;
- case WorkerRemove:
- // TODO: Remove `WorkerRemove` in 0.7.x version to guarantee upgrade
compatibility.
- host = request.getWorkerRemoveRequest().getHost();
- rpcPort = request.getWorkerRemoveRequest().getRpcPort();
- pushPort = request.getWorkerRemoveRequest().getPushPort();
- fetchPort = request.getWorkerRemoveRequest().getFetchPort();
- replicatePort = request.getWorkerRemoveRequest().getReplicatePort();
- LOG.debug("Handle worker remove for {} {}", host, pushPort);
- metaSystem.updateWorkerRemoveMeta(host, rpcPort, pushPort,
fetchPort, replicatePort);
- break;
-
case WorkerHeartbeat:
host = request.getWorkerHeartbeatRequest().getHost();
rpcPort = request.getWorkerHeartbeatRequest().getRpcPort();
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index a10a368fa..04c64067d 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -35,7 +35,6 @@ enum Type {
RegisterWorker = 19;
ReportWorkerUnavailable = 20;
UpdatePartitionSize = 21;
- WorkerRemove = 22;
RemoveWorkersUnavailableInfo = 23;
WorkerExclude = 24;
WorkerEvent = 25;
@@ -73,7 +72,6 @@ message ResourceRequest {
optional WorkerHeartbeatRequest workerHeartbeatRequest = 16;
optional RegisterWorkerRequest registerWorkerRequest = 17;
optional ReportWorkerUnavailableRequest reportWorkerUnavailableRequest = 18;
- optional WorkerRemoveRequest workerRemoveRequest = 19;
optional RemoveWorkersUnavailableInfoRequest
removeWorkersUnavailableInfoRequest = 20;
optional WorkerExcludeRequest workerExcludeRequest = 21;
optional WorkerEventRequest workerEventRequest = 22;
@@ -156,14 +154,6 @@ message WorkerLostRequest {
int32 replicatePort = 5;
}
-message WorkerRemoveRequest {
- string host = 1;
- int32 rpcPort = 2;
- int32 pushPort = 3;
- int32 fetchPort = 4 ;
- int32 replicatePort = 5;
-}
-
message WorkerHeartbeatRequest {
string host = 1;
int32 rpcPort = 2;