caishunfeng commented on code in PR #9919:
URL: https://github.com/apache/dolphinscheduler/pull/9919#discussion_r867329374
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java:
##########
@@ -71,30 +77,24 @@ public ExecutorDispatcher() {
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws
ExecuteException {
- /**
- * get executor manager
- */
+ // get executor manager
ExecutorManager<Boolean> executorManager =
this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " +
context.getExecutorType());
}
- /**
- * host select
- */
-
+ // host select
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
- throw new ExecuteException(String.format("fail to execute : %s due
to no suitable worker, "
- + "current task needs worker group %s to execute",
- context.getCommand(),context.getWorkerGroup()));
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
Review Comment:
Why sleep here? The dispatcher runs on a single thread, so sleeping here
blocks every other task waiting to be dispatched. We should avoid it.
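As a rough illustration of a non-blocking alternative, the sketch below sets aside tasks that have no suitable worker and lets the loop continue, instead of sleeping on the dispatch thread. All names here (`NonBlockingDispatchSketch`, `dispatchOnce`, the `String` task type) are hypothetical and are not part of DolphinScheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical sketch: none of these names exist in DolphinScheduler.
final class NonBlockingDispatchSketch {

    /**
     * Runs one pass over the pending queue. Tasks with no suitable worker are
     * collected and returned so the caller can retry them later, rather than
     * blocking the single dispatch thread with a sleep.
     */
    static List<String> dispatchOnce(Queue<String> pending,
                                     Predicate<String> hasWorker,
                                     List<String> dispatched) {
        List<String> deferred = new ArrayList<>();
        String task;
        while ((task = pending.poll()) != null) {
            if (hasWorker.test(task)) {
                dispatched.add(task);   // a worker is available: dispatch now
            } else {
                deferred.add(task);     // no worker: retry on a later pass
            }
        }
        return deferred;
    }
}
```

The caller decides when to retry the deferred tasks, for example by re-queueing them with a delay, so one task without a worker does not stall the others.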
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java:
##########
@@ -45,7 +51,21 @@ public HostWeight doSelect(Collection<HostWeight> sources) {
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
+ }
+ private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources)
{
+ List<HostWeight> zeroWaitingTask = sources.stream().filter(h ->
h.getWaitingTaskCount() == 0).collect(Collectors.toList());
+ if (!zeroWaitingTask.isEmpty()) {
+ return zeroWaitingTask;
+ }
+ HostWeight hostWeight =
sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
+ List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
+ List<HostWeight> equalWaitingTask = sources.stream().filter(h ->
!h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() ==
hostWeight.getWaitingTaskCount())
Review Comment:
Why not just return the HostWeight list ordered by waitingTaskCount ascending?
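A minimal sketch of that ordering, using a stand-in `HostLoad` class rather than the real `HostWeight` (the class and method names below are assumptions made for illustration only):

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: HostLoad only models the field used for ordering.
final class SortedHostsSketch {

    static final class HostLoad {
        final String host;
        final int waitingTaskCount;

        HostLoad(String host, int waitingTaskCount) {
            this.host = host;
            this.waitingTaskCount = waitingTaskCount;
        }

        int getWaitingTaskCount() {
            return waitingTaskCount;
        }
    }

    /** Returns the hosts ordered by waiting task count, smallest first. */
    static List<HostLoad> orderByWaitingTaskCount(Collection<HostLoad> sources) {
        return sources.stream()
                .sorted(Comparator.comparingInt(HostLoad::getWaitingTaskCount))
                .collect(Collectors.toList());
    }
}
```

With a sorted list, hosts with zero waiting tasks come first, and hosts that tie on the minimum count sit next to each other, so the separate zero-count and equal-count filters may no longer be needed.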
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java:
##########
@@ -221,4 +227,23 @@ private void handleResultEvent(TaskEvent taskEvent,
TaskInstance taskInstance) {
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
}
+
+ /**
+ * handle result event
+ */
+ private void handleWorkerRejectEvent(Channel channel, TaskInstance
taskInstance, WorkflowExecuteThread executeThread) {
+ try {
+ if (executeThread != null) {
+ executeThread.resubmit(taskInstance.getTaskCode());
+ }
+ if (channel != null) {
+ TaskRecallAckCommand taskRecallAckCommand = new
TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
+ channel.writeAndFlush(taskRecallAckCommand.convert2Command());
+ }
+ } catch (Exception e) {
+ logger.error("worker reject error", e);
Review Comment:
```suggestion
logger.error("handle worker reject event error", e);
```
##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java:
##########
@@ -47,7 +51,7 @@ public class WorkerManagerThread implements Runnable {
/**
* task queue
*/
- private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new
DelayQueue<>();
Review Comment:
The DelayQueue is what makes the delay strategy take effect, so it is still
useful. Please don't remove it.
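For context, here is a small self-contained sketch of how `java.util.concurrent.DelayQueue` holds an element back until its delay has expired. `DelayedTask` and `pollReadyNow` are hypothetical names for illustration and do not reflect how `TaskExecuteThread` actually implements `Delayed`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of DelayQueue behavior; not DolphinScheduler code.
final class DelayQueueSketch {

    static final class DelayedTask implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the task is ready.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Polls without blocking and returns the names of tasks that are ready now. */
    static List<String> pollReadyNow() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("delayed", 60_000)); // ready in one minute
        queue.put(new DelayedTask("immediate", 0));    // ready right away

        List<String> ready = new ArrayList<>();
        DelayedTask task;
        while ((task = queue.poll()) != null) {        // poll() skips unexpired elements
            ready.add(task.name);
        }
        return ready;
    }
}
```

A plain queue would hand out both tasks immediately. Here only the task whose delay has expired is returned, which is the behavior a delay strategy relies on.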