This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.2-release
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-release by this push:
new 04edcac Revert "[Bug-7788][MasterServer] fix submit duplicate tasks
sometimes when retry (#7808)" (#7864)
04edcac is described below
commit 04edcacb0fcddf4476ccbfacb2dae527d6a4e330
Author: BaoLiang <[email protected]>
AuthorDate: Fri Jan 7 09:45:46 2022 +0800
Revert "[Bug-7788][MasterServer] fix submit duplicate tasks sometimes when
retry (#7808)" (#7864)
This reverts commit ff3f5717c4247e3be10ef087983ac6b7129519f6.
---
.../master/runner/WorkflowExecuteThread.java | 31 +++-------------------
1 file changed, 4 insertions(+), 27 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 92862ae..085c1d5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -76,7 +76,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -1172,35 +1171,13 @@ public class WorkflowExecuteThread implements Runnable {
* @param taskInstance task instance
*/
private void addTaskToStandByList(TaskInstance taskInstance) {
+ logger.info("add task to stand by list, task name: {} , task id:{}",
taskInstance.getName(), taskInstance.getId());
try {
- if (readyToSubmitTaskQueue.contains(taskInstance)) {
- logger.warn("task was found in ready submit queue, task
code:{}", taskInstance.getTaskCode());
- return;
+ if (!readyToSubmitTaskQueue.contains(taskInstance)) {
+ readyToSubmitTaskQueue.put(taskInstance);
}
- // need to check if the tasks with same task code is active
- boolean active = false;
- Map<Integer, TaskInstance> taskInstanceMap =
taskInstanceHashMap.column(taskInstance.getTaskCode());
- if (taskInstanceMap != null && taskInstanceMap.size() > 0) {
- for (Entry<Integer, TaskInstance> entry :
taskInstanceMap.entrySet()) {
- Integer taskInstanceId = entry.getKey();
- if (activeTaskProcessorMaps.containsKey(taskInstanceId)) {
- TaskInstance latestTaskInstance =
processService.findTaskInstanceById(taskInstanceId);
- if (latestTaskInstance != null &&
!latestTaskInstance.getState().typeIsFailure()) {
- active = true;
- break;
- }
- }
- }
- }
- if (active) {
- logger.warn("task was found in active task list, task
code:{}", taskInstance.getTaskCode());
- return;
- }
- logger.info("add task to stand by list, task name:{}, task id:{},
task code:{}",
- taskInstance.getName(), taskInstance.getId(),
taskInstance.getTaskCode());
- readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
- logger.error("add task instance to readyToSubmitTaskQueue,
taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
+ logger.error("add task instance to readyToSubmitTaskQueue,
taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e);
}
}