This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 49979c658e [Fix-8828] [Master] Assign tasks to worker optimization (#9919)
49979c658e is described below

commit 49979c658e3b9b9468b1a8332ac6b81290b1fd3f
Author: JinYong Li <[email protected]>
AuthorDate: Tue May 31 11:49:54 2022 +0800

    [Fix-8828] [Master] Assign tasks to worker optimization (#9919)
    
    * fix 9584
    
    * master recall
    
    * fix ut
    
    * update logger
    
    * update delay queue
    
    * fix ut
    
    * remove sleep
    
    Co-authored-by: 进勇 <[email protected]>
    Co-authored-by: JinyLeeChina <[email protected]>
---
 .../dolphinscheduler/common/enums/Event.java       |  2 +-
 .../dolphinscheduler/common/enums/StateEvent.java  |  2 +-
 .../dolphinscheduler/common/utils/HeartBeat.java   | 15 ++--
 .../server/master/MasterServer.java                |  5 ++
 .../server/master/dispatch/ExecutorDispatcher.java | 25 +++---
 .../dispatch/executor/NettyExecutorManager.java    | 32 +++-----
 .../dispatch/host/LowerWeightHostManager.java      |  2 +-
 .../master/dispatch/host/assign/HostWeight.java    | 10 ++-
 .../host/assign/LowerWeightRoundRobin.java         | 22 +++++-
 .../master/processor/TaskRecallProcessor.java      | 62 +++++++++++++++
 .../server/master/processor/queue/TaskEvent.java   | 10 +++
 .../master/processor/queue/TaskExecuteThread.java  | 29 ++++++-
 .../master/runner/WorkflowExecuteThread.java       | 10 +++
 .../master/runner/task/BaseTaskProcessor.java      | 11 +++
 .../master/runner/task/BlockingTaskProcessor.java  |  5 ++
 .../master/runner/task/CommonTaskProcessor.java    | 11 ++-
 .../master/runner/task/ConditionTaskProcessor.java |  5 ++
 .../master/runner/task/DependentTaskProcessor.java |  5 ++
 .../master/runner/task/SubTaskProcessor.java       |  5 ++
 .../master/runner/task/SwitchTaskProcessor.java    |  5 ++
 .../server/master/runner/task/TaskAction.java      |  3 +-
 .../host/assign/LowerWeightRoundRobinTest.java     | 23 +++---
 .../remote/command/CommandType.java                | 10 +++
 .../remote/command/TaskRecallAckCommand.java       | 74 ++++++++++++++++++
 .../remote/command/TaskRecallCommand.java          | 90 ++++++++++++++++++++++
 .../server/worker/WorkerServer.java                |  6 +-
 .../server/worker/cache/ResponseCache.java         | 21 ++++-
 .../worker/processor/TaskCallbackService.java      | 18 +++++
 .../worker/processor/TaskExecuteProcessor.java     | 10 +--
 .../worker/processor/TaskRecallAckProcessor.java   | 58 ++++++++++++++
 .../worker/runner/RetryReportTaskStatusThread.java |  8 ++
 .../server/worker/runner/WorkerManagerThread.java  | 41 +++++++---
 32 files changed, 554 insertions(+), 81 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
index 78b036f037..26c3a3beab 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -22,5 +22,5 @@ public enum Event {
     DELAY,
     RUNNING,
     RESULT,
-    ;
+    WORKER_REJECT
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
index 54c7835f0b..405df09d3e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
@@ -97,7 +97,7 @@ public class StateEvent {
     public String toString() {
         return "State Event :"
                 + "key: " + key
-                + " type: " + type.toString()
+                + " type: " + type
                 + " executeStatus: " + executionStatus
                 + " task instance id: " + taskInstanceId
                 + " process instance id: " + processInstanceId
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
index d28cd3db08..b4df08f4b7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
@@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
 public class HeartBeat {
 
     private static final Logger logger = 
LoggerFactory.getLogger(HeartBeat.class);
-    public static final String COMMA = ",";
 
     private long startupTime;
     private long reportTime;
@@ -205,18 +204,18 @@ public class HeartBeat {
         this.updateServerState();
 
         StringBuilder builder = new StringBuilder(100);
-        builder.append(cpuUsage).append(COMMA);
-        builder.append(memoryUsage).append(COMMA);
-        builder.append(loadAverage).append(COMMA);
+        builder.append(cpuUsage).append(Constants.COMMA);
+        builder.append(memoryUsage).append(Constants.COMMA);
+        builder.append(loadAverage).append(Constants.COMMA);
         builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
         builder.append(maxCpuloadAvg).append(Constants.COMMA);
         builder.append(reservedMemory).append(Constants.COMMA);
         builder.append(startupTime).append(Constants.COMMA);
         builder.append(reportTime).append(Constants.COMMA);
-        builder.append(serverStatus).append(COMMA);
-        builder.append(processId).append(COMMA);
-        builder.append(workerHostWeight).append(COMMA);
-        builder.append(workerExecThreadCount).append(COMMA);
+        builder.append(serverStatus).append(Constants.COMMA);
+        builder.append(processId).append(Constants.COMMA);
+        builder.append(workerHostWeight).append(Constants.COMMA);
+        builder.append(workerExecThreadCount).append(Constants.COMMA);
         builder.append(workerWaitingTaskCount);
 
         return builder.toString();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6ab0d4e51a..ad4d02e2e9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
@@ -96,6 +97,9 @@ public class MasterServer implements IStoppable {
     @Autowired
     private TaskKillResponseProcessor taskKillResponseProcessor;
 
+    @Autowired
+    private TaskRecallProcessor taskRecallProcessor;
+
     @Autowired
     private EventExecuteService eventExecuteService;
 
@@ -126,6 +130,7 @@ public class MasterServer implements IStoppable {
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST,
 taskEventProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST,
 taskEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, 
cacheProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, 
taskRecallProcessor);
 
         // logger server
         
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, 
loggerRequestProcessor);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 7c94144af8..2885c5a6be 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -29,6 +31,8 @@ import org.apache.commons.lang.StringUtils;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -39,6 +43,8 @@ import org.springframework.stereotype.Service;
 @Service
 public class ExecutorDispatcher implements InitializingBean {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(ExecutorDispatcher.class);
+
     /**
      * netty executor manager
      */
@@ -71,30 +77,23 @@ public class ExecutorDispatcher implements InitializingBean {
      * @throws ExecuteException if error throws ExecuteException
      */
     public Boolean dispatch(final ExecutionContext context) throws 
ExecuteException {
-        /**
-         * get executor manager
-         */
+        // get executor manager
         ExecutorManager<Boolean> executorManager = 
this.executorManagers.get(context.getExecutorType());
         if (executorManager == null) {
             throw new ExecuteException("no ExecutorManager for type : " + 
context.getExecutorType());
         }
 
-        /**
-         * host select
-         */
-
+        // host select
         Host host = hostManager.select(context);
         if (StringUtils.isEmpty(host.getAddress())) {
-            throw new ExecuteException(String.format("fail to execute : %s due 
to no suitable worker, "
-                            + "current task needs worker group %s to execute",
-                    context.getCommand(),context.getWorkerGroup()));
+            logger.warn("fail to execute : {} due to no suitable worker, 
current task needs worker group {} to execute",
+                context.getCommand(), context.getWorkerGroup());
+            return false;
         }
         context.setHost(host);
         executorManager.beforeExecute(context);
         try {
-            /**
-             * task execute
-             */
+            // task execute
             return executorManager.execute(context);
         } finally {
             executorManager.afterExecute(context);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 0ba24e287d..82f3416569 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.executor;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
 import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
 import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -68,6 +70,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
     @Autowired
     private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
 
+    @Autowired
+    private TaskRecallProcessor taskRecallProcessor;
+
     /**
      * netty remote client
      */
@@ -86,6 +91,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
taskExecuteResponseProcessor);
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, 
taskExecuteRunningProcessor);
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, 
taskKillResponseProcessor);
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, 
taskRecallProcessor);
     }
 
     /**
@@ -97,25 +103,13 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
      */
     @Override
     public Boolean execute(ExecutionContext context) throws ExecuteException {
-
-        /**
-         *  all nodes
-         */
+        // all nodes
         Set<String> allNodes = getAllNodes(context);
-
-        /**
-         * fail nodes
-         */
+        // fail nodes
         Set<String> failNodeSet = new HashSet<>();
-
-        /**
-         *  build command accord executeContext
-         */
+        // build command accord executeContext
         Command command = context.getCommand();
-
-        /**
-         * execute task host
-         */
+        // execute task host
         Host host = context.getHost();
         boolean success = false;
         while (!success) {
@@ -158,9 +152,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
      * @throws ExecuteException if error throws ExecuteException
      */
     public void doExecute(final Host host, final Command command) throws 
ExecuteException {
-        /**
-         * retry count,default retry 3
-         */
+        // retry count,default retry 3
         int retryCount = 3;
         boolean success = false;
         do {
@@ -170,7 +162,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
             } catch (Exception ex) {
                 logger.error(String.format("send command : %s to %s error", 
command, host), ex);
                 retryCount--;
-                ThreadUtils.sleep(100);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             }
         } while (retryCount >= 0 && !success);
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 646d770b01..2b8fe7b93a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -178,7 +178,7 @@ public class LowerWeightHostManager extends CommonHostManager {
             return Optional.of(
                     new HostWeight(HostWorker.of(addr, 
heartBeat.getWorkerHostWeight(), workerGroup),
                             heartBeat.getCpuUsage(), 
heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
-                            heartBeat.getStartupTime()));
+                            heartBeat.getWorkerWaitingTaskCount(), 
heartBeat.getStartupTime()));
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
index 9d7855f054..a441582235 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -37,10 +37,13 @@ public class HostWeight {
 
     private double currentWeight;
 
-    public HostWeight(HostWorker hostWorker, double cpu, double memory, double 
loadAverage, long startTime) {
+    private final int waitingTaskCount;
+
+    public HostWeight(HostWorker hostWorker, double cpu, double memory, double 
loadAverage, int waitingTaskCount, long startTime) {
         this.hostWorker = hostWorker;
         this.weight = calculateWeight(cpu, memory, loadAverage, startTime);
         this.currentWeight = this.weight;
+        this.waitingTaskCount = waitingTaskCount;
     }
 
     public double getWeight() {
@@ -63,12 +66,17 @@ public class HostWeight {
         return (Host)hostWorker;
     }
 
+    public int getWaitingTaskCount() {
+        return waitingTaskCount;
+    }
+
     @Override
     public String toString() {
         return "HostWeight{"
             + "hostWorker=" + hostWorker
             + ", weight=" + weight
             + ", currentWeight=" + currentWeight
+            + ", waitingTaskCount=" + waitingTaskCount
             + '}';
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
index ea55785182..f099d81473 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
@@ -18,6 +18,11 @@
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
 
 /**
  * lower weight round robin
@@ -35,7 +40,8 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
         double totalWeight = 0;
         double lowWeight = 0;
         HostWeight lowerNode = null;
-        for (HostWeight hostWeight : sources) {
+        List<HostWeight> weights = canAssignTaskHost(sources);
+        for (HostWeight hostWeight : weights) {
             totalWeight += hostWeight.getWeight();
             hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + 
hostWeight.getWeight());
             if (lowerNode == null || lowWeight > 
hostWeight.getCurrentWeight()) {
@@ -45,7 +51,21 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
         }
         lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
         return lowerNode;
+    }
 
+    private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) 
{
+        List<HostWeight> zeroWaitingTask = sources.stream().filter(h -> 
h.getWaitingTaskCount() == 0).collect(Collectors.toList());
+        if (!zeroWaitingTask.isEmpty()) {
+            return zeroWaitingTask;
+        }
+        HostWeight hostWeight = 
sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
+        List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
+        List<HostWeight> equalWaitingTask = sources.stream().filter(h -> 
!h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == 
hostWeight.getWaitingTaskCount())
+            .collect(Collectors.toList());
+        if (!equalWaitingTask.isEmpty()) {
+            waitingTask.addAll(equalWaitingTask);
+        }
+        return waitingTask;
     }
 }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
new file mode 100644
index 0000000000..2d94d026fa
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * task recall processor
+ */
+@Component
+public class TaskRecallProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = 
LoggerFactory.getLogger(TaskRecallProcessor.class);
+
+    @Autowired
+    private TaskEventService taskEventService;
+
+    /**
+     * task ack process
+     *
+     * @param channel channel channel
+     * @param command command TaskExecuteAckCommand
+     */
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_RECALL == 
command.getType(), String.format("invalid command type : %s", 
command.getType()));
+        TaskRecallCommand recallCommand = 
JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
+        logger.info("taskRecallCommand : {}", recallCommand);
+        TaskEvent taskEvent = TaskEvent.newRecall(recallCommand, channel);
+        taskEventService.addEvent(taskEvent);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 865eee53a5..8227793c9f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 
 import java.util.Date;
@@ -135,6 +136,15 @@ public class TaskEvent {
         return event;
     }
 
+    public static TaskEvent newRecall(TaskRecallCommand command, Channel 
channel) {
+        TaskEvent event = new TaskEvent();
+        event.setTaskInstanceId(command.getTaskInstanceId());
+        event.setProcessInstanceId(command.getProcessInstanceId());
+        event.setChannel(channel);
+        event.setEvent(Event.WORKER_REJECT);
+        return event;
+    }
+
     public String getVarPool() {
         return varPool;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
index 47b190e246..b2778ae542 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
@@ -24,9 +24,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import 
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import 
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -133,6 +136,9 @@ public class TaskExecuteThread {
             case RESULT:
                 handleResultEvent(taskEvent, taskInstance);
                 break;
+            case WORKER_REJECT:
+                handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, 
workflowExecuteThread);
+                break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + 
event);
         }
@@ -185,7 +191,7 @@ public class TaskExecuteThread {
             TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new 
TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), 
taskEvent.getTaskInstanceId());
             
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
         } catch (Exception e) {
-            logger.error("worker ack master error", e);
+            logger.error("handle worker ack master error", e);
             TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new 
TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
             
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
         }
@@ -216,9 +222,28 @@ public class TaskExecuteThread {
             TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new 
TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), 
taskEvent.getTaskInstanceId());
             
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         } catch (Exception e) {
-            logger.error("worker response master error", e);
+            logger.error("handle worker response master error", e);
             TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new 
TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
             
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         }
     }
+
+    /**
+     * handle result event
+     */
+    private void handleWorkerRejectEvent(Channel channel, TaskInstance 
taskInstance, WorkflowExecuteThread executeThread) {
+        try {
+            if (executeThread != null) {
+                executeThread.resubmit(taskInstance.getTaskCode());
+            }
+            if (channel != null) {
+                TaskRecallAckCommand taskRecallAckCommand = new 
TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
+                channel.writeAndFlush(taskRecallAckCommand.convert2Command());
+            }
+        } catch (Exception e) {
+            logger.error("handle worker reject error", e);
+            TaskRecallAckCommand taskRecallAckCommand = new 
TaskRecallAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance.getId());
+            channel.writeAndFlush(taskRecallAckCommand.convert2Command());
+        }
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index a40b7e5b27..ed283b1a89 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import net.bytebuddy.implementation.bytecode.Throw;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
@@ -1974,6 +1975,15 @@ public class WorkflowExecuteThread {
         }
     }
 
+    public void resubmit(long taskCode) throws Exception {
+        ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
+        if (taskProcessor != null) {
+            taskProcessor.action(TaskAction.RESUBMIT);
+            logger.debug("RESUBMIT: task code:{}", taskCode);
+        } else {
+            throw new Exception("resubmit error, taskProcessor is null, task 
code: " + taskCode);
+        }
+    }
     private void setGlobalParamIfCommanded(ProcessDefinition 
processDefinition, Map<String, String> cmdParam) {
         // get start params from command param
         Map<String, String> startParamMap = new HashMap<>();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 2ca0d6cb19..329f72d1b1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -159,6 +159,11 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
      */
     protected abstract boolean submitTask();
 
+    /**
+     * Subclass hook invoked for {@link TaskAction#RESUBMIT} (via {@code resubmit()}).
+     *
+     * @return whether the resubmit was handled successfully
+     */
+    protected abstract boolean resubmitTask();
+
     /**
      * run task
      */
@@ -188,6 +193,8 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
                 return run();
             case DISPATCH:
                 return dispatch();
+            case RESUBMIT:
+                return resubmit();
             default:
                 logger.error("unknown task action: {}", taskAction);
         }
@@ -196,6 +203,10 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
         return false;
     }
 
+    protected boolean resubmit() {
+        return resubmitTask();
+    }
+
     protected boolean submit() {
         return submitTask();
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 1fa6b28625..be7dbe103f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -126,6 +126,11 @@ public class BlockingTaskProcessor extends 
BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Resubmit is a no-op for this processor: nothing is done and success is reported.
+     */
+    @Override
+    protected boolean resubmitTask() {
+        return true;
+    }
+
     @Override
     protected boolean dispatchTask() {
         return false;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ffeb89a0d2..b1bf05dcd9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -71,6 +71,15 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    @Override
+    protected boolean resubmitTask() {
+        if (this.taskInstance == null) {
+            return false;
+        }
+        setTaskExecutionLogger();
+        return dispatchTask();
+    }
+
     @Override
     public boolean runTask() {
         return true;
@@ -110,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                 logger.info("submit task, but the status of the task {} is 
already running or delayed.", taskInstance.getName());
                 return true;
             }
-            logger.info("task ready to submit: {}", taskInstance);
+            logger.debug("task ready to submit: {}", taskInstance.getName());
 
             TaskPriority taskPriority = new 
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
                     processInstance.getId(), 
taskInstance.getProcessInstancePriority().getCode(),
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 4749e20f0f..9f441df52d 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -83,6 +83,11 @@ public class ConditionTaskProcessor extends 
BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Resubmit is a no-op for this processor: nothing is done and success is reported.
+     */
+    @Override
+    protected boolean resubmitTask() {
+        return true;
+    }
+
     @Override
     protected boolean dispatchTask() {
         return true;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index d17f4b6419..c2da0b1b71 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -99,6 +99,11 @@ public class DependentTaskProcessor extends 
BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Resubmit is a no-op for this processor: nothing is done and success is reported.
+     */
+    @Override
+    protected boolean resubmitTask() {
+        return true;
+    }
+
     @Override
     protected boolean dispatchTask() {
         return true;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 747c3dd77a..da38a11a9e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -92,6 +92,11 @@ public class SubTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Resubmit is a no-op for this processor: nothing is done and success is reported.
+     */
+    @Override
+    protected boolean resubmitTask() {
+        return true;
+    }
+
     @Override
     protected boolean dispatchTask() {
         return true;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 444d41622c..047df02336 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -90,6 +90,11 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
         return true;
     }
 
+    /**
+     * Resubmit is a no-op for this processor: nothing is done and success is reported.
+     */
+    @Override
+    protected boolean resubmitTask() {
+        return true;
+    }
+
     @Override
     protected boolean dispatchTask() {
         return true;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 9044945258..d292cb1d34 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -26,5 +26,6 @@ public enum TaskAction {
     TIMEOUT,
     SUBMIT,
     RUN,
-    DISPATCH
+    DISPATCH,
+    RESUBMIT
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
index f822f04d97..fcb8fbc541 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
@@ -28,33 +28,36 @@ public class LowerWeightRoundRobinTest {
     @Test
     public void testSelect() {
         Collection<HostWeight> sources = new ArrayList<>();
-        sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
-        sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, 
"default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000));
-        sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, 
"default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, 
"default"), 0.06, 0.44, 3.84, 1, System.currentTimeMillis() - 60 * 8 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, 
"default"), 0.06, 0.56, 3.24, 2, System.currentTimeMillis() - 60 * 5 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, 
"default"), 0.06, 0.80, 3.15, 1, System.currentTimeMillis() - 60 * 2 * 1000));
 
         LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
         HostWeight result;
+        // each select() result is asserted exactly once
         result = roundRobin.select(sources);
         Assert.assertEquals("192.158.2.1", result.getHost().getIp());
         result = roundRobin.select(sources);
-        Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+        Assert.assertEquals("192.158.2.1", result.getHost().getIp());
         result = roundRobin.select(sources);
         Assert.assertEquals("192.158.2.1", result.getHost().getIp());
         result = roundRobin.select(sources);
-        Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+        Assert.assertEquals("192.158.2.3", result.getHost().getIp());
         result = roundRobin.select(sources);
         Assert.assertEquals("192.158.2.1", result.getHost().getIp());
         result = roundRobin.select(sources);
-        Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+        Assert.assertEquals("192.158.2.1", result.getHost().getIp());
     }
 
     @Test
     public void testWarmUpSelect() {
         Collection<HostWeight> sources = new ArrayList<>();
-        sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
-        sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000));
-        sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000));
-        sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, 
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, 
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 8 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, 
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 5 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, 
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 3 * 1000));
+        sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, 
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 11 * 1000));
 
         LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
         HostWeight result;
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 5718872bf5..e540efddfc 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -98,6 +98,16 @@ public enum CommandType {
      */
     TASK_KILL_RESPONSE,
 
+    /**
+     * task recall
+     */
+    TASK_RECALL,
+
+    /**
+     * task recall ack
+     */
+    TASK_RECALL_ACK,
+
     /**
      * HEART_BEAT
      */
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
new file mode 100644
index 0000000000..2221a6c09d
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+/**
+ * Acknowledgement of a task recall, carried as a {@link CommandType#TASK_RECALL_ACK} command.
+ */
+public class TaskRecallAckCommand implements Serializable {
+
+    /**
+     * id of the task instance whose recall is acknowledged
+     */
+    private int taskInstanceId;
+
+    /**
+     * acknowledgement status code; the worker treats {@code ExecutionStatus.SUCCESS.getCode()} as a successful ack
+     */
+    private int status;
+
+    /**
+     * no-arg constructor, used for JSON deserialization
+     */
+    public TaskRecallAckCommand() {
+        // fields keep their default values
+    }
+
+    /**
+     * @param status         acknowledgement status code
+     * @param taskInstanceId id of the acknowledged task instance
+     */
+    public TaskRecallAckCommand(int status, int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+        this.status = status;
+    }
+
+    public int getTaskInstanceId() {
+        return this.taskInstanceId;
+    }
+
+    public void setTaskInstanceId(int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public int getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    /**
+     * Wrap this acknowledgement into a {@link CommandType#TASK_RECALL_ACK} command with a JSON body.
+     *
+     * @return command
+     */
+    public Command convert2Command() {
+        final byte[] body = JSONUtils.toJsonByteArray(this);
+        final Command command = new Command();
+        command.setType(CommandType.TASK_RECALL_ACK);
+        command.setBody(body);
+        return command;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("TaskRecallAckCommand{")
+            .append("taskInstanceId=").append(taskInstanceId)
+            .append(", status=").append(status)
+            .append('}')
+            .toString();
+    }
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
new file mode 100644
index 0000000000..3d33d8c363
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+/**
+ * Task recall command ({@link CommandType#TASK_RECALL}): sent by a worker that could not queue a task
+ * it received, so the task can be dispatched again.
+ */
+public class TaskRecallCommand implements Serializable {
+
+    /**
+     * id of the task instance being recalled
+     */
+    private int taskInstanceId;
+
+    /**
+     * host recorded in the task's execution context
+     */
+    private String host;
+
+    /**
+     * id of the process instance the task belongs to
+     */
+    private int processInstanceId;
+
+    public int getTaskInstanceId() {
+        return taskInstanceId;
+    }
+
+    public void setTaskInstanceId(int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(int processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
+    /**
+     * Wrap this recall into a {@link CommandType#TASK_RECALL} command with a JSON body.
+     *
+     * @return command
+     */
+    public Command convert2Command() {
+        Command command = new Command();
+        command.setType(CommandType.TASK_RECALL);
+        byte[] body = JSONUtils.toJsonByteArray(this);
+        command.setBody(body);
+        return command;
+    }
+
+    @Override
+    public String toString() {
+        return "TaskRecallCommand{"
+            + "taskInstanceId=" + taskInstanceId
+            + ", host='" + host + '\''
+            + ", processInstanceId=" + processInstanceId
+            + '}';
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 8ac528dee4..009ab3996c 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
 import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
+import 
org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import 
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -110,6 +111,9 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private TaskKillProcessor taskKillProcessor;
 
+    @Autowired
+    private TaskRecallAckProcessor taskRecallAckProcessor;
+
     @Autowired
     private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
 
@@ -146,7 +150,7 @@ public class WorkerServer implements IStoppable {
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
 taskExecuteRunningAckProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK,
 taskExecuteResponseAckProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
 hostUpdateProcessor);
-
+        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, 
taskRecallAckProcessor);
         // logger server
         
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, 
loggerRequestProcessor);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
index f28990b152..fb3c84da68 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
@@ -37,8 +37,9 @@ public class ResponseCache {
         return instance;
     }
 
-    private Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
-    private Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
+    // presumably RUNNING-event commands keyed by task instance id (that case is outside this hunk) - confirm
+    private final Map<Integer, Command> runningCache = new 
ConcurrentHashMap<>();
+    // RESULT-event commands keyed by task instance id
+    private final Map<Integer, Command> responseCache = new 
ConcurrentHashMap<>();
+    // WORKER_REJECT (task recall) commands keyed by task instance id; dropped on a SUCCESS TASK_RECALL_ACK
+    private final Map<Integer,Command> recallCache = new ConcurrentHashMap<>();
 
     /**
      * cache response
@@ -55,11 +56,27 @@ public class ResponseCache {
             case RESULT:
                 responseCache.put(taskInstanceId, command);
                 break;
+            case WORKER_REJECT:
+                recallCache.put(taskInstanceId, command);
+                break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + 
event);
         }
     }
 
+    /**
+     * Drop the cached recall command of a task instance.
+     *
+     * @param taskInstanceId taskInstanceId
+     */
+    public void removeRecallCache(Integer taskInstanceId) {
+        this.recallCache.remove(taskInstanceId);
+    }
+
+    /**
+     * Return the recall cache itself (not a copy), keyed by task instance id.
+     *
+     * @return recall cache
+     */
+    public Map<Integer, Command> getRecallCache() {
+        return this.recallCache;
+    }
+
     /**
      * remove running cache
      *
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 3641de8453..1c3d21501a 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
 import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
@@ -222,6 +223,14 @@ public class TaskCallbackService {
         return taskKillResponseCommand;
     }
 
+    private TaskRecallCommand buildRecallCommand(TaskExecutionContext 
taskExecutionContext) {
+        TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
+        
taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        
taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        taskRecallCommand.setHost(taskExecutionContext.getHost());
+        return taskRecallCommand;
+    }
+
     /**
      * send task execute running command
      * todo unified callback command
@@ -257,4 +266,13 @@ public class TaskCallbackService {
         TaskKillResponseCommand taskKillResponseCommand = 
buildKillTaskResponseCommand(taskExecutionContext);
         send(taskExecutionContext.getTaskInstanceId(), 
taskKillResponseCommand.convert2Command());
     }
+
+    /**
+     * Send a task recall command and keep it in the recall cache so it is re-sent until a
+     * successful TASK_RECALL_ACK removes it.
+     *
+     * @param taskExecutionContext context of the task being recalled
+     */
+    public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
+        TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
+        // convert once so the cached command and the sent command are the same instance
+        Command recallCommand = taskRecallCommand.convert2Command();
+        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), recallCommand, Event.WORKER_REJECT);
+        send(taskExecutionContext.getTaskInstanceId(), recallCommand);
+    }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index cd3d940f00..c7df9e7876 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -159,8 +159,7 @@ public class TaskExecuteProcessor implements 
NettyRequestProcessor {
             }
         }
 
-        
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
-                new NettyRemoteChannel(channel, command.getOpaque()));
+        
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), 
new NettyRemoteChannel(channel, command.getOpaque()));
 
         // delay task process
         long remainTime = 
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), 
taskExecutionContext.getDelayTime() * 60L);
@@ -174,10 +173,9 @@ public class TaskExecuteProcessor implements 
NettyRequestProcessor {
         // submit task to manager
         boolean offer = workerManager.offer(new 
TaskExecuteThread(taskExecutionContext, taskCallbackService, 
alertClientService, taskPluginManager));
         if (!offer) {
-            logger.error("submit task to manager error, queue is full, queue 
size is {}, taskInstanceId: {}",
-                    workerManager.getDelayQueueSize(), 
taskExecutionContext.getTaskInstanceId());
-            
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-            
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+            logger.warn("submit task to wait queue error, queue is full, queue 
size is {}, taskInstanceId: {}",
+                workerManager.getWaitSubmitQueueSize(), 
taskExecutionContext.getTaskInstanceId());
+            taskCallbackService.sendRecallCommand(taskExecutionContext);
         }
     }
 
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
new file mode 100644
index 0000000000..769024e0aa
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+@Component
+public class TaskRecallAckProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = 
LoggerFactory.getLogger(TaskRecallAckProcessor.class);
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == 
command.getType(),
+            String.format("invalid command type : %s", command.getType()));
+
+        TaskRecallAckCommand taskRecallAckCommand = 
JSONUtils.parseObject(command.getBody(), TaskRecallAckCommand.class);
+        if (taskRecallAckCommand == null) {
+            return;
+        }
+
+        if (taskRecallAckCommand.getStatus() ==  
ExecutionStatus.SUCCESS.getCode()) {
+            
ResponseCache.get().removeRecallCache(taskRecallAckCommand.getTaskInstanceId());
+            logger.debug("removeRecallCache: task instance id:{}", 
taskRecallAckCommand.getTaskInstanceId());
+            
TaskCallbackService.remove(taskRecallAckCommand.getTaskInstanceId());
+            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", 
taskRecallAckCommand.getTaskInstanceId());
+        }
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index fc737ca1de..6e7d879ded 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -82,6 +82,14 @@ public class RetryReportTaskStatusThread implements Runnable 
{
                         taskCallbackService.send(taskInstanceId, 
responseCommand);
                     }
                 }
