This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2aa191014d fix 9584 (#9585)
2aa191014d is described below
commit 2aa191014d91bec76344f4de106ff7ba29f3b1ed
Author: JinYong Li <[email protected]>
AuthorDate: Wed Apr 20 16:10:04 2022 +0800
fix 9584 (#9585)
---
.../impl/ProcessTaskRelationServiceImpl.java | 23 +++++++++++++++-------
1 file changed, 16 insertions(+), 7 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 3cf41f820a..88254d19fe 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -291,6 +291,18 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
+ List<Long> currentUpstreamList =
upstreamList.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toList());
+ if (currentUpstreamList.contains(0L)) {
+ putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList");
+ return result;
+ }
+ List<Long> tmpCurrent = Lists.newArrayList(currentUpstreamList);
+ tmpCurrent.removeAll(preTaskCodeList);
+ preTaskCodeList.removeAll(currentUpstreamList);
+ if (!preTaskCodeList.isEmpty()) {
+ putMsg(result, Status.DATA_IS_NOT_VALID,
StringUtils.join(preTaskCodeList, Constants.COMMA));
+ return result;
+ }
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
upstreamList.get(0).getProcessDefinitionCode());
@@ -300,20 +312,17 @@ public class ProcessTaskRelationServiceImpl extends
BaseServiceImpl implements P
List<ProcessTaskRelation> processTaskRelationList =
Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> processTaskRelationWaitRemove =
Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation :
processTaskRelationList) {
- if (preTaskCodeList.size() > 1) {
- if
(preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
-
preTaskCodeList.remove(processTaskRelation.getPreTaskCode());
+ if (currentUpstreamList.size() > 1) {
+ if
(currentUpstreamList.contains(processTaskRelation.getPreTaskCode())) {
+
currentUpstreamList.remove(processTaskRelation.getPreTaskCode());
processTaskRelationWaitRemove.add(processTaskRelation);
}
} else {
- if (processTaskRelation.getPostTaskCode() == taskCode) {
+ if (processTaskRelation.getPostTaskCode() == taskCode &&
(currentUpstreamList.isEmpty() || tmpCurrent.isEmpty())) {
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPreTaskCode(0L);
}
}
- if
(preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) {
- processTaskRelationWaitRemove.add(processTaskRelation);
- }
}
processTaskRelationList.removeAll(processTaskRelationWaitRemove);
updateProcessDefiniteVersion(loginUser, result, processDefinition);