This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
     new 6d0e367  cherry-pick [Bug-8110][WorkerServer] kill all running task before worker stop (#8508)
6d0e367 is described below

commit 6d0e367f69a8f4d0afd218daa192f13ae298f162
Author: caishunfeng <[email protected]>
AuthorDate: Wed Feb 23 19:17:02 2022 +0800

    cherry-pick [Bug-8110][WorkerServer] kill all running task before worker stop (#8508)
---
 .../server/worker/WorkerServer.java                | 30 +++++++++++++++++++++-
 .../spi/task/TaskExecutionContextCacheManager.java |  5 ++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f1fe3b1..1564501 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -39,6 +39,12 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Collection;
 
 import javax.annotation.PostConstruct;
 
@@ -141,7 +147,7 @@ public class WorkerServer implements IStoppable {
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,new TaskKillAckProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK, new TaskKillAckProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
@@ -198,6 +204,11 @@ public class WorkerServer implements IStoppable {
             this.nettyRemotingServer.close();
             this.workerRegistryClient.unRegistry();
             this.alertClientService.close();
+
+            // kill running tasks
+            this.killAllRunningTasks();
+
+            // close the application context
             this.springApplicationContext.close();
             logger.info("springApplicationContext close");
             try {
@@ -217,4 +228,21 @@ public class WorkerServer implements IStoppable {
     public void stop(String cause) {
         close(cause);
     }
+
+    /**
+     * kill all tasks which are running
+     */
+    public void killAllRunningTasks() {
+        Collection<TaskRequest> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
+        logger.info("ready to kill all cache job, job size:{}", taskRequests.size());
+
+        if (CollectionUtils.isEmpty(taskRequests)) {
+            return;
+        }
+
+        for (TaskRequest taskRequest : taskRequests) {
+            // kill task when it's not finished yet
+            org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
+        }
+    }
 }
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
index c4347d6..e2ab195 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.spi.task;
 
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -66,4 +67,8 @@ public class TaskExecutionContextCacheManager {
         taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request);
         return taskRequestContextCache.containsKey(request.getTaskInstanceId());
     }
+
+    public static Collection<TaskRequest> getAllTaskRequestList() {
+        return taskRequestContextCache.values();
+    }
 }

Reply via email to