+                if (!instance.getRecallCache().isEmpty()) {
+                    Map<Integer, Command> recallCache = 
instance.getRecallCache();
+                    for (Map.Entry<Integer, Command> entry : 
recallCache.entrySet()) {
+                        Integer taskInstanceId = entry.getKey();
+                        Command responseCommand = entry.getValue();
+                        taskCallbackService.send(taskInstanceId, 
responseCommand);
+                    }
+                }
             } catch (Exception e) {
                 logger.warn("retry report task status error", e);
             }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 60f752401d..d9531b58bf 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -26,10 +27,9 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class WorkerManagerThread implements Runnable {
     /**
      * task queue
      */
-    private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
+    private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
 
     @Autowired(required = false)
     private StorageOperate storageOperate;
@@ -63,12 +63,16 @@ public class WorkerManagerThread implements Runnable {
     @Autowired
     private TaskCallbackService taskCallbackService;
 
+    private volatile int workerExecThreads;
+
     /**
      * running task
      */
     private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
 
     public WorkerManagerThread(WorkerConfig workerConfig) {
+        workerExecThreads = workerConfig.getExecThreads();
+        this.waitSubmitQueue = new DelayQueue<>();
         workerExecService = new WorkerExecService(
             ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
             taskExecuteThreadMap
@@ -80,12 +84,12 @@ public class WorkerManagerThread implements Runnable {
     }
 
     /**
-     * get delay queue size
+     * get wait submit queue size
      *
      * @return queue size
      */
-    public int getDelayQueueSize() {
-        return workerExecuteQueue.size();
+    public int getWaitSubmitQueueSize() {
+        return waitSubmitQueue.size();
     }
 
     /**
@@ -102,9 +106,9 @@ public class WorkerManagerThread implements Runnable {
      * then send Response to Master, update the execution status of task instance
      */
     public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
-        workerExecuteQueue.stream()
+        waitSubmitQueue.stream()
                           .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
-                          .forEach(workerExecuteQueue::remove);
+                          .forEach(waitSubmitQueue::remove);
         sendTaskKillResponse(taskInstanceId);
     }
 
@@ -127,7 +131,14 @@ public class WorkerManagerThread implements Runnable {
      * @return submit result
      */
     public boolean offer(TaskExecuteThread taskExecuteThread) {
-        return workerExecuteQueue.offer(taskExecuteThread);
+        if (waitSubmitQueue.size() > workerExecThreads) {
+            // if waitSubmitQueue is full, it will wait 1s, then try add
+            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            if (waitSubmitQueue.size() > workerExecThreads) {
+                return false;
+            }
+        }
+        return waitSubmitQueue.offer(taskExecuteThread);
     }
 
     public void start() {
@@ -142,9 +153,15 @@ public class WorkerManagerThread implements Runnable {
         TaskExecuteThread taskExecuteThread;
         while (Stopper.isRunning()) {
             try {
-                taskExecuteThread = workerExecuteQueue.take();
-                taskExecuteThread.setStorageOperate(storageOperate);
-                workerExecService.submit(taskExecuteThread);
+                if (this.getThreadPoolQueueSize() <= workerExecThreads) {
+                    taskExecuteThread = waitSubmitQueue.take();
+                    taskExecuteThread.setStorageOperate(storageOperate);
+                    workerExecService.submit(taskExecuteThread);
+                } else {
+                    logger.info("Exec queue is full, waiting submit queue {}, 
waiting exec queue size {}",
+                        this.getWaitSubmitQueueSize(), 
this.getThreadPoolQueueSize());
+                    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+                }
             } catch (Exception e) {
                 logger.error("An unexpected interrupt is happened, "
                     + "the exception will be ignored and this thread will 
continue to run", e);

Reply via email to