This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2aa191014d fix 9584 (#9585)
2aa191014d is described below

commit 2aa191014d91bec76344f4de106ff7ba29f3b1ed
Author: JinYong Li <[email protected]>
AuthorDate: Wed Apr 20 16:10:04 2022 +0800

    fix 9584 (#9585)
---
 .../impl/ProcessTaskRelationServiceImpl.java       | 23 +++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 3cf41f820a..88254d19fe 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -291,6 +291,18 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
             putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
             return result;
         }
+        List<Long> currentUpstreamList = 
upstreamList.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toList());
+        if (currentUpstreamList.contains(0L)) {
+            putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList");
+            return result;
+        }
+        List<Long> tmpCurrent = Lists.newArrayList(currentUpstreamList);
+        tmpCurrent.removeAll(preTaskCodeList);
+        preTaskCodeList.removeAll(currentUpstreamList);
+        if (!preTaskCodeList.isEmpty()) {
+            putMsg(result, Status.DATA_IS_NOT_VALID, 
StringUtils.join(preTaskCodeList, Constants.COMMA));
+            return result;
+        }
         ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
         if (processDefinition == null) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, 
upstreamList.get(0).getProcessDefinitionCode());
@@ -300,20 +312,17 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
         List<ProcessTaskRelation> processTaskRelationList = 
Lists.newArrayList(processTaskRelations);
         List<ProcessTaskRelation> processTaskRelationWaitRemove = 
Lists.newArrayList();
         for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
-            if (preTaskCodeList.size() > 1) {
-                if 
(preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
-                    
preTaskCodeList.remove(processTaskRelation.getPreTaskCode());
+            if (currentUpstreamList.size() > 1) {
+                if 
(currentUpstreamList.contains(processTaskRelation.getPreTaskCode())) {
+                    
currentUpstreamList.remove(processTaskRelation.getPreTaskCode());
                     processTaskRelationWaitRemove.add(processTaskRelation);
                 }
             } else {
-                if (processTaskRelation.getPostTaskCode() == taskCode) {
+                if (processTaskRelation.getPostTaskCode() == taskCode && 
(currentUpstreamList.isEmpty() || tmpCurrent.isEmpty())) {
                     processTaskRelation.setPreTaskVersion(0);
                     processTaskRelation.setPreTaskCode(0L);
                 }
             }
-            if 
(preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) {
-                processTaskRelationWaitRemove.add(processTaskRelation);
-            }
         }
         processTaskRelationList.removeAll(processTaskRelationWaitRemove);
         updateProcessDefiniteVersion(loginUser, result, processDefinition);

Reply via email to