This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 49979c658e [Fix-8828] [Master] Assign tasks to worker optimization
(#9919)
49979c658e is described below
commit 49979c658e3b9b9468b1a8332ac6b81290b1fd3f
Author: JinYong Li <[email protected]>
AuthorDate: Tue May 31 11:49:54 2022 +0800
[Fix-8828] [Master] Assign tasks to worker optimization (#9919)
* fix 9584
* master recall
* fix ut
* update logger
* update delay queue
* fix ut
* remove sleep
Co-authored-by: 进勇 <[email protected]>
Co-authored-by: JinyLeeChina <[email protected]>
---
.../dolphinscheduler/common/enums/Event.java | 2 +-
.../dolphinscheduler/common/enums/StateEvent.java | 2 +-
.../dolphinscheduler/common/utils/HeartBeat.java | 15 ++--
.../server/master/MasterServer.java | 5 ++
.../server/master/dispatch/ExecutorDispatcher.java | 25 +++---
.../dispatch/executor/NettyExecutorManager.java | 32 +++-----
.../dispatch/host/LowerWeightHostManager.java | 2 +-
.../master/dispatch/host/assign/HostWeight.java | 10 ++-
.../host/assign/LowerWeightRoundRobin.java | 22 +++++-
.../master/processor/TaskRecallProcessor.java | 62 +++++++++++++++
.../server/master/processor/queue/TaskEvent.java | 10 +++
.../master/processor/queue/TaskExecuteThread.java | 29 ++++++-
.../master/runner/WorkflowExecuteThread.java | 10 +++
.../master/runner/task/BaseTaskProcessor.java | 11 +++
.../master/runner/task/BlockingTaskProcessor.java | 5 ++
.../master/runner/task/CommonTaskProcessor.java | 11 ++-
.../master/runner/task/ConditionTaskProcessor.java | 5 ++
.../master/runner/task/DependentTaskProcessor.java | 5 ++
.../master/runner/task/SubTaskProcessor.java | 5 ++
.../master/runner/task/SwitchTaskProcessor.java | 5 ++
.../server/master/runner/task/TaskAction.java | 3 +-
.../host/assign/LowerWeightRoundRobinTest.java | 23 +++---
.../remote/command/CommandType.java | 10 +++
.../remote/command/TaskRecallAckCommand.java | 74 ++++++++++++++++++
.../remote/command/TaskRecallCommand.java | 90 ++++++++++++++++++++++
.../server/worker/WorkerServer.java | 6 +-
.../server/worker/cache/ResponseCache.java | 21 ++++-
.../worker/processor/TaskCallbackService.java | 18 +++++
.../worker/processor/TaskExecuteProcessor.java | 10 +--
.../worker/processor/TaskRecallAckProcessor.java | 58 ++++++++++++++
.../worker/runner/RetryReportTaskStatusThread.java | 8 ++
.../server/worker/runner/WorkerManagerThread.java | 41 +++++++---
32 files changed, 554 insertions(+), 81 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
index 78b036f037..26c3a3beab 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -22,5 +22,5 @@ public enum Event {
DELAY,
RUNNING,
RESULT,
- ;
+ WORKER_REJECT
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
index 54c7835f0b..405df09d3e 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
@@ -97,7 +97,7 @@ public class StateEvent {
public String toString() {
return "State Event :"
+ "key: " + key
- + " type: " + type.toString()
+ + " type: " + type
+ " executeStatus: " + executionStatus
+ " task instance id: " + taskInstanceId
+ " process instance id: " + processInstanceId
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
index d28cd3db08..b4df08f4b7 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
@@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
public class HeartBeat {
private static final Logger logger =
LoggerFactory.getLogger(HeartBeat.class);
- public static final String COMMA = ",";
private long startupTime;
private long reportTime;
@@ -205,18 +204,18 @@ public class HeartBeat {
this.updateServerState();
StringBuilder builder = new StringBuilder(100);
- builder.append(cpuUsage).append(COMMA);
- builder.append(memoryUsage).append(COMMA);
- builder.append(loadAverage).append(COMMA);
+ builder.append(cpuUsage).append(Constants.COMMA);
+ builder.append(memoryUsage).append(Constants.COMMA);
+ builder.append(loadAverage).append(Constants.COMMA);
builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
builder.append(maxCpuloadAvg).append(Constants.COMMA);
builder.append(reservedMemory).append(Constants.COMMA);
builder.append(startupTime).append(Constants.COMMA);
builder.append(reportTime).append(Constants.COMMA);
- builder.append(serverStatus).append(COMMA);
- builder.append(processId).append(COMMA);
- builder.append(workerHostWeight).append(COMMA);
- builder.append(workerExecThreadCount).append(COMMA);
+ builder.append(serverStatus).append(Constants.COMMA);
+ builder.append(processId).append(Constants.COMMA);
+ builder.append(workerHostWeight).append(Constants.COMMA);
+ builder.append(workerExecThreadCount).append(Constants.COMMA);
builder.append(workerWaitingTaskCount);
return builder.toString();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6ab0d4e51a..ad4d02e2e9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -31,6 +31,7 @@ import
org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
@@ -96,6 +97,9 @@ public class MasterServer implements IStoppable {
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
+ @Autowired
+ private TaskRecallProcessor taskRecallProcessor;
+
@Autowired
private EventExecuteService eventExecuteService;
@@ -126,6 +130,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST,
taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST,
taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE,
cacheProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL,
taskRecallProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,
loggerRequestProcessor);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 7c94144af8..2885c5a6be 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -29,6 +31,8 @@ import org.apache.commons.lang.StringUtils;
import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -39,6 +43,8 @@ import org.springframework.stereotype.Service;
@Service
public class ExecutorDispatcher implements InitializingBean {
+ private static final Logger logger =
LoggerFactory.getLogger(ExecutorDispatcher.class);
+
/**
* netty executor manager
*/
@@ -71,30 +77,23 @@ public class ExecutorDispatcher implements InitializingBean
{
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws
ExecuteException {
- /**
- * get executor manager
- */
+ // get executor manager
ExecutorManager<Boolean> executorManager =
this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " +
context.getExecutorType());
}
- /**
- * host select
- */
-
+ // host select
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
- throw new ExecuteException(String.format("fail to execute : %s due
to no suitable worker, "
- + "current task needs worker group %s to execute",
- context.getCommand(),context.getWorkerGroup()));
+ logger.warn("fail to execute : {} due to no suitable worker,
current task needs worker group {} to execute",
+ context.getCommand(), context.getWorkerGroup());
+ return false;
}
context.setHost(host);
executorManager.beforeExecute(context);
try {
- /**
- * task execute
- */
+ // task execute
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 0ba24e287d..82f3416569 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.executor;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -29,6 +30,7 @@ import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import
org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils;
@@ -68,6 +70,9 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
@Autowired
private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
+ @Autowired
+ private TaskRecallProcessor taskRecallProcessor;
+
/**
* netty remote client
*/
@@ -86,6 +91,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,
taskExecuteResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING,
taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE,
taskKillResponseProcessor);
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL,
taskRecallProcessor);
}
/**
@@ -97,25 +103,13 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
-
- /**
- * all nodes
- */
+ // all nodes
Set<String> allNodes = getAllNodes(context);
-
- /**
- * fail nodes
- */
+ // fail nodes
Set<String> failNodeSet = new HashSet<>();
-
- /**
- * build command accord executeContext
- */
+ // build command accord executeContext
Command command = context.getCommand();
-
- /**
- * execute task host
- */
+ // execute task host
Host host = context.getHost();
boolean success = false;
while (!success) {
@@ -158,9 +152,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
* @throws ExecuteException if error throws ExecuteException
*/
public void doExecute(final Host host, final Command command) throws
ExecuteException {
- /**
- * retry count,default retry 3
- */
+ // retry count,default retry 3
int retryCount = 3;
boolean success = false;
do {
@@ -170,7 +162,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error",
command, host), ex);
retryCount--;
- ThreadUtils.sleep(100);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
} while (retryCount >= 0 && !success);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 646d770b01..2b8fe7b93a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -178,7 +178,7 @@ public class LowerWeightHostManager extends
CommonHostManager {
return Optional.of(
new HostWeight(HostWorker.of(addr,
heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(),
heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
- heartBeat.getStartupTime()));
+ heartBeat.getWorkerWaitingTaskCount(),
heartBeat.getStartupTime()));
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
index 9d7855f054..a441582235 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -37,10 +37,13 @@ public class HostWeight {
private double currentWeight;
- public HostWeight(HostWorker hostWorker, double cpu, double memory, double
loadAverage, long startTime) {
+    /**
+     * Number of tasks waiting on the worker. It does not feed {@code calculateWeight}; selectors read it
+     * separately through {@link #getWaitingTaskCount()}. (LowerWeightHostManager fills it from the worker heartbeat.)
+     */
+    private final int waitingTaskCount;
+
+    /**
+     * Build a host weight from a snapshot of worker metrics.
+     *
+     * @param hostWorker       worker address together with its host weight and worker group
+     * @param cpu              cpu usage of the worker
+     * @param memory           memory usage of the worker
+     * @param loadAverage      load average of the worker
+     * @param waitingTaskCount number of tasks waiting on the worker
+     * @param startTime        worker startup time
+     */
+    public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, int waitingTaskCount, long startTime) {
         this.hostWorker = hostWorker;
         this.weight = calculateWeight(cpu, memory, loadAverage, startTime);
         this.currentWeight = this.weight;
+        this.waitingTaskCount = waitingTaskCount;
     }
public double getWeight() {
@@ -63,12 +66,17 @@ public class HostWeight {
return (Host)hostWorker;
}
+    /**
+     * @return the waiting-task count captured when this host weight was built
+     */
+    public int getWaitingTaskCount() {
+        return this.waitingTaskCount;
+    }
+
+    /**
+     * String form listing the host, its weight, the round-robin {@code currentWeight} and the waiting-task count.
+     */
     @Override
     public String toString() {
         return "HostWeight{"
                 + "hostWorker=" + hostWorker
                 + ", weight=" + weight
                 + ", currentWeight=" + currentWeight
+                + ", waitingTaskCount=" + waitingTaskCount
                 + '}';
     }
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
index ea55785182..f099d81473 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
@@ -18,6 +18,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
/**
* lower weight round robin
@@ -35,7 +40,8 @@ public class LowerWeightRoundRobin extends
AbstractSelector<HostWeight> {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
- for (HostWeight hostWeight : sources) {
+ List<HostWeight> weights = canAssignTaskHost(sources);
+ for (HostWeight hostWeight : weights) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() +
hostWeight.getWeight());
if (lowerNode == null || lowWeight >
hostWeight.getCurrentWeight()) {
@@ -45,7 +51,21 @@ public class LowerWeightRoundRobin extends
AbstractSelector<HostWeight> {
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
+ }
+
+    /**
+     * Narrow the candidates to the hosts with the fewest waiting tasks.
+     *
+     * <p>Hosts with no waiting task are preferred automatically (their count is the minimum). Every host
+     * tied for the minimum count is kept, so the lower-weight round robin can still balance between them.
+     *
+     * @param sources candidate hosts
+     * @return the hosts whose waiting-task count equals the minimum, in source order;
+     *         an empty list when {@code sources} is empty
+     */
+    private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) {
+        if (sources.isEmpty()) {
+            return Lists.newArrayList();
+        }
+        // safe: sources is non-empty, so min() is present
+        int minWaitingTaskCount = sources.stream()
+                .min(Comparator.comparingInt(HostWeight::getWaitingTaskCount))
+                .get()
+                .getWaitingTaskCount();
+        return sources.stream()
+                .filter(h -> h.getWaitingTaskCount() == minWaitingTaskCount)
+                .collect(Collectors.toList());
     }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
new file mode 100644
index 0000000000..2d94d026fa
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * Processes {@link CommandType#TASK_RECALL} commands: each one is turned into a
+ * {@code WORKER_REJECT} {@link TaskEvent} and handed to the {@link TaskEventService}.
+ */
+@Component
+public class TaskRecallProcessor implements NettyRequestProcessor {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskRecallProcessor.class);
+
+    @Autowired
+    private TaskEventService taskEventService;
+
+    /**
+     * Parse a task recall command and queue the corresponding task event.
+     *
+     * @param channel channel the command was received on; stored on the event so a recall ack can be written back
+     * @param command command of type {@link CommandType#TASK_RECALL} whose body is a serialized {@link TaskRecallCommand}
+     * @throws IllegalArgumentException if {@code command} is not a TASK_RECALL command
+     */
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), "invalid command type : %s", command.getType());
+        TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
+        if (recallCommand == null) {
+            // guard against a null parse result, which would otherwise NPE inside TaskEvent.newRecall
+            logger.error("task recall command body cannot be parsed, command: {}", command);
+            return;
+        }
+        logger.info("taskRecallCommand : {}", recallCommand);
+        TaskEvent taskEvent = TaskEvent.newRecall(recallCommand, channel);
+        taskEventService.addEvent(taskEvent);
+    }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 865eee53a5..8227793c9f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
@@ -135,6 +136,15 @@ public class TaskEvent {
return event;
}
+    /**
+     * Create a {@link Event#WORKER_REJECT} event from a task recall command.
+     *
+     * @param command recall command carrying the task and process instance ids
+     * @param channel channel the recall command was received on
+     * @return a new task event of type WORKER_REJECT
+     */
+    public static TaskEvent newRecall(TaskRecallCommand command, Channel channel) {
+        TaskEvent recallEvent = new TaskEvent();
+        recallEvent.setTaskInstanceId(command.getTaskInstanceId());
+        recallEvent.setProcessInstanceId(command.getProcessInstanceId());
+        recallEvent.setChannel(channel);
+        recallEvent.setEvent(Event.WORKER_REJECT);
+        return recallEvent;
+    }
+
+
public String getVarPool() {
return varPool;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
index 47b190e246..b2778ae542 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java
@@ -24,9 +24,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -133,6 +136,9 @@ public class TaskExecuteThread {
case RESULT:
handleResultEvent(taskEvent, taskInstance);
break;
+ case WORKER_REJECT:
+ handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance,
workflowExecuteThread);
+ break;
default:
throw new IllegalArgumentException("invalid event type : " +
event);
}
@@ -185,7 +191,7 @@ public class TaskExecuteThread {
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new
TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
} catch (Exception e) {
- logger.error("worker ack master error", e);
+ logger.error("handle worker ack master error", e);
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new
TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
@@ -216,9 +222,28 @@ public class TaskExecuteThread {
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new
TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
} catch (Exception e) {
- logger.error("worker response master error", e);
+ logger.error("handle worker response master error", e);
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new
TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
}
+
+    /**
+     * Handle a {@code WORKER_REJECT} event: ask the owning workflow thread to resubmit the task,
+     * then write a {@link TaskRecallAckCommand} back on the channel the recall arrived on.
+     *
+     * @param channel       channel of the recall command; no ack is sent when null
+     * @param taskInstance  the recalled task instance
+     * @param executeThread workflow thread owning the task; nothing is resubmitted when null
+     */
+    private void handleWorkerRejectEvent(Channel channel, TaskInstance taskInstance, WorkflowExecuteThread executeThread) {
+        int taskInstanceId = taskInstance == null ? -1 : taskInstance.getId();
+        int status = ExecutionStatus.SUCCESS.getCode();
+        try {
+            if (executeThread != null) {
+                executeThread.resubmit(taskInstance.getTaskCode());
+            } else {
+                logger.warn("handle worker reject, but workflow execute thread not found, task instance id: {}", taskInstanceId);
+            }
+        } catch (Exception e) {
+            logger.error("handle worker reject error", e);
+            status = ExecutionStatus.FAILURE.getCode();
+        }
+        // single null-guarded ack path, so the failure branch can no longer dereference a null channel
+        if (channel != null) {
+            TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(status, taskInstanceId);
+            channel.writeAndFlush(taskRecallAckCommand.convert2Command());
+        }
+    }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index a40b7e5b27..ed283b1a89 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner;
+import net.bytebuddy.implementation.bytecode.Throw;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
@@ -1974,6 +1975,15 @@ public class WorkflowExecuteThread {
}
}
+    /**
+     * Resubmit an active task through its task processor, e.g. after a worker rejected it.
+     *
+     * @param taskCode code of the task to resubmit
+     * @throws IllegalStateException if no active task processor exists for {@code taskCode}
+     */
+    public void resubmit(long taskCode) {
+        ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
+        if (taskProcessor == null) {
+            throw new IllegalStateException("resubmit error, taskProcessor is null, task code: " + taskCode);
+        }
+        boolean resubmitted = taskProcessor.action(TaskAction.RESUBMIT);
+        logger.debug("RESUBMIT: task code:{}, result:{}", taskCode, resubmitted);
+    }
+
private void setGlobalParamIfCommanded(ProcessDefinition
processDefinition, Map<String, String> cmdParam) {
// get start params from command param
Map<String, String> startParamMap = new HashMap<>();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 2ca0d6cb19..329f72d1b1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -159,6 +159,11 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
*/
protected abstract boolean submitTask();
+    /**
+     * resubmit task, invoked for {@link TaskAction#RESUBMIT}
+     *
+     * @return whether the resubmission succeeded; processors with nothing to re-dispatch return true
+     */
+    protected abstract boolean resubmitTask();
+
/**
* run task
*/
@@ -188,6 +193,8 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
return run();
case DISPATCH:
return dispatch();
+ case RESUBMIT:
+ return resubmit();
default:
logger.error("unknown task action: {}", taskAction);
}
@@ -196,6 +203,10 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
return false;
}
+    /**
+     * Handle {@link TaskAction#RESUBMIT} by delegating to {@link #resubmitTask()}.
+     */
+    protected boolean resubmit() {
+        return resubmitTask();
+    }
+
protected boolean submit() {
return submitTask();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
index 1fa6b28625..be7dbe103f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
@@ -126,6 +126,11 @@ public class BlockingTaskProcessor extends
BaseTaskProcessor {
return true;
}
+    @Override
+    protected boolean resubmitTask() {
+        // No-op: nothing is re-dispatched; resubmission always reports success.
+        // NOTE(review): dispatchTask() returns false here, so presumably this task is never sent to a worker
+        // and a worker recall is not expected — confirm.
+        return true;
+    }
+
@Override
protected boolean dispatchTask() {
return false;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ffeb89a0d2..b1bf05dcd9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -71,6 +71,15 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
+    /**
+     * Re-dispatch the task to a worker; returns false when no task instance is bound to this processor.
+     */
+    @Override
+    protected boolean resubmitTask() {
+        if (this.taskInstance != null) {
+            setTaskExecutionLogger();
+            return dispatchTask();
+        }
+        return false;
+    }
+
@Override
public boolean runTask() {
return true;
@@ -110,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is
already running or delayed.", taskInstance.getName());
return true;
}
- logger.info("task ready to submit: {}", taskInstance);
+ logger.debug("task ready to submit: {}", taskInstance.getName());
TaskPriority taskPriority = new
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 4749e20f0f..9f441df52d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -83,6 +83,11 @@ public class ConditionTaskProcessor extends
BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean resubmitTask() {
+ // No-op: mirrors dispatchTask() in this processor, which also just returns true.
+ return true;
+ }
+
@Override
protected boolean dispatchTask() {
return true;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index d17f4b6419..c2da0b1b71 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -99,6 +99,11 @@ public class DependentTaskProcessor extends
BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean resubmitTask() {
+ // No-op: mirrors dispatchTask() in this processor, which also just returns true.
+ return true;
+ }
+
@Override
protected boolean dispatchTask() {
return true;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 747c3dd77a..da38a11a9e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -92,6 +92,11 @@ public class SubTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean resubmitTask() {
+ // No-op: mirrors dispatchTask() in this processor, which also just returns true.
+ return true;
+ }
+
@Override
protected boolean dispatchTask() {
return true;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 444d41622c..047df02336 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -90,6 +90,11 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return true;
}
+ @Override
+ protected boolean resubmitTask() {
+ // No-op: mirrors dispatchTask() in this processor, which also just returns true.
+ return true;
+ }
+
@Override
protected boolean dispatchTask() {
return true;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
index 9044945258..d292cb1d34 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
@@ -26,5 +26,6 @@ public enum TaskAction {
TIMEOUT,
SUBMIT,
RUN,
- DISPATCH
+ DISPATCH,
+ RESUBMIT
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
index f822f04d97..fcb8fbc541 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
@@ -28,33 +28,36 @@ public class LowerWeightRoundRobinTest {
@Test
public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>();
- sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
- sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000));
- sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 1, System.currentTimeMillis() - 60 * 8 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, System.currentTimeMillis() - 60 * 5 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, System.currentTimeMillis() - 60 * 2 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
+ // one assertion per select() call pins the host chosen by six consecutive selections
HostWeight result;
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
- Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+ Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
- Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+ Assert.assertEquals("192.158.2.3", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
- Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+ Assert.assertEquals("192.158.2.1", result.getHost().getIp());
}
@Test
public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>();
- sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100,
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
- sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100,
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000));
- sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100,
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000));
- sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100,
"default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100,
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 8 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100,
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 5 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100,
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 3 * 1000));
+ sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100,
"default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 11 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 5718872bf5..e540efddfc 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -98,6 +98,16 @@ public enum CommandType {
*/
TASK_KILL_RESPONSE,
+ /**
+ * task recall: sent by a worker that could not queue a task dispatched to it
+ */
+ TASK_RECALL,
+
+ /**
+ * task recall ack: acknowledgement of a TASK_RECALL, handled on the worker side
+ */
+ TASK_RECALL_ACK,
+
/**
* HEART_BEAT
*/
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
new file mode 100644
index 0000000000..2221a6c09d
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+/**
+ * task recall ack command
+ */
+public class TaskRecallAckCommand implements Serializable {
+
+ /**
+ * id of the task instance whose recall is being acknowledged
+ */
+ private int taskInstanceId;
+ /**
+ * ack status code; TaskRecallAckProcessor only clears its recall state when this equals ExecutionStatus.SUCCESS.getCode()
+ */
+ private int status;
+
+ /**
+ * no-arg constructor; presumably required for JSON deserialization via JSONUtils.parseObject — confirm
+ */
+ public TaskRecallAckCommand() {
+ super();
+ }
+
+ /**
+ * NOTE: the argument order is (status, taskInstanceId), the reverse of the field declaration order.
+ *
+ * @param status ack status code
+ * @param taskInstanceId id of the acknowledged task instance
+ */
+ public TaskRecallAckCommand(int status, int taskInstanceId) {
+ this.status = status;
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public int getTaskInstanceId() {
+ return taskInstanceId;
+ }
+
+ public void setTaskInstanceId(int taskInstanceId) {
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ /**
+ * package response command: wraps this ack, serialized to JSON, into a Command of type TASK_RECALL_ACK
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.TASK_RECALL_ACK);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId +
", status=" + status + '}';
+ }
+}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
new file mode 100644
index 0000000000..3d33d8c363
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+/**
+ * task recall command: sent by a worker back over the channel a task was dispatched on
+ * when the worker could not queue that task (its wait-submit queue was full).
+ * Presumably lets the master re-dispatch the task — confirm against the master-side handler.
+ */
+public class TaskRecallCommand implements Serializable {
+
+ /**
+ * id of the task instance being recalled
+ */
+ private int taskInstanceId;
+
+ /**
+ * host taken from the recalled task's execution context
+ */
+ private String host;
+
+ /**
+ * id of the process instance the recalled task belongs to
+ */
+ private int processInstanceId;
+
+ public int getTaskInstanceId() {
+ return taskInstanceId;
+ }
+
+ public void setTaskInstanceId(int taskInstanceId) {
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getProcessInstanceId() {
+ return processInstanceId;
+ }
+
+ public void setProcessInstanceId(int processInstanceId) {
+ this.processInstanceId = processInstanceId;
+ }
+
+ /**
+ * package request command: wraps this object, serialized to JSON, into a Command of type TASK_RECALL
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.TASK_RECALL);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskRecallCommand{"
+ + "taskInstanceId=" + taskInstanceId
+ + ", host='" + host + '\''
+ + ", processInstanceId=" + processInstanceId
+ + '}';
+ }
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 8ac528dee4..009ab3996c 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
+import
org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -110,6 +111,9 @@ public class WorkerServer implements IStoppable {
@Autowired
private TaskKillProcessor taskKillProcessor;
+ @Autowired
+ private TaskRecallAckProcessor taskRecallAckProcessor;
+
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@@ -146,7 +150,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK,
taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
hostUpdateProcessor);
-
+
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK,
taskRecallAckProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,
loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST,
loggerRequestProcessor);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
index f28990b152..fb3c84da68 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
@@ -37,8 +37,9 @@ public class ResponseCache {
return instance;
}
- private Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
- private Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
+ private final Map<Integer, Command> runningCache = new
ConcurrentHashMap<>();
+ private final Map<Integer, Command> responseCache = new
ConcurrentHashMap<>();
+ private final Map<Integer,Command> recallCache = new ConcurrentHashMap<>();
/**
* cache response
@@ -55,11 +56,27 @@ public class ResponseCache {
case RESULT:
responseCache.put(taskInstanceId, command);
break;
+ case WORKER_REJECT:
+ recallCache.put(taskInstanceId, command);
+ break;
default:
throw new IllegalArgumentException("invalid event type : " +
event);
}
}
+ /**
+ * remove the cached recall command of a task instance; TaskRecallAckProcessor calls this
+ * on a successful recall ack, which stops the recall from being re-sent
+ *
+ * @param taskInstanceId taskInstanceId
+ */
+ public void removeRecallCache(Integer taskInstanceId) {
+ recallCache.remove(taskInstanceId);
+ }
+
+ /**
+ * get the cached recall commands keyed by task instance id; this is the internal map
+ * itself (not a copy), so callers observe and can make live changes
+ *
+ * @return recall cache
+ */
+ public Map<Integer, Command> getRecallCache() {
+ return recallCache;
+ }
+
/**
* remove running cache
*
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 3641de8453..1c3d21501a 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
@@ -222,6 +223,14 @@ public class TaskCallbackService {
return taskKillResponseCommand;
}
+ /**
+ * Copy the task instance id, process instance id and host of the given context
+ * into a new recall command.
+ */
+ private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
+ TaskRecallCommand recall = new TaskRecallCommand();
+ recall.setHost(taskExecutionContext.getHost());
+ recall.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+ recall.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ return recall;
+ }
+
/**
* send task execute running command
* todo unified callback command
@@ -257,4 +266,13 @@ public class TaskCallbackService {
TaskKillResponseCommand taskKillResponseCommand =
buildKillTaskResponseCommand(taskExecutionContext);
send(taskExecutionContext.getTaskInstanceId(),
taskKillResponseCommand.convert2Command());
}
+
+ /**
+ * Send a task recall command for a task this worker did not accept.
+ * The command is cached under Event.WORKER_REJECT so the retry report thread keeps
+ * re-sending it until a successful recall ack removes it, and it is also sent immediately.
+ *
+ * @param taskExecutionContext context of the task being recalled
+ */
+ public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
+ TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
+ // convert once so the cached command and the command sent now are the same object
+ Command recallCommand = taskRecallCommand.convert2Command();
+ ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), recallCommand, Event.WORKER_REJECT);
+ send(taskExecutionContext.getTaskInstanceId(), recallCommand);
+ }
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index cd3d940f00..c7df9e7876 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -159,8 +159,7 @@ public class TaskExecuteProcessor implements
NettyRequestProcessor {
}
}
-
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
- new NettyRemoteChannel(channel, command.getOpaque()));
+
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime =
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
@@ -174,10 +173,9 @@ public class TaskExecuteProcessor implements
NettyRequestProcessor {
// submit task to manager
boolean offer = workerManager.offer(new
TaskExecuteThread(taskExecutionContext, taskCallbackService,
alertClientService, taskPluginManager));
if (!offer) {
- logger.error("submit task to manager error, queue is full, queue
size is {}, taskInstanceId: {}",
- workerManager.getDelayQueueSize(),
taskExecutionContext.getTaskInstanceId());
-
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ logger.warn("submit task to wait queue error, queue is full, queue
size is {}, taskInstanceId: {}",
+ workerManager.getWaitSubmitQueueSize(),
taskExecutionContext.getTaskInstanceId());
+ taskCallbackService.sendRecallCommand(taskExecutionContext);
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
new file mode 100644
index 0000000000..769024e0aa
--- /dev/null
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * Worker-side handler for TASK_RECALL_ACK commands.
+ */
+@Component
+public class TaskRecallAckProcessor implements NettyRequestProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(TaskRecallAckProcessor.class);
+
+ /**
+ * Handle a recall ack. When its status is ExecutionStatus.SUCCESS, the cached recall command
+ * and the task's remote channel are removed, so the recall stops being re-sent; any other
+ * status leaves both in place.
+ *
+ * @param channel channel the ack arrived on (unused)
+ * @param command command of type TASK_RECALL_ACK whose body is a JSON-encoded TaskRecallAckCommand
+ * @throws IllegalArgumentException if the command type is not TASK_RECALL_ACK
+ */
+ @Override
+ public void process(Channel channel, Command command) {
+ // pass the type as a template argument so the message is only formatted when the check fails
+ Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == command.getType(), "invalid command type : %s", command.getType());
+
+ TaskRecallAckCommand taskRecallAckCommand = JSONUtils.parseObject(command.getBody(), TaskRecallAckCommand.class);
+ if (taskRecallAckCommand == null) {
+ logger.warn("cannot parse task recall ack command body, ignore it");
+ return;
+ }
+
+ int taskInstanceId = taskRecallAckCommand.getTaskInstanceId();
+ if (taskRecallAckCommand.getStatus() != ExecutionStatus.SUCCESS.getCode()) {
+ // keep the cached recall so that it is re-sent later
+ logger.warn("task recall ack is not successful, task instance id: {}, status: {}", taskInstanceId, taskRecallAckCommand.getStatus());
+ return;
+ }
+
+ ResponseCache.get().removeRecallCache(taskInstanceId);
+ logger.debug("removeRecallCache: task instance id:{}", taskInstanceId);
+ TaskCallbackService.remove(taskInstanceId);
+ logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskInstanceId);
+ }
+}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index fc737ca1de..6e7d879ded 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -82,6 +82,14 @@ public class RetryReportTaskStatusThread implements Runnable
{
taskCallbackService.send(taskInstanceId,
responseCommand);
}
}
+ if (!instance.getRecallCache().isEmpty()) {
+ Map<Integer, Command> recallCache =
instance.getRecallCache();
+ for (Map.Entry<Integer, Command> entry :
recallCache.entrySet()) {
+ Integer taskInstanceId = entry.getKey();
+ Command responseCommand = entry.getValue();
+ taskCallbackService.send(taskInstanceId,
responseCommand);
+ }
+ }
} catch (Exception e) {
logger.warn("retry report task status error", e);
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 60f752401d..d9531b58bf 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -26,10 +27,9 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class WorkerManagerThread implements Runnable {
/**
* task queue
*/
- private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new
DelayQueue<>();
+ private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
@Autowired(required = false)
private StorageOperate storageOperate;
@@ -63,12 +63,16 @@ public class WorkerManagerThread implements Runnable {
@Autowired
private TaskCallbackService taskCallbackService;
+ private volatile int workerExecThreads;
+
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread>
taskExecuteThreadMap = new ConcurrentHashMap<>();
public WorkerManagerThread(WorkerConfig workerConfig) {
+ workerExecThreads = workerConfig.getExecThreads();
+ this.waitSubmitQueue = new DelayQueue<>();
workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
workerConfig.getExecThreads()),
taskExecuteThreadMap
@@ -80,12 +84,12 @@ public class WorkerManagerThread implements Runnable {
}
/**
- * get delay queue size
+ * get wait submit queue size: the number of tasks accepted by offer() that the
+ * run loop has not yet taken and submitted to the executor
*
* @return queue size
*/
- public int getDelayQueueSize() {
- return workerExecuteQueue.size();
+ public int getWaitSubmitQueueSize() {
+ return waitSubmitQueue.size();
}
/**
@@ -102,9 +106,9 @@ public class WorkerManagerThread implements Runnable {
* then send Response to Master, update the execution status of task
instance
*/
public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
- workerExecuteQueue.stream()
- .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
- .forEach(workerExecuteQueue::remove);
+ // single-pass removal, instead of removing from the queue inside a stream over that same queue
+ waitSubmitQueue.removeIf(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId);
sendTaskKillResponse(taskInstanceId);
}
@@ -127,7 +131,14 @@ public class WorkerManagerThread implements Runnable {
* @return submit result
*/
public boolean offer(TaskExecuteThread taskExecuteThread) {
- return workerExecuteQueue.offer(taskExecuteThread);
+ // NOTE(review): the limit uses '>', so a task is still accepted when the queue already
+ // holds workerExecThreads entries (it can reach workerExecThreads + 1). size() and offer()
+ // are separate calls, so concurrent callers may overshoot further. Confirm this is intended.
+ if (waitSubmitQueue.size() > workerExecThreads) {
+ // if waitSubmitQueue is full, it will wait 1s, then try add
+ // NOTE(review): this sleep blocks the caller for Constants.SLEEP_TIME_MILLIS;
+ // TaskExecuteProcessor calls offer() while handling a dispatch request.
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ if (waitSubmitQueue.size() > workerExecThreads) {
+ // TaskExecuteProcessor reacts to false by sending a recall command
+ return false;
+ }
+ }
+ // waitSubmitQueue is a DelayQueue, which is unbounded, so this offer always succeeds
+ return waitSubmitQueue.offer(taskExecuteThread);
}
public void start() {
@@ -142,9 +153,15 @@ public class WorkerManagerThread implements Runnable {
TaskExecuteThread taskExecuteThread;
while (Stopper.isRunning()) {
try {
- taskExecuteThread = workerExecuteQueue.take();
- taskExecuteThread.setStorageOperate(storageOperate);
- workerExecService.submit(taskExecuteThread);
+ if (this.getThreadPoolQueueSize() <= workerExecThreads) {
+ taskExecuteThread = waitSubmitQueue.take();
+ taskExecuteThread.setStorageOperate(storageOperate);
+ workerExecService.submit(taskExecuteThread);
+ } else {
+ logger.info("Exec queue is full, waiting submit queue {},
waiting exec queue size {}",
+ this.getWaitSubmitQueueSize(),
this.getThreadPoolQueueSize());
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+ }
} catch (Exception e) {
logger.error("An unexpected interrupt is happened, "
+ "the exception will be ignored and this thread will
continue to run", e);