caishunfeng commented on code in PR #9919:
URL: https://github.com/apache/dolphinscheduler/pull/9919#discussion_r867329374


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java:
##########
@@ -71,30 +77,24 @@ public ExecutorDispatcher() {
      * @throws ExecuteException if error throws ExecuteException
      */
     public Boolean dispatch(final ExecutionContext context) throws 
ExecuteException {
-        /**
-         * get executor manager
-         */
+        // get executor manager
         ExecutorManager<Boolean> executorManager = 
this.executorManagers.get(context.getExecutorType());
         if (executorManager == null) {
             throw new ExecuteException("no ExecutorManager for type : " + 
context.getExecutorType());
         }
 
-        /**
-         * host select
-         */
-
+        // host select
         Host host = hostManager.select(context);
         if (StringUtils.isEmpty(host.getAddress())) {
-            throw new ExecuteException(String.format("fail to execute : %s due 
to no suitable worker, "
-                            + "current task needs worker group %s to execute",
-                    context.getCommand(),context.getWorkerGroup()));
+            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);

Review Comment:
   Why to sleep here? Dispatcher is a single thread, we should avoid to sleep 
here.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java:
##########
@@ -45,7 +51,21 @@ public HostWeight doSelect(Collection<HostWeight> sources) {
         }
         lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
         return lowerNode;
+    }
 
+    private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) 
{
+        List<HostWeight> zeroWaitingTask = sources.stream().filter(h -> 
h.getWaitingTaskCount() == 0).collect(Collectors.toList());
+        if (!zeroWaitingTask.isEmpty()) {
+            return zeroWaitingTask;
+        }
+        HostWeight hostWeight = 
sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
+        List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
+        List<HostWeight> equalWaitingTask = sources.stream().filter(h -> 
!h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == 
hostWeight.getWaitingTaskCount())

Review Comment:
   Why not just return the hostWeight list which order by waitingTaskCount asc?



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java:
##########
@@ -221,4 +227,23 @@ private void handleResultEvent(TaskEvent taskEvent, 
TaskInstance taskInstance) {
             
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
         }
     }
+
+    /**
+     * handle result event
+     */
+    private void handleWorkerRejectEvent(Channel channel, TaskInstance 
taskInstance, WorkflowExecuteThread executeThread) {
+        try {
+            if (executeThread != null) {
+                executeThread.resubmit(taskInstance.getTaskCode());
+            }
+            if (channel != null) {
+                TaskRecallAckCommand taskRecallAckCommand = new 
TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
+                channel.writeAndFlush(taskRecallAckCommand.convert2Command());
+            }
+        } catch (Exception e) {
+            logger.error("worker reject error", e);

Review Comment:
   ```suggestion
               logger.error("handle worker reject event error", e);
   ```



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java:
##########
@@ -47,7 +51,7 @@ public class WorkerManagerThread implements Runnable {
     /**
      * task queue
      */
-    private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new 
DelayQueue<>();

Review Comment:
   The DelayQueue will take effect of the delay strategy, so it's useful and 
don't remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to