This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new f8942bf [Improvement][MasterServer] event response handle parallel
(#7560)
f8942bf is described below
commit f8942bf7982c832bc13a0911877fa8198e716b55
Author: zwZjut <[email protected]>
AuthorDate: Fri Dec 24 10:39:52 2021 +0800
[Improvement][MasterServer] event response handle parallel (#7560)
* [Feature][dolphinscheduler-api] parse traceId in HTTP header for cross-system delivery to #7237 (#7238)
* to #7237
* rerun test
Co-authored-by: honghuo.zw <[email protected]>
* cherry-pick 05aef27 and handle conflicts
* to #7065: fix ExecutorService and schedulerService (#7072)
Co-authored-by: honghuo.zw <[email protected]>
* [Feature][dolphinscheduler-api] access control of taskDefinition and
taskInstance in project to #7081 (#7082)
* to #7081
* fix #7081
* to #7081
Co-authored-by: honghuo.zw <[email protected]>
* cherry-pick 8ebe060 and handle conflicts
* cherry-pick 1f18444 and handle conflicts
* fix #6807: dolphinscheduler.zookeeper.env_vars -> dolphinscheduler.registry.env_vars (#6808)
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
* add default constructor (#6780)
Co-authored-by: honghuo.zw <[email protected]>
* to #7108 (#7109)
* to #7450
* to #7450: fix parallel bug
* add index
* expose config to user
* fix bug
* fix bug
* add delay delete
* fix bug
* add License
* fix ut
* fix ut
* fix name
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
---
.../conf/dolphinscheduler/master.properties.tpl | 4 +-
docker/kubernetes/dolphinscheduler/values.yaml | 1 +
.../main/resources/sql/dolphinscheduler_mysql.sql | 3 +-
.../resources/sql/dolphinscheduler_postgresql.sql | 2 +
.../2.0.2_schema/mysql/dolphinscheduler_ddl.sql | 24 ++-
.../postgresql/dolphinscheduler_ddl.sql | 3 +
.../server/master/config/MasterConfig.java | 13 +-
...Service.java => TaskResponsePersistThread.java} | 152 ++++++---------
.../processor/queue/TaskResponseService.java | 205 ++++++++++++---------
.../master/registry/MasterRegistryClient.java | 18 +-
.../master/runner/FailoverExecuteThread.java | 21 ++-
.../processor/queue/TaskResponseServiceTest.java | 7 +-
12 files changed, 252 insertions(+), 201 deletions(-)
diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl
b/docker/build/conf/dolphinscheduler/master.properties.tpl
index 98ca3dd..5d130fa 100644
--- a/docker/build/conf/dolphinscheduler/master.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/master.properties.tpl
@@ -47,4 +47,6 @@ master.reserved.memory=${MASTER_RESERVED_MEMORY}
# master failover interval minutes
master.failover.interval=${MASTER_FAILOVER_INTERVAL}
# master kill yarn job when handle failover
-master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
\ No newline at end of file
+master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
+# master.persist.event.state.threads
+master.persist.event.state.threads=${MASTER_PERSIST_EVENT_STATE_THREADS}
\ No newline at end of file
diff --git a/docker/kubernetes/dolphinscheduler/values.yaml
b/docker/kubernetes/dolphinscheduler/values.yaml
index f0df9e7..52431f9 100644
--- a/docker/kubernetes/dolphinscheduler/values.yaml
+++ b/docker/kubernetes/dolphinscheduler/values.yaml
@@ -166,6 +166,7 @@ master:
ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25"
ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1"
SESSION_TIMEOUT_MS: 60000
+ MASTER_PERSIST_EVENT_STATE_THREADS: 10
## Periodic probe of container liveness. Container will be restarted if the
probe fails. Cannot be updated.
## More info:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 9e52d60..8e7401b 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -532,7 +532,8 @@ CREATE TABLE `t_ds_process_task_relation` (
`condition_params` text COMMENT 'condition params(json)',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `project_code_process_definition_code_index`
(`project_code`,`process_definition_code`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 5c02006..7bb8a9b 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -440,6 +440,8 @@ CREATE TABLE t_ds_process_task_relation (
PRIMARY KEY (id)
) ;
+create index project_code_process_definition_code_index on
t_ds_process_task_relation (project_code,process_definition_code);
+
DROP TABLE IF EXISTS t_ds_process_task_relation_log;
CREATE TABLE t_ds_process_task_relation_log (
id int NOT NULL ,
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
index 2525b3c..89d5c53 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
@@ -17,6 +17,7 @@
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
+
-- uc_dolphin_T_t_ds_process_instance_A_restart_time
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time;
delimiter d//
@@ -35,4 +36,25 @@ d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_A_restart_time();
-DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
\ No newline at end of file
+DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
+
+
+-- uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index()
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
+ WHERE TABLE_NAME='t_ds_process_task_relation'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND INDEX_NAME ='project_code_process_definition_code_index')
+ THEN
+ALTER TABLE `t_ds_process_task_relation` ADD KEY
`project_code_process_definition_code_index`(`project_code`,`process_definition_code`)
USING BTREE;
+END IF;
+END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index();
+DROP PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
index d26cf8e..75be01f 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
@@ -29,6 +29,9 @@ BEGIN
v_schema =current_schema();
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD
COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';
+
+EXECUTE 'CREATE INDEX IF NOT EXISTS project_code_process_definition_code_index
ON ' || quote_ident(v_schema) ||'.t_ds_process_task_relation USING
Btree("project_code","process_definition_code")';
+
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 13a68c4..b7e5642 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -66,9 +66,12 @@ public class MasterConfig {
@Value("${master.failover.interval:10}")
private int failoverInterval;
- @Value("${master.kill.yarn.job.when.handle.fail.over:true}")
+ @Value("${master.kill.yarn.job.when.handle.failover:true}")
private boolean masterKillYarnJobWhenHandleFailOver;
+ @Value("${master.persist.event.state.threads:10}")
+ private int masterPersistEventStateThreads;
+
public int getListenPort() {
return listenPort;
}
@@ -183,4 +186,12 @@ public class MasterConfig {
public void setMasterKillYarnJobWhenHandleFailOver(boolean
masterKillYarnJobWhenHandleFailOver) {
this.masterKillYarnJobWhenHandleFailOver =
masterKillYarnJobWhenHandleFailOver;
}
+
+ public int getMasterPersistEventStateThreads() {
+ return masterPersistEventStateThreads;
+ }
+
+ public void setMasterPersistEventStateThreads(int
masterPersistEventStateThreads) {
+ this.masterPersistEventStateThreads = masterPersistEventStateThreads;
+ }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
similarity index 69%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
copy to
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
index a320a70..621dd79 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
@@ -21,123 +21,60 @@ import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
-/**
- * task manager
- */
-@Component
-public class TaskResponseService {
+public class TaskResponsePersistThread implements Runnable {
/**
- * logger
+ * logger of TaskResponsePersistThread
*/
- private final Logger logger =
LoggerFactory.getLogger(TaskResponseService.class);
+ private static final Logger logger =
LoggerFactory.getLogger(TaskResponsePersistThread.class);
- /**
- * attemptQueue
- */
- private final BlockingQueue<TaskResponseEvent> eventQueue = new
LinkedBlockingQueue<>();
+ private final ConcurrentLinkedQueue<TaskResponseEvent> events = new
ConcurrentLinkedQueue<>();
+
+ private final Integer processInstanceId;
/**
* process service
*/
- @Autowired
private ProcessService processService;
- /**
- * task response worker
- */
- private Thread taskResponseWorker;
-
private ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceMapper;
- public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceMapper) {
- if (this.processInstanceMapper == null) {
- this.processInstanceMapper = processInstanceMapper;
- }
- }
-
- @PostConstruct
- public void start() {
- this.taskResponseWorker = new TaskResponseWorker();
- this.taskResponseWorker.setName("StateEventResponseWorker");
- this.taskResponseWorker.start();
+ public TaskResponsePersistThread(ProcessService processService,
+ ConcurrentHashMap<Integer,
WorkflowExecuteThread> processInstanceMapper,
+ Integer processInstanceId) {
+ this.processService = processService;
+ this.processInstanceMapper = processInstanceMapper;
+ this.processInstanceId = processInstanceId;
}
- @PreDestroy
- public void stop() {
- try {
- this.taskResponseWorker.interrupt();
- if (!eventQueue.isEmpty()) {
- List<TaskResponseEvent> remainEvents = new
ArrayList<>(eventQueue.size());
- eventQueue.drainTo(remainEvents);
- for (TaskResponseEvent event : remainEvents) {
- this.persist(event);
+ @Override
+ public void run() {
+ while (!this.events.isEmpty()) {
+ TaskResponseEvent event = this.events.peek();
+ try {
+ boolean result = persist(event);
+ if (!result) {
+ logger.error("persist meta error, task id:{}, instance
id:{}", event.getTaskInstanceId(), event.getProcessInstanceId());
}
+ } catch (Exception e) {
+ logger.error("persist error, task id:{}, instance id:{}",
event.getTaskInstanceId(), event.getProcessInstanceId(), e);
+ } finally {
+ this.events.remove(event);
}
- } catch (Exception e) {
- logger.error("stop error:", e);
- }
- }
-
- /**
- * put task to attemptQueue
- *
- * @param taskResponseEvent taskResponseEvent
- */
- public void addResponse(TaskResponseEvent taskResponseEvent) {
- try {
- eventQueue.put(taskResponseEvent);
- logger.debug("eventQueue size:{}", eventQueue.size());
- } catch (InterruptedException e) {
- logger.error("put task : {} error :{}", taskResponseEvent, e);
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * task worker thread
- */
- class TaskResponseWorker extends Thread {
-
- @Override
- public void run() {
-
- while (Stopper.isRunning()) {
- try {
- // if not task , blocking here
- TaskResponseEvent taskResponseEvent = eventQueue.take();
- persist(taskResponseEvent);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- } catch (Exception e) {
- logger.error("persist task error", e);
- }
- }
- logger.info("StateEventResponseWorker stopped");
}
}
@@ -146,17 +83,20 @@ public class TaskResponseService {
*
* @param taskResponseEvent taskResponseEvent
*/
- private void persist(TaskResponseEvent taskResponseEvent) {
+ private boolean persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel();
TaskInstance taskInstance =
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+
+ boolean result = true;
+
switch (event) {
case ACK:
try {
if (taskInstance != null) {
ExecutionStatus status =
taskInstance.getState().typeIsFinished() ? taskInstance.getState() :
taskResponseEvent.getState();
- boolean result =
processService.changeTaskState(taskInstance, status,
+ processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
@@ -170,6 +110,7 @@ public class TaskResponseService {
channel.writeAndFlush(taskAckCommand.convert2Command());
logger.debug("worker ack master success, taskInstance
id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} catch (Exception e) {
+ result = false;
logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 :
taskInstance.getId());
channel.writeAndFlush(taskAckCommand.convert2Command());
@@ -177,7 +118,6 @@ public class TaskResponseService {
break;
case RESULT:
try {
- boolean result = true;
if (taskInstance != null) {
result = processService.changeTaskState(taskInstance,
taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
@@ -200,6 +140,7 @@ public class TaskResponseService {
logger.debug("worker response master success,
taskInstance id:{},taskInstance host:{}", taskInstance.getId(),
taskInstance.getHost());
}
} catch (Exception e) {
+ result = false;
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskResponseCommand.convert2Command());
@@ -208,6 +149,7 @@ public class TaskResponseService {
default:
throw new IllegalArgumentException("invalid event type : " +
event);
}
+
WorkflowExecuteThread workflowExecuteThread =
this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
@@ -217,9 +159,31 @@ public class TaskResponseService {
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThread.addStateEvent(stateEvent);
}
+ return result;
+ }
+
+ public boolean addEvent(TaskResponseEvent event) {
+ if (event.getProcessInstanceId() != this.processInstanceId) {
+ logger.info("event would be abounded, task instance id:{}, process
instance id:{}, this.processInstanceId:{}",
+ event.getTaskInstanceId(), event.getProcessInstanceId(),
this.processInstanceId);
+ return false;
+ }
+ return this.events.add(event);
+ }
+
+ public int eventSize() {
+ return this.events.size();
+ }
+
+ public boolean isEmpty() {
+ return this.events.isEmpty();
+ }
+
+ public Integer getProcessInstanceId() {
+ return processInstanceId;
}
- public BlockingQueue<TaskResponseEvent> getEventQueue() {
- return eventQueue;
+ public String getKey() {
+ return String.valueOf(processInstanceId);
}
-}
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index a320a70..5ef2350 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -17,22 +17,18 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
-import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -42,7 +38,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import io.netty.channel.Channel;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* task manager
@@ -66,37 +66,56 @@ public class TaskResponseService {
@Autowired
private ProcessService processService;
+ @Autowired
+ private MasterConfig masterConfig;
+
/**
* task response worker
*/
private Thread taskResponseWorker;
- private ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceMapper;
+ /**
+ * event handler
+ */
+ private Thread taskResponseEventHandler;
+
+ private ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceMap;
+
+ private final ConcurrentHashMap<String, TaskResponsePersistThread>
taskResponseEventHandlerMap = new ConcurrentHashMap<>();
+
+ private ListeningExecutorService listeningExecutorService;
+
+ private ExecutorService eventExecService;
+
+ /**
+ * task response mapper
+ */
+ private final ConcurrentHashMap<Integer, TaskResponsePersistThread>
processTaskResponseMap = new ConcurrentHashMap<>();
- public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceMapper) {
- if (this.processInstanceMapper == null) {
- this.processInstanceMapper = processInstanceMapper;
+ public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread>
processInstanceMap) {
+ if (this.processInstanceMap == null) {
+ this.processInstanceMap = processInstanceMap;
}
}
@PostConstruct
public void start() {
+ eventExecService =
ThreadUtils.newDaemonFixedThreadExecutor("PersistEventState",
masterConfig.getMasterPersistEventStateThreads());
+ this.listeningExecutorService =
MoreExecutors.listeningDecorator(eventExecService);
this.taskResponseWorker = new TaskResponseWorker();
- this.taskResponseWorker.setName("StateEventResponseWorker");
+ this.taskResponseWorker.setName("TaskResponseWorker");
this.taskResponseWorker.start();
+ this.taskResponseEventHandler = new TaskResponseEventHandler();
+ this.taskResponseEventHandler.setName("TaskResponseEventHandler");
+ this.taskResponseEventHandler.start();
}
@PreDestroy
public void stop() {
try {
this.taskResponseWorker.interrupt();
- if (!eventQueue.isEmpty()) {
- List<TaskResponseEvent> remainEvents = new
ArrayList<>(eventQueue.size());
- eventQueue.drainTo(remainEvents);
- for (TaskResponseEvent event : remainEvents) {
- this.persist(event);
- }
- }
+ this.taskResponseEventHandler.interrupt();
+ this.eventExecService.shutdown();
} catch (Exception e) {
logger.error("stop error:", e);
}
@@ -124,12 +143,26 @@ public class TaskResponseService {
@Override
public void run() {
-
while (Stopper.isRunning()) {
try {
// if not task , blocking here
TaskResponseEvent taskResponseEvent = eventQueue.take();
- persist(taskResponseEvent);
+ if
(processInstanceMap.containsKey(taskResponseEvent.getProcessInstanceId())
+ &&
!processTaskResponseMap.containsKey(taskResponseEvent.getProcessInstanceId())) {
+ TaskResponsePersistThread taskResponsePersistThread =
new TaskResponsePersistThread(
+ processService, processInstanceMap,
taskResponseEvent.getProcessInstanceId());
+
processTaskResponseMap.put(taskResponseEvent.getProcessInstanceId(),
taskResponsePersistThread);
+ }
+ TaskResponsePersistThread taskResponsePersistThread =
processTaskResponseMap.get(taskResponseEvent.getProcessInstanceId());
+ if (null != taskResponsePersistThread) {
+ if
(taskResponsePersistThread.addEvent(taskResponseEvent)) {
+ logger.debug("submit task response persist queue
success, task instance id:{},process instance id:{}, state:{} ",
+ taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getState());
+ } else {
+ logger.error("submit task response persist queue
error, task instance id:{},process instance id:{} ",
+ taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getProcessInstanceId());
+ }
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
@@ -142,84 +175,72 @@ public class TaskResponseService {
}
/**
- * persist taskResponseEvent
- *
- * @param taskResponseEvent taskResponseEvent
+ * event handler thread
*/
- private void persist(TaskResponseEvent taskResponseEvent) {
- Event event = taskResponseEvent.getEvent();
- Channel channel = taskResponseEvent.getChannel();
+ class TaskResponseEventHandler extends Thread {
- TaskInstance taskInstance =
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
- switch (event) {
- case ACK:
+ @Override
+ public void run() {
+ logger.info("event handler thread started");
+ while (Stopper.isRunning()) {
try {
- if (taskInstance != null) {
- ExecutionStatus status =
taskInstance.getState().typeIsFinished() ? taskInstance.getState() :
taskResponseEvent.getState();
- boolean result =
processService.changeTaskState(taskInstance, status,
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath(),
- taskResponseEvent.getTaskInstanceId());
- logger.debug("changeTaskState in ACK , changed in
meta:{} ,task instance state:{}, task response event state:{}, taskInstance
id:{},taskInstance host:{}",
- result, taskInstance.getState(),
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
- }
- // if taskInstance is null (maybe deleted) . retry will be
meaningless . so ack success
- DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskResponseEvent.getTaskInstanceId());
- channel.writeAndFlush(taskAckCommand.convert2Command());
- logger.debug("worker ack master success, taskInstance
id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
+ eventHandler();
+
+ TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
} catch (Exception e) {
- logger.error("worker ack master error", e);
- DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 :
taskInstance.getId());
- channel.writeAndFlush(taskAckCommand.convert2Command());
+ logger.error("event handler thread error", e);
}
- break;
- case RESULT:
- try {
- boolean result = true;
- if (taskInstance != null) {
- result = processService.changeTaskState(taskInstance,
taskResponseEvent.getState(),
- taskResponseEvent.getEndTime(),
- taskResponseEvent.getProcessId(),
- taskResponseEvent.getAppIds(),
- taskResponseEvent.getTaskInstanceId(),
- taskResponseEvent.getVarPool()
- );
- logger.debug("changeTaskState in RESULT , changed in
meta:{} task instance state:{}, task response event state:{}, taskInstance
id:{},taskInstance host:{}",
- result, taskInstance.getState(),
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
- }
- if (!result) {
- DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),
taskResponseEvent.getTaskInstanceId());
-
channel.writeAndFlush(taskResponseCommand.convert2Command());
- logger.debug("worker response master failure,
taskInstance id:{},taskInstance host:{}", taskInstance.getId(),
taskInstance.getHost());
- } else {
- // if taskInstance is null (maybe deleted) . retry
will be meaningless . so response success
- DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),
taskResponseEvent.getTaskInstanceId());
-
channel.writeAndFlush(taskResponseCommand.convert2Command());
- logger.debug("worker response master success,
taskInstance id:{},taskInstance host:{}", taskInstance.getId(),
taskInstance.getHost());
+ }
+ }
+
+ private void eventHandler() {
+
+ for (TaskResponsePersistThread taskResponsePersistThread:
processTaskResponseMap.values()) {
+
+ if
(taskResponseEventHandlerMap.containsKey(taskResponsePersistThread.getKey())) {
+ continue;
+ }
+ if (taskResponsePersistThread.eventSize() == 0) {
+ if
(!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId()))
{
+
processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
+ logger.info("remove process instance: {}",
taskResponsePersistThread.getProcessInstanceId());
}
- } catch (Exception e) {
- logger.error("worker response master error", e);
- DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
-
channel.writeAndFlush(taskResponseCommand.convert2Command());
+ continue;
}
- break;
- default:
- throw new IllegalArgumentException("invalid event type : " +
event);
- }
- WorkflowExecuteThread workflowExecuteThread =
this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
- if (workflowExecuteThread != null) {
- StateEvent stateEvent = new StateEvent();
-
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
-
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
- stateEvent.setExecutionStatus(taskResponseEvent.getState());
- stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
- workflowExecuteThread.addStateEvent(stateEvent);
+ logger.info("already exists handler process size:{}",
taskResponseEventHandlerMap.size());
+
taskResponseEventHandlerMap.put(taskResponsePersistThread.getKey(),
taskResponsePersistThread);
+
+ ListenableFuture future =
listeningExecutorService.submit(taskResponsePersistThread);
+ FutureCallback futureCallback = new FutureCallback() {
+ @Override
+ public void onSuccess(Object o) {
+ logger.info("persist events {} succeeded.",
taskResponsePersistThread.getProcessInstanceId());
+ if
(!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId()))
{
+
processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
+ logger.info("remove process instance: {}",
taskResponsePersistThread.getProcessInstanceId());
+ }
+
taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.error("persist events failed: {}", throwable);
+ if
(!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId()))
{
+
processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
+ logger.info("remove process instance: {}",
taskResponsePersistThread.getProcessInstanceId());
+ }
+
taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
+ }
+ };
+ Futures.addCallback(future, futureCallback,
listeningExecutorService);
+ }
}
}
public BlockingQueue<TaskResponseEvent> getEventQueue() {
return eventQueue;
}
-}
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index aaf1d90..425d6e9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -436,13 +436,6 @@ public class MasterRegistryClient {
continue;
}
- if (serverStartupTime != null && processInstance.getRestartTime()
!= null
- &&
processInstance.getRestartTime().after(serverStartupTime)) {
- continue;
- }
-
- logger.info("failover process instance id: {}",
processInstance.getId());
-
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : validTaskInstanceList) {
if (Constants.NULL.equals(taskInstance.getHost())) {
@@ -457,6 +450,13 @@ public class MasterRegistryClient {
logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
+
+ if (serverStartupTime != null && processInstance.getRestartTime()
!= null
+ &&
processInstance.getRestartTime().after(serverStartupTime)) {
+ continue;
+ }
+
+ logger.info("failover process instance id: {}",
processInstance.getId());
//updateProcessInstance host is null and insert into command
processService.processNeedFailoverProcessInstances(processInstance);
}
@@ -576,8 +576,8 @@ public class MasterRegistryClient {
/**
* get local address
*/
- private String getLocalAddress() {
+ public String getLocalAddress() { // widened from private: FailoverExecuteThread.getNeedFailoverMasterServers now calls it to keep this master's own host in the failover list
return NetUtils.getAddr(masterConfig.getListenPort());
}
-}
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 81d02a9..770062f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -28,6 +28,7 @@ import
org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
+import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
@@ -66,10 +67,12 @@ public class FailoverExecuteThread extends Thread {
while (Stopper.isRunning()) {
logger.info("failover execute started");
try {
- List<String> hosts =
processService.queryNeedFailoverProcessInstanceHost();
+ List<String> hosts = getNeedFailoverMasterServers();
if (CollectionUtils.isEmpty(hosts)) {
continue;
}
+ logger.info("need failover hosts:{}", hosts);
+
for (String host : hosts) {
String failoverPath =
masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
try {
@@ -88,4 +91,29 @@ public class FailoverExecuteThread extends Thread {
}
}
}
+
+    /**
+     * Collect the hosts whose process instances this master should fail over:
+     * this master's own address plus every host that is no longer registered as a master.
+     *
+     * @return the filtered hosts, or the query result unchanged when it is null or empty
+     */
+    private List<String> getNeedFailoverMasterServers() {
+        List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
+        if (CollectionUtils.isEmpty(hosts)) {
+            // nothing to filter; avoids dereferencing a null result (the caller's isEmpty check is null-safe)
+            return hosts;
+        }
+        // loop-invariant: resolve this master's address once instead of once per registered host
+        String localAddress = masterRegistryClient.getLocalAddress();
+        Iterator<String> iterator = hosts.iterator();
+        while (iterator.hasNext()) {
+            String host = iterator.next();
+            // drop hosts still registered as masters, except this master itself
+            if (registryClient.checkNodeExists(host, NodeType.MASTER) && !host.equals(localAddress)) {
+                iterator.remove();
+            }
+        }
+        return hosts;
+    }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 878446c..d787d0c 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -19,6 +19,7 @@ package
org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
@@ -34,12 +35,15 @@ import org.mockito.junit.MockitoJUnitRunner;
import io.netty.channel.Channel;
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(MockitoJUnitRunner.Silent.class)
public class TaskResponseServiceTest {
@Mock(name = "processService")
private ProcessService processService;
+ @Mock
+ private MasterConfig masterConfig;
+
@InjectMocks
TaskResponseService taskRspService;
@@ -54,6 +58,7 @@ public class TaskResponseServiceTest {
@Before
public void before() {
+
Mockito.when(masterConfig.getMasterPersistEventStateThreads()).thenReturn(10);
taskRspService.start();
ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,