This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
new 6d0e367 cherry-pick [Bug-8110][WorkerServer] kill all running task
before worker stop (#8508)
6d0e367 is described below
commit 6d0e367f69a8f4d0afd218daa192f13ae298f162
Author: caishunfeng <[email protected]>
AuthorDate: Wed Feb 23 19:17:02 2022 +0800
cherry-pick [Bug-8110][WorkerServer] kill all running task before worker
stop (#8508)
---
.../server/worker/WorkerServer.java | 30 +++++++++++++++++++++-
.../spi/task/TaskExecutionContextCacheManager.java | 5 ++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f1fe3b1..1564501 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -39,6 +39,12 @@ import
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Collection;
import javax.annotation.PostConstruct;
@@ -141,7 +147,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST,
new TaskExecuteProcessor(alertClientService, taskPluginManager));
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new
TaskKillProcessor());
-
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,new
TaskKillAckProcessor());
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,
new TaskKillAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK,
new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new
DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
new HostUpdateProcessor());
@@ -198,6 +204,11 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.close();
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
+
+ // kill running tasks
+ this.killAllRunningTasks();
+
+ // close the application context
this.springApplicationContext.close();
logger.info("springApplicationContext close");
try {
@@ -217,4 +228,21 @@ public class WorkerServer implements IStoppable {
public void stop(String cause) {
close(cause);
}
+
+ /**
+ * kill all tasks which are running
+ */
+ public void killAllRunningTasks() {
+ Collection<TaskRequest> taskRequests =
TaskExecutionContextCacheManager.getAllTaskRequestList();
+ logger.info("ready to kill all cache job, job size:{}",
taskRequests.size());
+
+ if (CollectionUtils.isEmpty(taskRequests)) {
+ return;
+ }
+
+ for (TaskRequest taskRequest : taskRequests) {
+ // kill task when it's not finished yet
+
org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
+ }
+ }
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
index c4347d6..e2ab195 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -66,4 +67,8 @@ public class TaskExecutionContextCacheManager {
taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(),
(k, v) -> request);
return
taskRequestContextCache.containsKey(request.getTaskInstanceId());
}
+
+    /**
+     * Returns all task requests currently held in the cache.
+     *
+     * <p>The result is a read-only view backed by the cache map: it reflects later cache
+     * changes, but any attempt to modify it throws {@link UnsupportedOperationException},
+     * so callers cannot remove cached entries through it by accident.
+     *
+     * @return an unmodifiable view of the cached task requests, possibly empty
+     */
+    public static Collection<TaskRequest> getAllTaskRequestList() {
+        // Fully qualified so this change does not require touching the import block.
+        return java.util.Collections.unmodifiableCollection(taskRequestContextCache.values());
+    }
}