caishunfeng commented on a change in pull request #7258:
URL: https://github.com/apache/dolphinscheduler/pull/7258#discussion_r770181843



##########
File path: 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
##########
@@ -74,30 +84,83 @@ public void run() {
             } catch (Exception e) {
                 logger.error("state wheel thread check error:", e);
             }
-            ThreadUtil.sleepAtLeastIgnoreInterrupts(stateCheckIntervalSecs);
+            ThreadUtil.sleepAtLeastIgnoreInterrupts((long) 
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
         }
     }
 
     public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
-        this.processInstanceTimeoutCheckList.put(processInstance.getId(), 
processInstance);
+        processInstanceTimeoutCheckList.add(processInstance.getId());
+    }
+
+    public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
+        processInstanceTimeoutCheckList.remove(processInstance.getId());
     }
 
     public void addTask4TimeoutCheck(TaskInstance taskInstance) {
-        this.taskInstanceTimeoutCheckList.put(taskInstance.getId(), 
taskInstance);
+        if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) {
+            return;
+        }
+        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
+        if (taskDefinition == null) {
+            logger.error("taskDefinition is null, taskId:{}", 
taskInstance.getId());
+            return;
+        }
+        if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
+            taskInstanceTimeoutCheckList.put(taskInstance.getId(), 
taskInstance.getProcessInstanceId());
+        }
+        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+            taskInstanceTimeoutCheckList.put(taskInstance.getId(), 
taskInstance.getProcessInstanceId());
+        }
+    }
+
+    public void removeTask4TimeoutCheck(TaskInstance taskInstance) {
+        taskInstanceTimeoutCheckList.remove(taskInstance.getId());
     }
 
     public void addTask4RetryCheck(TaskInstance taskInstance) {
-        this.taskInstanceRetryCheckList.put(taskInstance.getId(), 
taskInstance);
+        if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) {
+            return;
+        }
+        TaskDefinition taskDefinition = taskInstance.getTaskDefine();
+        if (taskDefinition == null) {
+            logger.error("taskDefinition is null, taskId:{}", 
taskInstance.getId());
+            return;
+        }
+        if (taskInstance.taskCanRetry()) {
+            taskInstanceRetryCheckList.put(taskInstance.getId(), 
taskInstance.getProcessInstanceId());
+        }
+
+        if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
+            taskInstanceRetryCheckList.put(taskInstance.getId(), 
taskInstance.getProcessInstanceId());
+        }
     }
 
-    public void checkTask4Timeout() {
+    public void removeTask4RetryCheck(TaskInstance taskInstance) {
+        taskInstanceRetryCheckList.remove(taskInstance.getId());
+    }
+
+    private void checkTask4Timeout() {
         if (taskInstanceTimeoutCheckList.isEmpty()) {
             return;
         }
-        for (TaskInstance taskInstance : 
taskInstanceTimeoutCheckList.values()) {
+        for (Entry<Integer, Integer> entry : 
taskInstanceTimeoutCheckList.entrySet()) {
+            int processInstanceId = entry.getValue();
+            int taskInstanceId = entry.getKey();
+
+            WorkflowExecuteThread workflowExecuteThread = 
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);

Review comment:
       > 'workflow execute' is a high real-time polling and cannot be affected 
by other services. so i prefer to put this polling in the 
'WorkflowExecuteThreadPool'
   
   Good idea. But I think the `processInstanceExecCacheManager` and 
`WorkflowExecuteThreadPool` is deffierent. It may get `workflowExecuteThread` 
from cacheManager and handle by some logic, like check timeout. And the 
`WorkflowExecuteThreadPool` manages `workflowExecThread` executing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to