This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 69923546a1 [DS-9263][Improvement][master]optimize failover (#9281)
69923546a1 is described below

commit 69923546a1586ceed97a2d013be6f5a8ee977529
Author: worry <[email protected]>
AuthorDate: Tue Apr 12 11:53:18 2022 +0800

    [DS-9263][Improvement][master]optimize failover (#9281)
    
    - add FailoverService.java
    - move failover method from MasterRegistryClient to FailoverService
    - move failover code from FailoverExecuteThread to FailoverService
    
    This closes #9263
---
 .../master/registry/MasterRegistryClient.java      | 294 +----------
 .../master/runner/FailoverExecuteThread.java       |  54 +-
 .../FailoverService.java}                          | 561 +++++++--------------
 .../server/master/service/FailoverServiceTest.java | 157 ++++++
 4 files changed, 355 insertions(+), 711 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index e567b77464..c79f9b7071 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -38,6 +38,7 @@ import 
org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.service.FailoverService;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -77,10 +78,10 @@ public class MasterRegistryClient {
     private static final Logger logger = 
LoggerFactory.getLogger(MasterRegistryClient.class);
 
     /**
-     * process service
+     * failover service
      */
     @Autowired
-    private ProcessService processService;
+    private FailoverService failoverService;
 
     @Autowired
     private RegistryClient registryClient;
@@ -96,16 +97,11 @@ public class MasterRegistryClient {
      */
     private ScheduledExecutorService heartBeatExecutor;
 
-    @Autowired
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
     /**
      * master startup time, ms
      */
     private long startupTime;
 
-    private String localNodePath;
-
     public void init() {
         this.startupTime = System.currentTimeMillis();
         this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
@@ -153,10 +149,7 @@ public class MasterRegistryClient {
             return;
         }
 
-        String failoverPath = getFailoverLockPath(nodeType, serverHost);
         try {
-            registryClient.getLock(failoverPath);
-
             if (!registryClient.exists(path)) {
                 logger.info("path: {} not exists", path);
                 // handle dead server
@@ -165,12 +158,10 @@ public class MasterRegistryClient {
 
             //failover server
             if (failover) {
-                failoverServerWhenDown(serverHost, nodeType);
+                failoverService.failoverServerWhenDown(serverHost, nodeType);
             }
         } catch (Exception e) {
             logger.error("{} server failover failed, host:{}", nodeType, 
serverHost, e);
-        } finally {
-            registryClient.releaseLock(failoverPath);
         }
     }
 
@@ -199,287 +190,19 @@ public class MasterRegistryClient {
             }
             //failover server
             if (failover) {
-                failoverServerWhenDown(serverHost, nodeType);
+                failoverService.failoverServerWhenDown(serverHost, nodeType);
             }
         } catch (Exception e) {
             logger.error("{} server failover failed", nodeType, e);
         }
     }
 
-    private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, 
Duration sessionTimeout) {
-        long sessionTimeoutMillis = 
Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis());
-        List<Server> serverList = registryClient.getServerList(nodeType);
-        if (CollectionUtils.isEmpty(serverList)) {
-            return true;
-        }
-        Date startupTime = getServerStartupTime(serverList, host);
-        if (startupTime == null) {
-            return true;
-        }
-        if (System.currentTimeMillis() - startupTime.getTime() > 
sessionTimeoutMillis) {
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * failover server when server down
-     *
-     * @param serverHost server host
-     * @param nodeType zookeeper node type
-     */
-    private void failoverServerWhenDown(String serverHost, NodeType nodeType) {
-        switch (nodeType) {
-            case MASTER:
-                failoverMaster(serverHost);
-                break;
-            case WORKER:
-                failoverWorker(serverHost);
-                break;
-            default:
-                break;
-        }
-    }
-
-    /**
-     * get failover lock path
-     *
-     * @param nodeType zookeeper node type
-     * @return fail over lock path
-     */
-    public String getFailoverLockPath(NodeType nodeType, String host) {
-        switch (nodeType) {
-            case MASTER:
-                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
-            case WORKER:
-                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
-            default:
-                return "";
-        }
-    }
-
-    /**
-     * task needs failover if task start before worker starts
-     *
-     * @param workerServers worker servers
-     * @param taskInstance task instance
-     * @return true if task instance need fail over
-     */
-    private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, 
TaskInstance taskInstance) {
-
-        boolean taskNeedFailover = true;
-
-        //now no host will execute this task instance,so no need to failover 
the task
-        if (taskInstance.getHost() == null) {
-            return false;
-        }
-
-        //if task start after worker starts, there is no need to failover the 
task.
-        if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
-            taskNeedFailover = false;
-        }
-
-        return taskNeedFailover;
-    }
-
-    /**
-     * check task start after the worker server starts.
-     *
-     * @param taskInstance task instance
-     * @return true if task instance start time after worker server start date
-     */
-    private boolean checkTaskAfterWorkerStart(List<Server> workerServers, 
TaskInstance taskInstance) {
-        if (StringUtils.isEmpty(taskInstance.getHost())) {
-            return false;
-        }
-        Date workerServerStartDate = getServerStartupTime(workerServers, 
taskInstance.getHost());
-        if (workerServerStartDate != null) {
-            if (taskInstance.getStartTime() == null) {
-                return 
taskInstance.getSubmitTime().after(workerServerStartDate);
-            } else {
-                return 
taskInstance.getStartTime().after(workerServerStartDate);
-            }
-        }
-        return false;
-    }
-
-    /**
-     * get server startup time
-     */
-    private Date getServerStartupTime(List<Server> servers, String host) {
-        if (CollectionUtils.isEmpty(servers)) {
-            return null;
-        }
-        Date serverStartupTime = null;
-        for (Server server : servers) {
-            if (host.equals(server.getHost() + Constants.COLON + 
server.getPort())) {
-                serverStartupTime = server.getCreateTime();
-                break;
-            }
-        }
-        return serverStartupTime;
-    }
-
-    /**
-     * get server startup time
-     */
-    private Date getServerStartupTime(NodeType nodeType, String host) {
-        if (StringUtils.isEmpty(host)) {
-            return null;
-        }
-        List<Server> servers = registryClient.getServerList(nodeType);
-        return getServerStartupTime(servers, host);
-    }
-
-    /**
-     * failover worker tasks
-     * <p>
-     * 1. kill yarn job if there are yarn jobs in tasks.
-     * 2. change task state from running to need failover.
-     * 3. failover all tasks when workerHost is null
-     *
-     * @param workerHost worker host
-     */
-    private void failoverWorker(String workerHost) {
-
-        if (StringUtils.isEmpty(workerHost)) {
-            return;
-        }
-
-        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
-
-        long startTime = System.currentTimeMillis();
-        List<TaskInstance> needFailoverTaskInstanceList = 
processService.queryNeedFailoverTaskInstances(workerHost);
-        Map<Integer, ProcessInstance> processInstanceCacheMap = new 
HashMap<>();
-        logger.info("start worker[{}] failover, task list size:{}", 
workerHost, needFailoverTaskInstanceList.size());
-
-        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
-            ProcessInstance processInstance = 
processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
-            if (processInstance == null) {
-                processInstance = 
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
-                if (processInstance == null) {
-                    logger.error("failover task instance error, 
processInstance {} of taskInstance {} is null",
-                            taskInstance.getProcessInstanceId(), 
taskInstance.getId());
-                    continue;
-                }
-                processInstanceCacheMap.put(processInstance.getId(), 
processInstance);
-            }
-
-            if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
-                continue;
-            }
-
-            // only failover the task owned myself if worker down.
-            if 
(!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
-                continue;
-            }
-
-            logger.info("failover task instance id: {}, process instance id: 
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-            failoverTaskInstance(processInstance, taskInstance);
-        }
-        logger.info("end worker[{}] failover, useTime:{}ms", workerHost, 
System.currentTimeMillis() - startTime);
-    }
-
-    /**
-     * failover master
-     * <p>
-     * failover process instance and associated task instance
-     *
-     * @param masterHost master host
-     */
-    public void failoverMaster(String masterHost) {
-
-        if (StringUtils.isEmpty(masterHost)) {
-            return;
-        }
-
-        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, 
masterHost);
-        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
-
-        long startTime = System.currentTimeMillis();
-        List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(masterHost);
-        logger.info("start master[{}] failover, process list size:{}", 
masterHost, needFailoverProcessInstanceList.size());
-
-        for (ProcessInstance processInstance : 
needFailoverProcessInstanceList) {
-            if (Constants.NULL.equals(processInstance.getHost())) {
-                continue;
-            }
-
-            List<TaskInstance> validTaskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId());
-            for (TaskInstance taskInstance : validTaskInstanceList) {
-                if (Constants.NULL.equals(taskInstance.getHost())) {
-                    continue;
-                }
-                if (taskInstance.getState().typeIsFinished()) {
-                    continue;
-                }
-                if (!checkTaskInstanceNeedFailover(workerServers, 
taskInstance)) {
-                    continue;
-                }
-                logger.info("failover task instance id: {}, process instance 
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-                failoverTaskInstance(processInstance, taskInstance);
-            }
-
-            if (serverStartupTime != null && processInstance.getRestartTime() 
!= null
-                    && 
processInstance.getRestartTime().after(serverStartupTime)) {
-                continue;
-            }
-
-            logger.info("failover process instance id: {}", 
processInstance.getId());
-            //updateProcessInstance host is null and insert into command
-            
processService.processNeedFailoverProcessInstances(processInstance);
-        }
-
-        logger.info("master[{}] failover end, useTime:{}ms", masterHost, 
System.currentTimeMillis() - startTime);
-    }
-
-    /**
-     * failover task instance
-     * <p>
-     * 1. kill yarn job if there are yarn jobs in tasks.
-     * 2. change task state from running to need failover.
-     * 3. try to notify local master
-     */
-    private void failoverTaskInstance(ProcessInstance processInstance, 
TaskInstance taskInstance) {
-        if (taskInstance == null) {
-            logger.error("failover task instance error, taskInstance is null");
-            return;
-        }
-
-        if (processInstance == null) {
-            logger.error("failover task instance error, processInstance {} of 
taskInstance {} is null",
-                    taskInstance.getProcessInstanceId(), taskInstance.getId());
-            return;
-        }
-
-        taskInstance.setProcessInstance(processInstance);
-        TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildProcessInstanceRelatedInfo(processInstance)
-                .create();
-
-        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
-            // only kill yarn job if exists , the local thread has exited
-            ProcessUtils.killYarnJob(taskExecutionContext);
-        }
-
-        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
-        processService.saveTaskInstance(taskInstance);
-
-        StateEvent stateEvent = new StateEvent();
-        stateEvent.setTaskInstanceId(taskInstance.getId());
-        stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-        stateEvent.setProcessInstanceId(processInstance.getId());
-        stateEvent.setExecutionStatus(taskInstance.getState());
-        workflowExecuteThreadPool.submitStateEvent(stateEvent);
-    }
-
     /**
      * registry
      */
     public void registry() {
         String address = NetUtils.getAddr(masterConfig.getListenPort());
-        localNodePath = getMasterPath();
+        String localNodePath = getMasterPath();
         int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
                 masterConfig.getMaxCpuLoadAvg(),
@@ -509,6 +232,7 @@ public class MasterRegistryClient {
     }
 
     public void handleConnectionState(ConnectionState state) {
+        String localNodePath = getMasterPath();
         switch (state) {
             case CONNECTED:
                 logger.debug("registry connection state is {}", state);
@@ -545,7 +269,7 @@ public class MasterRegistryClient {
     /**
      * get master path
      */
-    public String getMasterPath() {
+    private String getMasterPath() {
         String address = getLocalAddress();
         return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
     }
@@ -553,7 +277,7 @@ public class MasterRegistryClient {
     /**
      * get local address
      */
-    public String getLocalAddress() {
+    private String getLocalAddress() {
         return NetUtils.getAddr(masterConfig.getListenPort());
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 5dbd6e90df..c717ab04a2 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -18,18 +18,10 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.util.Iterator;
-import java.util.List;
+import org.apache.dolphinscheduler.server.master.service.FailoverService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,20 +33,14 @@ public class FailoverExecuteThread extends Thread {
 
     private static final Logger logger = 
LoggerFactory.getLogger(FailoverExecuteThread.class);
 
-    @Autowired
-    private MasterRegistryClient masterRegistryClient;
-
-    @Autowired
-    private RegistryClient registryClient;
-
     @Autowired
     private MasterConfig masterConfig;
 
     /**
-     * process service
+     * failover service
      */
     @Autowired
-    private ProcessService processService;
+    private FailoverService failoverService;
 
     @Override
     public synchronized void start() {
@@ -67,23 +53,7 @@ public class FailoverExecuteThread extends Thread {
         logger.info("failover execute thread started");
         while (Stopper.isRunning()) {
             try {
-                List<String> hosts = getNeedFailoverMasterServers();
-                if (CollectionUtils.isEmpty(hosts)) {
-                    continue;
-                }
-                logger.info("need failover hosts:{}", hosts);
-
-                for (String host : hosts) {
-                    String failoverPath = 
masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
-                    try {
-                        registryClient.getLock(failoverPath);
-                        masterRegistryClient.failoverMaster(host);
-                    } catch (Exception e) {
-                        logger.error("{} server failover failed, host:{}", 
NodeType.MASTER, host, e);
-                    } finally {
-                        registryClient.releaseLock(failoverPath);
-                    }
-                }
+                failoverService.checkMasterFailover();
             } catch (Exception e) {
                 logger.error("failover execute error", e);
             } finally {
@@ -91,20 +61,4 @@ public class FailoverExecuteThread extends Thread {
             }
         }
     }
-
-    private List<String> getNeedFailoverMasterServers() {
-        // failover myself && failover dead masters
-        List<String> hosts = 
processService.queryNeedFailoverProcessInstanceHost();
-
-        Iterator<String> iterator = hosts.iterator();
-        while (iterator.hasNext()) {
-            String host = iterator.next();
-            if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
-                if (!host.equals(masterRegistryClient.getLocalAddress())) {
-                    iterator.remove();
-                }
-            }
-        }
-        return hosts;
-    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
similarity index 50%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index e567b77464..4f25c78dad 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -15,30 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.registry;
-
-import static 
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static 
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+package org.apache.dolphinscheduler.server.master.service;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.registry.api.ConnectionState;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -46,192 +37,60 @@ import 
org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
-import java.time.Duration;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.google.common.collect.Sets;
-
 /**
- * zookeeper master client
- * <p>
- * single instance
+ * failover service
  */
 @Component
-public class MasterRegistryClient {
-
-    /**
-     * logger
-     */
-    private static final Logger logger = 
LoggerFactory.getLogger(MasterRegistryClient.class);
-
-    /**
-     * process service
-     */
-    @Autowired
-    private ProcessService processService;
-
-    @Autowired
-    private RegistryClient registryClient;
-
-    /**
-     * master config
-     */
-    @Autowired
-    private MasterConfig masterConfig;
-
-    /**
-     * heartbeat executor
-     */
-    private ScheduledExecutorService heartBeatExecutor;
-
-    @Autowired
-    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
-    /**
-     * master startup time, ms
-     */
-    private long startupTime;
-
-    private String localNodePath;
-
-    public void init() {
-        this.startupTime = System.currentTimeMillis();
-        this.heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("HeartBeatExecutor"));
-    }
-
-    public void start() {
-        try {
-            // master registry
-            registry();
-
-            registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new 
MasterRegistryDataListener());
-        } catch (Exception e) {
-            logger.error("master start up exception", e);
-            throw new RuntimeException("master start up error", e);
-        }
-    }
-
-    public void setRegistryStoppable(IStoppable stoppable) {
-        registryClient.setStoppable(stoppable);
-    }
-
-    public void closeRegistry() {
-        // TODO unsubscribe MasterRegistryDataListener
-        deregister();
+public class FailoverService {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FailoverService.class);
+    private final RegistryClient registryClient;
+    private final MasterConfig masterConfig;
+    private final ProcessService processService;
+    private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    public FailoverService(RegistryClient registryClient, MasterConfig 
masterConfig, ProcessService processService,
+                           WorkflowExecuteThreadPool 
workflowExecuteThreadPool) {
+        this.registryClient = registryClient;
+        this.masterConfig = masterConfig;
+        this.processService = processService;
+        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
     }
 
     /**
-     * remove master node path
-     *
-     * @param path node path
-     * @param nodeType node type
-     * @param failover is failover
+     * check master failover
      */
-    public void removeMasterNodePath(String path, NodeType nodeType, boolean 
failover) {
-        logger.info("{} node deleted : {}", nodeType, path);
-
-        if (StringUtils.isEmpty(path)) {
-            logger.error("server down error: empty path: {}, nodeType:{}", 
path, nodeType);
+    public void checkMasterFailover() {
+        List<String> hosts = getNeedFailoverMasterServers();
+        if (CollectionUtils.isEmpty(hosts)) {
             return;
         }
+        LOGGER.info("need failover hosts:{}", hosts);
 
-        String serverHost = registryClient.getHostByEventDataPath(path);
-        if (StringUtils.isEmpty(serverHost)) {
-            logger.error("server down error: unknown path: {}, nodeType:{}", 
path, nodeType);
-            return;
-        }
-
-        String failoverPath = getFailoverLockPath(nodeType, serverHost);
-        try {
-            registryClient.getLock(failoverPath);
-
-            if (!registryClient.exists(path)) {
-                logger.info("path: {} not exists", path);
-                // handle dead server
-                registryClient.handleDeadServer(Collections.singleton(path), 
nodeType, Constants.ADD_OP);
-            }
-
-            //failover server
-            if (failover) {
-                failoverServerWhenDown(serverHost, nodeType);
-            }
-        } catch (Exception e) {
-            logger.error("{} server failover failed, host:{}", nodeType, 
serverHost, e);
-        } finally {
-            registryClient.releaseLock(failoverPath);
-        }
-    }
-
-    /**
-     * remove worker node path
-     *
-     * @param path     node path
-     * @param nodeType node type
-     * @param failover is failover
-     */
-    public void removeWorkerNodePath(String path, NodeType nodeType, boolean 
failover) {
-        logger.info("{} node deleted : {}", nodeType, path);
-        try {
-            String serverHost = null;
-            if (!StringUtils.isEmpty(path)) {
-                serverHost = registryClient.getHostByEventDataPath(path);
-                if (StringUtils.isEmpty(serverHost)) {
-                    logger.error("server down error: unknown path: {}", path);
-                    return;
-                }
-                if (!registryClient.exists(path)) {
-                    logger.info("path: {} not exists", path);
-                    // handle dead server
-                    
registryClient.handleDeadServer(Collections.singleton(path), nodeType, 
Constants.ADD_OP);
-                }
-            }
-            //failover server
-            if (failover) {
-                failoverServerWhenDown(serverHost, nodeType);
-            }
-        } catch (Exception e) {
-            logger.error("{} server failover failed", nodeType, e);
+        for (String host : hosts) {
+            failoverMasterWithLock(host);
         }
     }
 
-    private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, 
Duration sessionTimeout) {
-        long sessionTimeoutMillis = 
Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis());
-        List<Server> serverList = registryClient.getServerList(nodeType);
-        if (CollectionUtils.isEmpty(serverList)) {
-            return true;
-        }
-        Date startupTime = getServerStartupTime(serverList, host);
-        if (startupTime == null) {
-            return true;
-        }
-        if (System.currentTimeMillis() - startupTime.getTime() > 
sessionTimeoutMillis) {
-            return true;
-        }
-        return false;
-    }
-
     /**
      * failover server when server down
      *
      * @param serverHost server host
-     * @param nodeType zookeeper node type
+     * @param nodeType   node type
      */
-    private void failoverServerWhenDown(String serverHost, NodeType nodeType) {
+    public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
         switch (nodeType) {
             case MASTER:
-                failoverMaster(serverHost);
+                failoverMasterWithLock(serverHost);
                 break;
             case WORKER:
                 failoverWorker(serverHost);
@@ -241,94 +100,57 @@ public class MasterRegistryClient {
         }
     }
 
-    /**
-     * get failover lock path
-     *
-     * @param nodeType zookeeper node type
-     * @return fail over lock path
-     */
-    public String getFailoverLockPath(NodeType nodeType, String host) {
-        switch (nodeType) {
-            case MASTER:
-                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
-            case WORKER:
-                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
-            default:
-                return "";
+    private void failoverMasterWithLock(String masterHost) {
+        String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
+        try {
+            registryClient.getLock(failoverPath);
+            this.failoverMaster(masterHost);
+        } catch (Exception e) {
+            LOGGER.error("{} server failover failed, host:{}", 
NodeType.MASTER, masterHost, e);
+        } finally {
+            registryClient.releaseLock(failoverPath);
         }
     }
 
     /**
-     * task needs failover if task start before worker starts
+     * failover master
+     * <p>
+     * failover process instance and associated task instance
      *
-     * @param workerServers worker servers
-     * @param taskInstance task instance
-     * @return true if task instance need fail over
+     * @param masterHost master host
      */
-    private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, 
TaskInstance taskInstance) {
-
-        boolean taskNeedFailover = true;
-
-        //now no host will execute this task instance,so no need to failover 
the task
-        if (taskInstance.getHost() == null) {
-            return false;
-        }
-
-        //if task start after worker starts, there is no need to failover the 
task.
-        if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
-            taskNeedFailover = false;
+    private void failoverMaster(String masterHost) {
+        if (StringUtils.isEmpty(masterHost)) {
+            return;
         }
+        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, 
masterHost);
+        long startTime = System.currentTimeMillis();
+        List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(masterHost);
+        LOGGER.info("start master[{}] failover, process list size:{}", 
masterHost, needFailoverProcessInstanceList.size());
+        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
+        for (ProcessInstance processInstance : 
needFailoverProcessInstanceList) {
+            if (Constants.NULL.equals(processInstance.getHost())) {
+                continue;
+            }
 
-        return taskNeedFailover;
-    }
-
-    /**
-     * check task start after the worker server starts.
-     *
-     * @param taskInstance task instance
-     * @return true if task instance start time after worker server start date
-     */
-    private boolean checkTaskAfterWorkerStart(List<Server> workerServers, 
TaskInstance taskInstance) {
-        if (StringUtils.isEmpty(taskInstance.getHost())) {
-            return false;
-        }
-        Date workerServerStartDate = getServerStartupTime(workerServers, 
taskInstance.getHost());
-        if (workerServerStartDate != null) {
-            if (taskInstance.getStartTime() == null) {
-                return 
taskInstance.getSubmitTime().after(workerServerStartDate);
-            } else {
-                return 
taskInstance.getStartTime().after(workerServerStartDate);
+            List<TaskInstance> validTaskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId());
+            for (TaskInstance taskInstance : validTaskInstanceList) {
+                LOGGER.info("failover task instance id: {}, process instance 
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+                failoverTaskInstance(processInstance, taskInstance, 
workerServers);
             }
-        }
-        return false;
-    }
 
-    /**
-     * get server startup time
-     */
-    private Date getServerStartupTime(List<Server> servers, String host) {
-        if (CollectionUtils.isEmpty(servers)) {
-            return null;
-        }
-        Date serverStartupTime = null;
-        for (Server server : servers) {
-            if (host.equals(server.getHost() + Constants.COLON + 
server.getPort())) {
-                serverStartupTime = server.getCreateTime();
-                break;
+            if (serverStartupTime != null && processInstance.getRestartTime() 
!= null
+                && processInstance.getRestartTime().after(serverStartupTime)) {
+                continue;
             }
-        }
-        return serverStartupTime;
-    }
 
-    /**
-     * get server startup time
-     */
-    private Date getServerStartupTime(NodeType nodeType, String host) {
-        if (StringUtils.isEmpty(host)) {
-            return null;
+            LOGGER.info("failover process instance id: {}", 
processInstance.getId());
+            // update the process instance with host set to NULL and insert a command
+            processInstance.setHost(Constants.NULL);
+            
processService.processNeedFailoverProcessInstances(processInstance);
         }
-        List<Server> servers = registryClient.getServerList(nodeType);
-        return getServerStartupTime(servers, host);
+
+        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, 
System.currentTimeMillis() - startTime);
     }
 
     /**
@@ -341,96 +163,36 @@ public class MasterRegistryClient {
      * @param workerHost worker host
      */
     private void failoverWorker(String workerHost) {
-
         if (StringUtils.isEmpty(workerHost)) {
             return;
         }
 
-        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
-
         long startTime = System.currentTimeMillis();
         List<TaskInstance> needFailoverTaskInstanceList = 
processService.queryNeedFailoverTaskInstances(workerHost);
         Map<Integer, ProcessInstance> processInstanceCacheMap = new 
HashMap<>();
-        logger.info("start worker[{}] failover, task list size:{}", 
workerHost, needFailoverTaskInstanceList.size());
-
+        LOGGER.info("start worker[{}] failover, task list size:{}", 
workerHost, needFailoverTaskInstanceList.size());
+        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
         for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
             ProcessInstance processInstance = 
processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
             if (processInstance == null) {
                 processInstance = 
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
                 if (processInstance == null) {
-                    logger.error("failover task instance error, 
processInstance {} of taskInstance {} is null",
-                            taskInstance.getProcessInstanceId(), 
taskInstance.getId());
+                    LOGGER.error("failover task instance error, 
processInstance {} of taskInstance {} is null",
+                        taskInstance.getProcessInstanceId(), 
taskInstance.getId());
                     continue;
                 }
                 processInstanceCacheMap.put(processInstance.getId(), 
processInstance);
             }
 
-            if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
-                continue;
-            }
-
             // only failover the task owned myself if worker down.
             if 
(!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
                 continue;
             }
 
-            logger.info("failover task instance id: {}, process instance id: 
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-            failoverTaskInstance(processInstance, taskInstance);
-        }
-        logger.info("end worker[{}] failover, useTime:{}ms", workerHost, 
System.currentTimeMillis() - startTime);
-    }
-
-    /**
-     * failover master
-     * <p>
-     * failover process instance and associated task instance
-     *
-     * @param masterHost master host
-     */
-    public void failoverMaster(String masterHost) {
-
-        if (StringUtils.isEmpty(masterHost)) {
-            return;
-        }
-
-        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, 
masterHost);
-        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
-
-        long startTime = System.currentTimeMillis();
-        List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(masterHost);
-        logger.info("start master[{}] failover, process list size:{}", 
masterHost, needFailoverProcessInstanceList.size());
-
-        for (ProcessInstance processInstance : 
needFailoverProcessInstanceList) {
-            if (Constants.NULL.equals(processInstance.getHost())) {
-                continue;
-            }
-
-            List<TaskInstance> validTaskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId());
-            for (TaskInstance taskInstance : validTaskInstanceList) {
-                if (Constants.NULL.equals(taskInstance.getHost())) {
-                    continue;
-                }
-                if (taskInstance.getState().typeIsFinished()) {
-                    continue;
-                }
-                if (!checkTaskInstanceNeedFailover(workerServers, 
taskInstance)) {
-                    continue;
-                }
-                logger.info("failover task instance id: {}, process instance 
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-                failoverTaskInstance(processInstance, taskInstance);
-            }
-
-            if (serverStartupTime != null && processInstance.getRestartTime() 
!= null
-                    && 
processInstance.getRestartTime().after(serverStartupTime)) {
-                continue;
-            }
-
-            logger.info("failover process instance id: {}", 
processInstance.getId());
-            //updateProcessInstance host is null and insert into command
-            
processService.processNeedFailoverProcessInstances(processInstance);
+            LOGGER.info("failover task instance id: {}, process instance id: 
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+            failoverTaskInstance(processInstance, taskInstance, workerServers);
         }
-
-        logger.info("master[{}] failover end, useTime:{}ms", masterHost, 
System.currentTimeMillis() - startTime);
+        LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, 
System.currentTimeMillis() - startTime);
     }
 
     /**
@@ -440,23 +202,21 @@ public class MasterRegistryClient {
      * 2. change task state from running to need failover.
      * 3. try to notify local master
      */
-    private void failoverTaskInstance(ProcessInstance processInstance, 
TaskInstance taskInstance) {
-        if (taskInstance == null) {
-            logger.error("failover task instance error, taskInstance is null");
+    private void failoverTaskInstance(ProcessInstance processInstance, 
TaskInstance taskInstance, List<Server> workerServers) {
+        if (processInstance == null) {
+            LOGGER.error("failover task instance error, processInstance {} of 
taskInstance {} is null",
+                taskInstance.getProcessInstanceId(), taskInstance.getId());
             return;
         }
-
-        if (processInstance == null) {
-            logger.error("failover task instance error, processInstance {} of 
taskInstance {} is null",
-                    taskInstance.getProcessInstanceId(), taskInstance.getId());
+        if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
             return;
         }
 
         taskInstance.setProcessInstance(processInstance);
         TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
-                .buildTaskInstanceRelatedInfo(taskInstance)
-                .buildProcessInstanceRelatedInfo(processInstance)
-                .create();
+            .buildTaskInstanceRelatedInfo(taskInstance)
+            .buildProcessInstanceRelatedInfo(processInstance)
+            .create();
 
         if (masterConfig.isKillYarnJobWhenTaskFailover()) {
             // only kill yarn job if exists , the local thread has exited
@@ -475,85 +235,134 @@ public class MasterRegistryClient {
     }
 
     /**
-     * registry
+     * get need failover master servers
+     *
+     * @return need failover master servers
      */
-    public void registry() {
-        String address = NetUtils.getAddr(masterConfig.getListenPort());
-        localNodePath = getMasterPath();
-        int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                masterConfig.getMaxCpuLoadAvg(),
-                masterConfig.getReservedMemory(),
-                Sets.newHashSet(getMasterPath()),
-                Constants.MASTER_TYPE,
-                registryClient);
-
-        // remove before persist
-        registryClient.remove(localNodePath);
-        registryClient.persistEphemeral(localNodePath, 
heartBeatTask.getHeartBeatInfo());
-
-        while (!registryClient.checkNodeExists(NetUtils.getHost(), 
NodeType.MASTER)) {
-            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+    private List<String> getNeedFailoverMasterServers() {
+        // failover myself && failover dead masters
+        List<String> hosts = 
processService.queryNeedFailoverProcessInstanceHost();
+
+        Iterator<String> iterator = hosts.iterator();
+        while (iterator.hasNext()) {
+            String host = iterator.next();
+            if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
+                if (!host.equals(getLocalAddress())) {
+                    iterator.remove();
+                }
+            }
+        }
+        return hosts;
+    }
+
+    /**
+     * task needs failover if task start before worker starts
+     *
+     * @param workerServers worker servers
+     * @param taskInstance  task instance
+     * @return true if task instance need fail over
+     */
+    private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, 
TaskInstance taskInstance) {
+
+        boolean taskNeedFailover = true;
+
+        if (taskInstance == null) {
+            LOGGER.error("failover task instance error, taskInstance is null");
+            return false;
+        }
+
+        if (Constants.NULL.equals(taskInstance.getHost())) {
+            return false;
         }
 
-        // sleep 1s, waiting master failover remove
-        ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+        if (taskInstance.getState() != null && 
taskInstance.getState().typeIsFinished()) {
+            return false;
+        }
 
-        // delete dead server
-        registryClient.handleDeadServer(Collections.singleton(localNodePath), 
NodeType.MASTER, Constants.DELETE_OP);
 
-        registryClient.addConnectionStateListener(this::handleConnectionState);
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
-        logger.info("master node : {} registry to ZK successfully with 
heartBeatInterval : {}s", address, masterHeartbeatInterval);
+        //now no host will execute this task instance,so no need to failover 
the task
+        if (taskInstance.getHost() == null) {
+            return false;
+        }
+
+        //if task start after worker starts, there is no need to failover the 
task.
+        if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
+            taskNeedFailover = false;
+        }
 
+        return taskNeedFailover;
     }
 
-    public void handleConnectionState(ConnectionState state) {
-        switch (state) {
-            case CONNECTED:
-                logger.debug("registry connection state is {}", state);
-                break;
-            case SUSPENDED:
-                logger.warn("registry connection state is {}, ready to retry 
connection", state);
-                break;
-            case RECONNECTED:
-                logger.debug("registry connection state is {}, clean the node 
info", state);
-                registryClient.persistEphemeral(localNodePath, "");
-                break;
-            case DISCONNECTED:
-                logger.warn("registry connection state is {}, ready to stop 
myself", state);
-                registryClient.getStoppable().stop("registry connection state 
is DISCONNECTED, stop myself");
-                break;
+    /**
+     * check task start after the worker server starts.
+     *
+     * @param taskInstance task instance
+     * @return true if task instance start time after worker server start date
+     */
+    private boolean checkTaskAfterWorkerStart(List<Server> workerServers, 
TaskInstance taskInstance) {
+        if (StringUtils.isEmpty(taskInstance.getHost())) {
+            return false;
+        }
+        Date workerServerStartDate = getServerStartupTime(workerServers, 
taskInstance.getHost());
+        if (workerServerStartDate != null) {
+            if (taskInstance.getStartTime() == null) {
+                return 
taskInstance.getSubmitTime().after(workerServerStartDate);
+            } else {
+                return 
taskInstance.getStartTime().after(workerServerStartDate);
+            }
+        }
+        return false;
+    }
+
+    /**
+     * get failover lock path
+     *
+     * @param nodeType zookeeper node type
+     * @return fail over lock path
+     */
+    private String getFailoverLockPath(NodeType nodeType, String host) {
+        switch (nodeType) {
+            case MASTER:
+                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
+            case WORKER:
+                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
             default:
+                return "";
         }
     }
 
-    public void deregister() {
-        try {
-            String address = getLocalAddress();
-            String localNodePath = getMasterPath();
-            registryClient.remove(localNodePath);
-            logger.info("master node : {} unRegistry to register center.", 
address);
-            heartBeatExecutor.shutdown();
-            logger.info("heartbeat executor shutdown");
-            registryClient.close();
-        } catch (Exception e) {
-            logger.error("remove registry path exception ", e);
+    /**
+     * get server startup time
+     */
+    private Date getServerStartupTime(NodeType nodeType, String host) {
+        if (StringUtils.isEmpty(host)) {
+            return null;
         }
+        List<Server> servers = registryClient.getServerList(nodeType);
+        return getServerStartupTime(servers, host);
     }
 
     /**
-     * get master path
+     * get server startup time
      */
-    public String getMasterPath() {
-        String address = getLocalAddress();
-        return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
+    private Date getServerStartupTime(List<Server> servers, String host) {
+        if (CollectionUtils.isEmpty(servers)) {
+            return null;
+        }
+        Date serverStartupTime = null;
+        for (Server server : servers) {
+            if (host.equals(server.getHost() + Constants.COLON + 
server.getPort())) {
+                serverStartupTime = server.getCreateTime();
+                break;
+            }
+        }
+        return serverStartupTime;
     }
 
     /**
      * get local address
      */
-    public String getLocalAddress() {
+    String getLocalAddress() {
         return NetUtils.getAddr(masterConfig.getListenPort());
     }
 
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
new file mode 100644
index 0000000000..17a3798090
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.doNothing;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import com.google.common.collect.Lists;
+
+/**
+ * FailoverServiceTest
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RegistryClient.class})
+@PowerMockIgnore({"javax.management.*"})
+public class FailoverServiceTest {
+    @InjectMocks
+    private FailoverService failoverService;
+
+    @Mock
+    private MasterConfig masterConfig;
+
+    @Mock
+    private RegistryClient registryClient;
+
+    @Mock
+    private ProcessService processService;
+
+    @Mock
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    private String testHost;
+    private ProcessInstance processInstance;
+    private TaskInstance taskInstance;
+
+    @Before
+    public void before() throws Exception {
+        given(masterConfig.getListenPort()).willReturn(8080);
+
+        testHost = failoverService.getLocalAddress();
+        String ip = testHost.split(":")[0];
+        int port = Integer.valueOf(testHost.split(":")[1]);
+        Assert.assertEquals(8080, port);
+
+        given(registryClient.getLock(Mockito.anyString())).willReturn(true);
+        
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
+        
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testHost);
+        given(registryClient.getStoppable()).willReturn(cause -> {
+        });
+        given(registryClient.checkNodeExists(Mockito.anyString(), 
Mockito.any())).willReturn(true);
+        doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), 
Mockito.any(NodeType.class), Mockito.anyString());
+
+        processInstance = new ProcessInstance();
+        processInstance.setId(1);
+        processInstance.setHost(testHost);
+        processInstance.setRestartTime(new Date());
+        processInstance.setHistoryCmd("xxx");
+        processInstance.setCommandType(CommandType.STOP);
+
+        taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setStartTime(new Date());
+        taskInstance.setHost(testHost);
+
+        
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance));
+        
given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testHost));
+        
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance));
+        
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
+        
given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(taskInstance));
+        
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
+
+        Thread.sleep(1000);
+        Server server = new Server();
+        server.setHost(ip);
+        server.setPort(port);
+        server.setCreateTime(new Date());
+        
given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server));
+        
given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server));
+        ReflectionTestUtils.setField(failoverService, "registryClient", 
registryClient);
+
+        
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
+    }
+
+    @Test
+    public void checkMasterFailoverTest() {
+        failoverService.checkMasterFailover();
+    }
+
+    @Test
+    public void failoverMasterTest() {
+        processInstance.setHost(Constants.NULL);
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
+        Assert.assertNotEquals(taskInstance.getState(), 
ExecutionStatus.NEED_FAULT_TOLERANCE);
+
+        processInstance.setHost(testHost);
+        taskInstance.setState(ExecutionStatus.SUCCESS);
+        failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
+        Assert.assertNotEquals(taskInstance.getState(), 
ExecutionStatus.NEED_FAULT_TOLERANCE);
+        Assert.assertEquals(Constants.NULL, processInstance.getHost());
+
+        processInstance.setHost(testHost);
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
+        Assert.assertEquals(taskInstance.getState(), 
ExecutionStatus.NEED_FAULT_TOLERANCE);
+        Assert.assertEquals(Constants.NULL, processInstance.getHost());
+    }
+
+    @Test
+    public void failoverWorkTest() {
+        failoverService.failoverServerWhenDown(testHost, NodeType.WORKER);
+        Assert.assertEquals(taskInstance.getState(), 
ExecutionStatus.NEED_FAULT_TOLERANCE);
+    }
+}

Reply via email to