This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 69923546a1 [DS-9263][Improvement][master]optimize failover (#9281)
69923546a1 is described below
commit 69923546a1586ceed97a2d013be6f5a8ee977529
Author: worry <[email protected]>
AuthorDate: Tue Apr 12 11:53:18 2022 +0800
[DS-9263][Improvement][master]optimize failover (#9281)
- add FailoverService.java
- move failover method from MasterRegistryClient to FailoverService
- move failover code from FailoverExecuteThread to FailoverService
This closes #9263
---
.../master/registry/MasterRegistryClient.java | 294 +----------
.../master/runner/FailoverExecuteThread.java | 54 +-
.../FailoverService.java} | 561 +++++++--------------
.../server/master/service/FailoverServiceTest.java | 157 ++++++
4 files changed, 355 insertions(+), 711 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index e567b77464..c79f9b7071 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -77,10 +78,10 @@ public class MasterRegistryClient {
private static final Logger logger =
LoggerFactory.getLogger(MasterRegistryClient.class);
/**
- * process service
+ * failover service
*/
@Autowired
- private ProcessService processService;
+ private FailoverService failoverService;
@Autowired
private RegistryClient registryClient;
@@ -96,16 +97,11 @@ public class MasterRegistryClient {
*/
private ScheduledExecutorService heartBeatExecutor;
- @Autowired
- private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
/**
* master startup time, ms
*/
private long startupTime;
- private String localNodePath;
-
public void init() {
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
@@ -153,10 +149,7 @@ public class MasterRegistryClient {
return;
}
- String failoverPath = getFailoverLockPath(nodeType, serverHost);
try {
- registryClient.getLock(failoverPath);
-
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
@@ -165,12 +158,10 @@ public class MasterRegistryClient {
//failover server
if (failover) {
- failoverServerWhenDown(serverHost, nodeType);
+ failoverService.failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed, host:{}", nodeType,
serverHost, e);
- } finally {
- registryClient.releaseLock(failoverPath);
}
}
@@ -199,287 +190,19 @@ public class MasterRegistryClient {
}
//failover server
if (failover) {
- failoverServerWhenDown(serverHost, nodeType);
+ failoverService.failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed", nodeType, e);
}
}
- private boolean isNeedToHandleDeadServer(String host, NodeType nodeType,
Duration sessionTimeout) {
- long sessionTimeoutMillis =
Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis());
- List<Server> serverList = registryClient.getServerList(nodeType);
- if (CollectionUtils.isEmpty(serverList)) {
- return true;
- }
- Date startupTime = getServerStartupTime(serverList, host);
- if (startupTime == null) {
- return true;
- }
- if (System.currentTimeMillis() - startupTime.getTime() >
sessionTimeoutMillis) {
- return true;
- }
- return false;
- }
-
- /**
- * failover server when server down
- *
- * @param serverHost server host
- * @param nodeType zookeeper node type
- */
- private void failoverServerWhenDown(String serverHost, NodeType nodeType) {
- switch (nodeType) {
- case MASTER:
- failoverMaster(serverHost);
- break;
- case WORKER:
- failoverWorker(serverHost);
- break;
- default:
- break;
- }
- }
-
- /**
- * get failover lock path
- *
- * @param nodeType zookeeper node type
- * @return fail over lock path
- */
- public String getFailoverLockPath(NodeType nodeType, String host) {
- switch (nodeType) {
- case MASTER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
- case WORKER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
- default:
- return "";
- }
- }
-
- /**
- * task needs failover if task start before worker starts
- *
- * @param workerServers worker servers
- * @param taskInstance task instance
- * @return true if task instance need fail over
- */
- private boolean checkTaskInstanceNeedFailover(List<Server> workerServers,
TaskInstance taskInstance) {
-
- boolean taskNeedFailover = true;
-
- //now no host will execute this task instance,so no need to failover
the task
- if (taskInstance.getHost() == null) {
- return false;
- }
-
- //if task start after worker starts, there is no need to failover the
task.
- if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
- taskNeedFailover = false;
- }
-
- return taskNeedFailover;
- }
-
- /**
- * check task start after the worker server starts.
- *
- * @param taskInstance task instance
- * @return true if task instance start time after worker server start date
- */
- private boolean checkTaskAfterWorkerStart(List<Server> workerServers,
TaskInstance taskInstance) {
- if (StringUtils.isEmpty(taskInstance.getHost())) {
- return false;
- }
- Date workerServerStartDate = getServerStartupTime(workerServers,
taskInstance.getHost());
- if (workerServerStartDate != null) {
- if (taskInstance.getStartTime() == null) {
- return
taskInstance.getSubmitTime().after(workerServerStartDate);
- } else {
- return
taskInstance.getStartTime().after(workerServerStartDate);
- }
- }
- return false;
- }
-
- /**
- * get server startup time
- */
- private Date getServerStartupTime(List<Server> servers, String host) {
- if (CollectionUtils.isEmpty(servers)) {
- return null;
- }
- Date serverStartupTime = null;
- for (Server server : servers) {
- if (host.equals(server.getHost() + Constants.COLON +
server.getPort())) {
- serverStartupTime = server.getCreateTime();
- break;
- }
- }
- return serverStartupTime;
- }
-
- /**
- * get server startup time
- */
- private Date getServerStartupTime(NodeType nodeType, String host) {
- if (StringUtils.isEmpty(host)) {
- return null;
- }
- List<Server> servers = registryClient.getServerList(nodeType);
- return getServerStartupTime(servers, host);
- }
-
- /**
- * failover worker tasks
- * <p>
- * 1. kill yarn job if there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. failover all tasks when workerHost is null
- *
- * @param workerHost worker host
- */
- private void failoverWorker(String workerHost) {
-
- if (StringUtils.isEmpty(workerHost)) {
- return;
- }
-
- List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
-
- long startTime = System.currentTimeMillis();
- List<TaskInstance> needFailoverTaskInstanceList =
processService.queryNeedFailoverTaskInstances(workerHost);
- Map<Integer, ProcessInstance> processInstanceCacheMap = new
HashMap<>();
- logger.info("start worker[{}] failover, task list size:{}",
workerHost, needFailoverTaskInstanceList.size());
-
- for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
- ProcessInstance processInstance =
processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
- if (processInstance == null) {
- processInstance =
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
- if (processInstance == null) {
- logger.error("failover task instance error,
processInstance {} of taskInstance {} is null",
- taskInstance.getProcessInstanceId(),
taskInstance.getId());
- continue;
- }
- processInstanceCacheMap.put(processInstance.getId(),
processInstance);
- }
-
- if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
- continue;
- }
-
- // only failover the task owned myself if worker down.
- if
(!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
- continue;
- }
-
- logger.info("failover task instance id: {}, process instance id:
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance);
- }
- logger.info("end worker[{}] failover, useTime:{}ms", workerHost,
System.currentTimeMillis() - startTime);
- }
-
- /**
- * failover master
- * <p>
- * failover process instance and associated task instance
- *
- * @param masterHost master host
- */
- public void failoverMaster(String masterHost) {
-
- if (StringUtils.isEmpty(masterHost)) {
- return;
- }
-
- Date serverStartupTime = getServerStartupTime(NodeType.MASTER,
masterHost);
- List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
-
- long startTime = System.currentTimeMillis();
- List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
- logger.info("start master[{}] failover, process list size:{}",
masterHost, needFailoverProcessInstanceList.size());
-
- for (ProcessInstance processInstance :
needFailoverProcessInstanceList) {
- if (Constants.NULL.equals(processInstance.getHost())) {
- continue;
- }
-
- List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance taskInstance : validTaskInstanceList) {
- if (Constants.NULL.equals(taskInstance.getHost())) {
- continue;
- }
- if (taskInstance.getState().typeIsFinished()) {
- continue;
- }
- if (!checkTaskInstanceNeedFailover(workerServers,
taskInstance)) {
- continue;
- }
- logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance);
- }
-
- if (serverStartupTime != null && processInstance.getRestartTime()
!= null
- &&
processInstance.getRestartTime().after(serverStartupTime)) {
- continue;
- }
-
- logger.info("failover process instance id: {}",
processInstance.getId());
- //updateProcessInstance host is null and insert into command
-
processService.processNeedFailoverProcessInstances(processInstance);
- }
-
- logger.info("master[{}] failover end, useTime:{}ms", masterHost,
System.currentTimeMillis() - startTime);
- }
-
- /**
- * failover task instance
- * <p>
- * 1. kill yarn job if there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. try to notify local master
- */
- private void failoverTaskInstance(ProcessInstance processInstance,
TaskInstance taskInstance) {
- if (taskInstance == null) {
- logger.error("failover task instance error, taskInstance is null");
- return;
- }
-
- if (processInstance == null) {
- logger.error("failover task instance error, processInstance {} of
taskInstance {} is null",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
- return;
- }
-
- taskInstance.setProcessInstance(processInstance);
- TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
-
- if (masterConfig.isKillYarnJobWhenTaskFailover()) {
- // only kill yarn job if exists , the local thread has exited
- ProcessUtils.killYarnJob(taskExecutionContext);
- }
-
- taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- processService.saveTaskInstance(taskInstance);
-
- StateEvent stateEvent = new StateEvent();
- stateEvent.setTaskInstanceId(taskInstance.getId());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- stateEvent.setProcessInstanceId(processInstance.getId());
- stateEvent.setExecutionStatus(taskInstance.getState());
- workflowExecuteThreadPool.submitStateEvent(stateEvent);
- }
-
/**
* registry
*/
public void registry() {
String address = NetUtils.getAddr(masterConfig.getListenPort());
- localNodePath = getMasterPath();
+ String localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
@@ -509,6 +232,7 @@ public class MasterRegistryClient {
}
public void handleConnectionState(ConnectionState state) {
+ String localNodePath = getMasterPath();
switch (state) {
case CONNECTED:
logger.debug("registry connection state is {}", state);
@@ -545,7 +269,7 @@ public class MasterRegistryClient {
/**
* get master path
*/
- public String getMasterPath() {
+ private String getMasterPath() {
String address = getLocalAddress();
return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
}
@@ -553,7 +277,7 @@ public class MasterRegistryClient {
/**
* get local address
*/
- public String getLocalAddress() {
+ private String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 5dbd6e90df..c717ab04a2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -18,18 +18,10 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.util.Iterator;
-import java.util.List;
+import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,20 +33,14 @@ public class FailoverExecuteThread extends Thread {
private static final Logger logger =
LoggerFactory.getLogger(FailoverExecuteThread.class);
- @Autowired
- private MasterRegistryClient masterRegistryClient;
-
- @Autowired
- private RegistryClient registryClient;
-
@Autowired
private MasterConfig masterConfig;
/**
- * process service
+ * failover service
*/
@Autowired
- private ProcessService processService;
+ private FailoverService failoverService;
@Override
public synchronized void start() {
@@ -67,23 +53,7 @@ public class FailoverExecuteThread extends Thread {
logger.info("failover execute thread started");
while (Stopper.isRunning()) {
try {
- List<String> hosts = getNeedFailoverMasterServers();
- if (CollectionUtils.isEmpty(hosts)) {
- continue;
- }
- logger.info("need failover hosts:{}", hosts);
-
- for (String host : hosts) {
- String failoverPath =
masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
- try {
- registryClient.getLock(failoverPath);
- masterRegistryClient.failoverMaster(host);
- } catch (Exception e) {
- logger.error("{} server failover failed, host:{}",
NodeType.MASTER, host, e);
- } finally {
- registryClient.releaseLock(failoverPath);
- }
- }
+ failoverService.checkMasterFailover();
} catch (Exception e) {
logger.error("failover execute error", e);
} finally {
@@ -91,20 +61,4 @@ public class FailoverExecuteThread extends Thread {
}
}
}
-
- private List<String> getNeedFailoverMasterServers() {
- // failover myself && failover dead masters
- List<String> hosts =
processService.queryNeedFailoverProcessInstanceHost();
-
- Iterator<String> iterator = hosts.iterator();
- while (iterator.hasNext()) {
- String host = iterator.next();
- if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
- if (!host.equals(masterRegistryClient.getLocalAddress())) {
- iterator.remove();
- }
- }
- }
- return hosts;
- }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
similarity index 50%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index e567b77464..4f25c78dad 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -15,30 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.registry;
-
-import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.registry.api.ConnectionState;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -46,192 +37,60 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
-import java.time.Duration;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.collect.Sets;
-
/**
- * zookeeper master client
- * <p>
- * single instance
+ * failover service
*/
@Component
-public class MasterRegistryClient {
-
- /**
- * logger
- */
- private static final Logger logger =
LoggerFactory.getLogger(MasterRegistryClient.class);
-
- /**
- * process service
- */
- @Autowired
- private ProcessService processService;
-
- @Autowired
- private RegistryClient registryClient;
-
- /**
- * master config
- */
- @Autowired
- private MasterConfig masterConfig;
-
- /**
- * heartbeat executor
- */
- private ScheduledExecutorService heartBeatExecutor;
-
- @Autowired
- private WorkflowExecuteThreadPool workflowExecuteThreadPool;
-
- /**
- * master startup time, ms
- */
- private long startupTime;
-
- private String localNodePath;
-
- public void init() {
- this.startupTime = System.currentTimeMillis();
- this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("HeartBeatExecutor"));
- }
-
- public void start() {
- try {
- // master registry
- registry();
-
- registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new
MasterRegistryDataListener());
- } catch (Exception e) {
- logger.error("master start up exception", e);
- throw new RuntimeException("master start up error", e);
- }
- }
-
- public void setRegistryStoppable(IStoppable stoppable) {
- registryClient.setStoppable(stoppable);
- }
-
- public void closeRegistry() {
- // TODO unsubscribe MasterRegistryDataListener
- deregister();
+public class FailoverService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FailoverService.class);
+ private final RegistryClient registryClient;
+ private final MasterConfig masterConfig;
+ private final ProcessService processService;
+ private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+ public FailoverService(RegistryClient registryClient, MasterConfig
masterConfig, ProcessService processService,
+ WorkflowExecuteThreadPool
workflowExecuteThreadPool) {
+ this.registryClient = registryClient;
+ this.masterConfig = masterConfig;
+ this.processService = processService;
+ this.workflowExecuteThreadPool = workflowExecuteThreadPool;
}
/**
- * remove master node path
- *
- * @param path node path
- * @param nodeType node type
- * @param failover is failover
+ * check master failover
*/
- public void removeMasterNodePath(String path, NodeType nodeType, boolean
failover) {
- logger.info("{} node deleted : {}", nodeType, path);
-
- if (StringUtils.isEmpty(path)) {
- logger.error("server down error: empty path: {}, nodeType:{}",
path, nodeType);
+ public void checkMasterFailover() {
+ List<String> hosts = getNeedFailoverMasterServers();
+ if (CollectionUtils.isEmpty(hosts)) {
return;
}
+ LOGGER.info("need failover hosts:{}", hosts);
- String serverHost = registryClient.getHostByEventDataPath(path);
- if (StringUtils.isEmpty(serverHost)) {
- logger.error("server down error: unknown path: {}, nodeType:{}",
path, nodeType);
- return;
- }
-
- String failoverPath = getFailoverLockPath(nodeType, serverHost);
- try {
- registryClient.getLock(failoverPath);
-
- if (!registryClient.exists(path)) {
- logger.info("path: {} not exists", path);
- // handle dead server
- registryClient.handleDeadServer(Collections.singleton(path),
nodeType, Constants.ADD_OP);
- }
-
- //failover server
- if (failover) {
- failoverServerWhenDown(serverHost, nodeType);
- }
- } catch (Exception e) {
- logger.error("{} server failover failed, host:{}", nodeType,
serverHost, e);
- } finally {
- registryClient.releaseLock(failoverPath);
- }
- }
-
- /**
- * remove worker node path
- *
- * @param path node path
- * @param nodeType node type
- * @param failover is failover
- */
- public void removeWorkerNodePath(String path, NodeType nodeType, boolean
failover) {
- logger.info("{} node deleted : {}", nodeType, path);
- try {
- String serverHost = null;
- if (!StringUtils.isEmpty(path)) {
- serverHost = registryClient.getHostByEventDataPath(path);
- if (StringUtils.isEmpty(serverHost)) {
- logger.error("server down error: unknown path: {}", path);
- return;
- }
- if (!registryClient.exists(path)) {
- logger.info("path: {} not exists", path);
- // handle dead server
-
registryClient.handleDeadServer(Collections.singleton(path), nodeType,
Constants.ADD_OP);
- }
- }
- //failover server
- if (failover) {
- failoverServerWhenDown(serverHost, nodeType);
- }
- } catch (Exception e) {
- logger.error("{} server failover failed", nodeType, e);
+ for (String host : hosts) {
+ failoverMasterWithLock(host);
}
}
- private boolean isNeedToHandleDeadServer(String host, NodeType nodeType,
Duration sessionTimeout) {
- long sessionTimeoutMillis =
Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis());
- List<Server> serverList = registryClient.getServerList(nodeType);
- if (CollectionUtils.isEmpty(serverList)) {
- return true;
- }
- Date startupTime = getServerStartupTime(serverList, host);
- if (startupTime == null) {
- return true;
- }
- if (System.currentTimeMillis() - startupTime.getTime() >
sessionTimeoutMillis) {
- return true;
- }
- return false;
- }
-
/**
* failover server when server down
*
* @param serverHost server host
- * @param nodeType zookeeper node type
+ * @param nodeType node type
*/
- private void failoverServerWhenDown(String serverHost, NodeType nodeType) {
+ public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
switch (nodeType) {
case MASTER:
- failoverMaster(serverHost);
+ failoverMasterWithLock(serverHost);
break;
case WORKER:
failoverWorker(serverHost);
@@ -241,94 +100,57 @@ public class MasterRegistryClient {
}
}
- /**
- * get failover lock path
- *
- * @param nodeType zookeeper node type
- * @return fail over lock path
- */
- public String getFailoverLockPath(NodeType nodeType, String host) {
- switch (nodeType) {
- case MASTER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
- case WORKER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
- default:
- return "";
+ private void failoverMasterWithLock(String masterHost) {
+ String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
+ try {
+ registryClient.getLock(failoverPath);
+ this.failoverMaster(masterHost);
+ } catch (Exception e) {
+ LOGGER.error("{} server failover failed, host:{}",
NodeType.MASTER, masterHost, e);
+ } finally {
+ registryClient.releaseLock(failoverPath);
}
}
/**
- * task needs failover if task start before worker starts
+ * failover master
+ * <p>
+ * failover process instance and associated task instance
*
- * @param workerServers worker servers
- * @param taskInstance task instance
- * @return true if task instance need fail over
+ * @param masterHost master host
*/
- private boolean checkTaskInstanceNeedFailover(List<Server> workerServers,
TaskInstance taskInstance) {
-
- boolean taskNeedFailover = true;
-
- //now no host will execute this task instance,so no need to failover
the task
- if (taskInstance.getHost() == null) {
- return false;
- }
-
- //if task start after worker starts, there is no need to failover the
task.
- if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
- taskNeedFailover = false;
+ private void failoverMaster(String masterHost) {
+ if (StringUtils.isEmpty(masterHost)) {
+ return;
}
+ Date serverStartupTime = getServerStartupTime(NodeType.MASTER,
masterHost);
+ long startTime = System.currentTimeMillis();
+ List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
+ LOGGER.info("start master[{}] failover, process list size:{}",
masterHost, needFailoverProcessInstanceList.size());
+ List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
+ for (ProcessInstance processInstance :
needFailoverProcessInstanceList) {
+ if (Constants.NULL.equals(processInstance.getHost())) {
+ continue;
+ }
- return taskNeedFailover;
- }
-
- /**
- * check task start after the worker server starts.
- *
- * @param taskInstance task instance
- * @return true if task instance start time after worker server start date
- */
- private boolean checkTaskAfterWorkerStart(List<Server> workerServers,
TaskInstance taskInstance) {
- if (StringUtils.isEmpty(taskInstance.getHost())) {
- return false;
- }
- Date workerServerStartDate = getServerStartupTime(workerServers,
taskInstance.getHost());
- if (workerServerStartDate != null) {
- if (taskInstance.getStartTime() == null) {
- return
taskInstance.getSubmitTime().after(workerServerStartDate);
- } else {
- return
taskInstance.getStartTime().after(workerServerStartDate);
+ List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
+ for (TaskInstance taskInstance : validTaskInstanceList) {
+ LOGGER.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+ failoverTaskInstance(processInstance, taskInstance,
workerServers);
}
- }
- return false;
- }
- /**
- * get server startup time
- */
- private Date getServerStartupTime(List<Server> servers, String host) {
- if (CollectionUtils.isEmpty(servers)) {
- return null;
- }
- Date serverStartupTime = null;
- for (Server server : servers) {
- if (host.equals(server.getHost() + Constants.COLON +
server.getPort())) {
- serverStartupTime = server.getCreateTime();
- break;
+ if (serverStartupTime != null && processInstance.getRestartTime()
!= null
+ && processInstance.getRestartTime().after(serverStartupTime)) {
+ continue;
}
- }
- return serverStartupTime;
- }
- /**
- * get server startup time
- */
- private Date getServerStartupTime(NodeType nodeType, String host) {
- if (StringUtils.isEmpty(host)) {
- return null;
+ LOGGER.info("failover process instance id: {}",
processInstance.getId());
+ //updateProcessInstance host is null and insert into command
+ processInstance.setHost(Constants.NULL);
+
processService.processNeedFailoverProcessInstances(processInstance);
}
- List<Server> servers = registryClient.getServerList(nodeType);
- return getServerStartupTime(servers, host);
+
+ LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost,
System.currentTimeMillis() - startTime);
}
/**
@@ -341,96 +163,36 @@ public class MasterRegistryClient {
* @param workerHost worker host
*/
private void failoverWorker(String workerHost) {
-
if (StringUtils.isEmpty(workerHost)) {
return;
}
- List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
-
long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList =
processService.queryNeedFailoverTaskInstances(workerHost);
Map<Integer, ProcessInstance> processInstanceCacheMap = new
HashMap<>();
- logger.info("start worker[{}] failover, task list size:{}",
workerHost, needFailoverTaskInstanceList.size());
-
+ LOGGER.info("start worker[{}] failover, task list size:{}",
workerHost, needFailoverTaskInstanceList.size());
+ List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
ProcessInstance processInstance =
processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
if (processInstance == null) {
processInstance =
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
- logger.error("failover task instance error,
processInstance {} of taskInstance {} is null",
- taskInstance.getProcessInstanceId(),
taskInstance.getId());
+ LOGGER.error("failover task instance error,
processInstance {} of taskInstance {} is null",
+ taskInstance.getProcessInstanceId(),
taskInstance.getId());
continue;
}
processInstanceCacheMap.put(processInstance.getId(),
processInstance);
}
- if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
- continue;
- }
-
// only failover the task owned myself if worker down.
if
(!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
continue;
}
- logger.info("failover task instance id: {}, process instance id:
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance);
- }
- logger.info("end worker[{}] failover, useTime:{}ms", workerHost,
System.currentTimeMillis() - startTime);
- }
-
- /**
- * failover master
- * <p>
- * failover process instance and associated task instance
- *
- * @param masterHost master host
- */
- public void failoverMaster(String masterHost) {
-
- if (StringUtils.isEmpty(masterHost)) {
- return;
- }
-
- Date serverStartupTime = getServerStartupTime(NodeType.MASTER,
masterHost);
- List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
-
- long startTime = System.currentTimeMillis();
- List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
- logger.info("start master[{}] failover, process list size:{}",
masterHost, needFailoverProcessInstanceList.size());
-
- for (ProcessInstance processInstance :
needFailoverProcessInstanceList) {
- if (Constants.NULL.equals(processInstance.getHost())) {
- continue;
- }
-
- List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance taskInstance : validTaskInstanceList) {
- if (Constants.NULL.equals(taskInstance.getHost())) {
- continue;
- }
- if (taskInstance.getState().typeIsFinished()) {
- continue;
- }
- if (!checkTaskInstanceNeedFailover(workerServers,
taskInstance)) {
- continue;
- }
- logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance);
- }
-
- if (serverStartupTime != null && processInstance.getRestartTime()
!= null
- &&
processInstance.getRestartTime().after(serverStartupTime)) {
- continue;
- }
-
- logger.info("failover process instance id: {}",
processInstance.getId());
- //updateProcessInstance host is null and insert into command
-
processService.processNeedFailoverProcessInstances(processInstance);
+ LOGGER.info("failover task instance id: {}, process instance id:
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+ failoverTaskInstance(processInstance, taskInstance, workerServers);
}
-
- logger.info("master[{}] failover end, useTime:{}ms", masterHost,
System.currentTimeMillis() - startTime);
+ LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost,
System.currentTimeMillis() - startTime);
}
/**
@@ -440,23 +202,21 @@ public class MasterRegistryClient {
* 2. change task state from running to need failover.
* 3. try to notify local master
*/
- private void failoverTaskInstance(ProcessInstance processInstance,
TaskInstance taskInstance) {
- if (taskInstance == null) {
- logger.error("failover task instance error, taskInstance is null");
+ private void failoverTaskInstance(ProcessInstance processInstance,
TaskInstance taskInstance, List<Server> workerServers) {
+ if (processInstance == null) {
+ LOGGER.error("failover task instance error, processInstance {} of
taskInstance {} is null",
+ taskInstance.getProcessInstanceId(), taskInstance.getId());
return;
}
-
- if (processInstance == null) {
- logger.error("failover task instance error, processInstance {} of
taskInstance {} is null",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
+ if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
return;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
@@ -475,85 +235,134 @@ public class MasterRegistryClient {
}
/**
- * registry
+ * get need failover master servers
+ *
+ * @return need failover master servers
*/
- public void registry() {
- String address = NetUtils.getAddr(masterConfig.getListenPort());
- localNodePath = getMasterPath();
- int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
- HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMaxCpuLoadAvg(),
- masterConfig.getReservedMemory(),
- Sets.newHashSet(getMasterPath()),
- Constants.MASTER_TYPE,
- registryClient);
-
- // remove before persist
- registryClient.remove(localNodePath);
- registryClient.persistEphemeral(localNodePath,
heartBeatTask.getHeartBeatInfo());
-
- while (!registryClient.checkNodeExists(NetUtils.getHost(),
NodeType.MASTER)) {
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ private List<String> getNeedFailoverMasterServers() {
+ // failover myself && failover dead masters
+ List<String> hosts =
processService.queryNeedFailoverProcessInstanceHost();
+
+ Iterator<String> iterator = hosts.iterator();
+ while (iterator.hasNext()) {
+ String host = iterator.next();
+ if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
+ if (!host.equals(getLocalAddress())) {
+ iterator.remove();
+ }
+ }
+ }
+ return hosts;
+ }
+
+ /**
+ * task needs failover if task start before worker starts
+ *
+ * @param workerServers worker servers
+ * @param taskInstance task instance
+ * @return true if task instance need fail over
+ */
+ private boolean checkTaskInstanceNeedFailover(List<Server> workerServers,
TaskInstance taskInstance) {
+
+ boolean taskNeedFailover = true;
+
+ if (taskInstance == null) {
+ LOGGER.error("failover task instance error, taskInstance is null");
+ return false;
+ }
+
+ if (Constants.NULL.equals(taskInstance.getHost())) {
+ return false;
}
- // sleep 1s, waiting master failover remove
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ if (taskInstance.getState() != null &&
taskInstance.getState().typeIsFinished()) {
+ return false;
+ }
- // delete dead server
- registryClient.handleDeadServer(Collections.singleton(localNodePath),
NodeType.MASTER, Constants.DELETE_OP);
- registryClient.addConnectionStateListener(this::handleConnectionState);
- this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask,
masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
- logger.info("master node : {} registry to ZK successfully with
heartBeatInterval : {}s", address, masterHeartbeatInterval);
+ //now no host will execute this task instance,so no need to failover
the task
+ if (taskInstance.getHost() == null) {
+ return false;
+ }
+
+ //if task start after worker starts, there is no need to failover the
task.
+ if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
+ taskNeedFailover = false;
+ }
+ return taskNeedFailover;
}
- public void handleConnectionState(ConnectionState state) {
- switch (state) {
- case CONNECTED:
- logger.debug("registry connection state is {}", state);
- break;
- case SUSPENDED:
- logger.warn("registry connection state is {}, ready to retry
connection", state);
- break;
- case RECONNECTED:
- logger.debug("registry connection state is {}, clean the node
info", state);
- registryClient.persistEphemeral(localNodePath, "");
- break;
- case DISCONNECTED:
- logger.warn("registry connection state is {}, ready to stop
myself", state);
- registryClient.getStoppable().stop("registry connection state
is DISCONNECTED, stop myself");
- break;
+ /**
+ * check task start after the worker server starts.
+ *
+ * @param taskInstance task instance
+ * @return true if task instance start time after worker server start date
+ */
+ private boolean checkTaskAfterWorkerStart(List<Server> workerServers,
TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(taskInstance.getHost())) {
+ return false;
+ }
+ Date workerServerStartDate = getServerStartupTime(workerServers,
taskInstance.getHost());
+ if (workerServerStartDate != null) {
+ if (taskInstance.getStartTime() == null) {
+ return
taskInstance.getSubmitTime().after(workerServerStartDate);
+ } else {
+ return
taskInstance.getStartTime().after(workerServerStartDate);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * get failover lock path
+ *
+ * @param nodeType zookeeper node type
+ * @return fail over lock path
+ */
+ private String getFailoverLockPath(NodeType nodeType, String host) {
+ switch (nodeType) {
+ case MASTER:
+ return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
+ case WORKER:
+ return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
default:
+ return "";
}
}
- public void deregister() {
- try {
- String address = getLocalAddress();
- String localNodePath = getMasterPath();
- registryClient.remove(localNodePath);
- logger.info("master node : {} unRegistry to register center.",
address);
- heartBeatExecutor.shutdown();
- logger.info("heartbeat executor shutdown");
- registryClient.close();
- } catch (Exception e) {
- logger.error("remove registry path exception ", e);
+ /**
+ * get server startup time
+ */
+ private Date getServerStartupTime(NodeType nodeType, String host) {
+ if (StringUtils.isEmpty(host)) {
+ return null;
}
+ List<Server> servers = registryClient.getServerList(nodeType);
+ return getServerStartupTime(servers, host);
}
/**
- * get master path
+ * get server startup time
*/
- public String getMasterPath() {
- String address = getLocalAddress();
- return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
+ private Date getServerStartupTime(List<Server> servers, String host) {
+ if (CollectionUtils.isEmpty(servers)) {
+ return null;
+ }
+ Date serverStartupTime = null;
+ for (Server server : servers) {
+ if (host.equals(server.getHost() + Constants.COLON +
server.getPort())) {
+ serverStartupTime = server.getCreateTime();
+ break;
+ }
+ }
+ return serverStartupTime;
}
/**
* get local address
*/
- public String getLocalAddress() {
+ String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
new file mode 100644
index 0000000000..17a3798090
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.service;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.doNothing;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import com.google.common.collect.Lists;
+
+/**
+ * MasterRegistryClientTest
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RegistryClient.class})
+@PowerMockIgnore({"javax.management.*"})
+public class FailoverServiceTest {
+ @InjectMocks
+ private FailoverService failoverService;
+
+ @Mock
+ private MasterConfig masterConfig;
+
+ @Mock
+ private RegistryClient registryClient;
+
+ @Mock
+ private ProcessService processService;
+
+ @Mock
+ private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+ private String testHost;
+ private ProcessInstance processInstance;
+ private TaskInstance taskInstance;
+
+ @Before
+ public void before() throws Exception {
+ given(masterConfig.getListenPort()).willReturn(8080);
+
+ testHost = failoverService.getLocalAddress();
+ String ip = testHost.split(":")[0];
+ int port = Integer.valueOf(testHost.split(":")[1]);
+ Assert.assertEquals(8080, port);
+
+ given(registryClient.getLock(Mockito.anyString())).willReturn(true);
+
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
+
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testHost);
+ given(registryClient.getStoppable()).willReturn(cause -> {
+ });
+ given(registryClient.checkNodeExists(Mockito.anyString(),
Mockito.any())).willReturn(true);
+ doNothing().when(registryClient).handleDeadServer(Mockito.anySet(),
Mockito.any(NodeType.class), Mockito.anyString());
+
+ processInstance = new ProcessInstance();
+ processInstance.setId(1);
+ processInstance.setHost(testHost);
+ processInstance.setRestartTime(new Date());
+ processInstance.setHistoryCmd("xxx");
+ processInstance.setCommandType(CommandType.STOP);
+
+ taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setStartTime(new Date());
+ taskInstance.setHost(testHost);
+
+
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance));
+
given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testHost));
+
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance));
+
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
+
given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(taskInstance));
+
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
+
+ Thread.sleep(1000);
+ Server server = new Server();
+ server.setHost(ip);
+ server.setPort(port);
+ server.setCreateTime(new Date());
+
given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server));
+
given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server));
+ ReflectionTestUtils.setField(failoverService, "registryClient",
registryClient);
+
+
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
+ }
+
+ @Test
+ public void checkMasterFailoverTest() {
+ failoverService.checkMasterFailover();
+ }
+
+ @Test
+ public void failoverMasterTest() {
+ processInstance.setHost(Constants.NULL);
+ taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
+ Assert.assertNotEquals(taskInstance.getState(),
ExecutionStatus.NEED_FAULT_TOLERANCE);
+
+ processInstance.setHost(testHost);
+ taskInstance.setState(ExecutionStatus.SUCCESS);
+ failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
+ Assert.assertNotEquals(taskInstance.getState(),
ExecutionStatus.NEED_FAULT_TOLERANCE);
+ Assert.assertEquals(Constants.NULL, processInstance.getHost());
+
+ processInstance.setHost(testHost);
+ taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
+ Assert.assertEquals(taskInstance.getState(),
ExecutionStatus.NEED_FAULT_TOLERANCE);
+ Assert.assertEquals(Constants.NULL, processInstance.getHost());
+ }
+
+ @Test
+ public void failoverWorkTest() {
+ failoverService.failoverServerWhenDown(testHost, NodeType.WORKER);
+ Assert.assertEquals(taskInstance.getState(),
ExecutionStatus.NEED_FAULT_TOLERANCE);
+ }
+}