This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new 85b3b2d [DS-7686][Server]fix restart server after kill force (#7689)
85b3b2d is described below
commit 85b3b2d29d374016dd9a6b2d49153c0371ab8c1f
Author: wind <[email protected]>
AuthorDate: Tue Dec 28 22:34:14 2021 +0800
[DS-7686][Server]fix restart server after kill force (#7689)
* [DS-7686][Server]fix restart server after kill force
* update registry logic
Co-authored-by: caishunfeng <[email protected]>
---
.../master/registry/MasterRegistryClient.java | 21 +++++++++------
.../worker/registry/WorkerRegistryClient.java | 30 +++++++++++++---------
2 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 425d6e9..2243e90 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -119,14 +119,6 @@ public class MasterRegistryClient {
registryClient.getLock(nodeLock);
// master registry
registry();
- String registryPath = getMasterPath();
-
registryClient.handleDeadServer(Collections.singleton(registryPath),
NodeType.MASTER, Constants.DELETE_OP);
-
- // init system node
-
- while (!registryClient.checkNodeExists(NetUtils.getHost(),
NodeType.MASTER)) {
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
- }
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new
MasterRegistryDataListener());
} catch (Exception e) {
@@ -524,7 +516,20 @@ public class MasterRegistryClient {
Constants.MASTER_TYPE,
registryClient);
+ // remove before persist
+ registryClient.remove(localNodePath);
registryClient.persistEphemeral(localNodePath,
heartBeatTask.getHeartBeatInfo());
+
+ while (!registryClient.checkNodeExists(NetUtils.getHost(),
NodeType.MASTER)) {
+ ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ }
+
+ // sleep 1s, waiting master failover remove
+ ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+
+ // delete dead server
+ registryClient.handleDeadServer(Collections.singleton(localNodePath),
NodeType.MASTER, Constants.DELETE_OP);
+
registryClient.addConnectionStateListener(this::handleConnectionState);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, masterHeartbeatInterval);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 74af482..7f72d1a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -102,8 +102,21 @@ public class WorkerRegistryClient {
Set<String> workerZkPaths = getWorkerZkPaths();
int workerHeartbeatInterval =
workerConfig.getWorkerHeartbeatInterval();
+ HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
+ workerConfig.getWorkerMaxCpuloadAvg(),
+ workerConfig.getWorkerReservedMemory(),
+ workerConfig.getHostWeight(),
+ workerZkPaths,
+ Constants.WORKER_TYPE,
+ registryClient,
+ workerConfig.getWorkerExecThreads(),
+ workerManagerThread
+ );
+
for (String workerZKPath : workerZkPaths) {
- registryClient.persistEphemeral(workerZKPath, "");
+ // remove before persist
+ registryClient.remove(workerZKPath);
+ registryClient.persistEphemeral(workerZKPath,
heartBeatTask.getHeartBeatInfo());
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
}
@@ -111,21 +124,14 @@ public class WorkerRegistryClient {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
+ // sleep 1s, waiting master failover remove
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+
+ // delete dead server
this.handleDeadServer(workerZkPaths, NodeType.WORKER,
Constants.DELETE_OP);
registryClient.addConnectionStateListener(this::handleConnectionState);
- HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- workerConfig.getWorkerMaxCpuloadAvg(),
- workerConfig.getWorkerReservedMemory(),
- workerConfig.getHostWeight(),
- workerZkPaths,
- Constants.WORKER_TYPE,
- registryClient,
- workerConfig.getWorkerExecThreads(),
- workerManagerThread
- );
-
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address,
workerHeartbeatInterval);
}