This is an automated email from the ASF dual-hosted git repository. caishunfeng pushed a commit to branch 3.1.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit d7f40b19b59e0c79a2f5c1cff0a66b49f4824d0d Author: Stalary <[email protected]> AuthorDate: Thu Sep 8 15:08:10 2022 +0800 [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734) * FIX: dependent * FIX: version * MOD: for review --- .../api/service/impl/ExecutorServiceImpl.java | 8 +++++--- .../dao/entity/DependentProcessDefinition.java | 17 +++++++++++++++-- .../dao/mapper/WorkFlowLineageMapper.xml | 1 + 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 5755362d99..174da3f10c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -909,6 +909,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // and causing duplicate when clone it. dependentCommand.setId(null); dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode()); + dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion()); dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup()); Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam()); cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode())); @@ -929,7 +930,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle, - workerGroup); + workerGroup, processDefinitionCode); } /** @@ -940,7 +941,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ private List<DependentProcessDefinition> checkDependentProcessDefinitionValid( List<DependentProcessDefinition> dependentProcessDefinitionList, CycleEnum processDefinitionCycle, - String workerGroup) { + String workerGroup, + long upstreamProcessDefinitionCode) { List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>(); List<Long> processDefinitionCodeList = @@ -951,7 +953,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { - if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) { + if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) { if (processDefinitionWorkerGroupMap .get(dependentProcessDefinition.getProcessDefinitionCode()) == null) { dependentProcessDefinition.setWorkerGroup(workerGroup); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java index 9de57dff33..87bb3d4234 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java @@ -41,6 +41,11 @@ public class DependentProcessDefinition { */ private String processDefinitionName; + /** + * process definition version + **/ + private int processDefinitionVersion; + /** * task definition name */ @@ -60,14 +65,14 @@ public class DependentProcessDefinition { * get dependent cycle * @return CycleEnum */ - public CycleEnum getDependentCycle() { + public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) { DependentParameters dependentParameters = this.getDependentParameters(); List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList(); for (DependentTaskModel dependentTaskModel : dependentTaskModelList) { List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList(); for (DependentItem dependentItem : dependentItemList) { - if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) { + if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) { return cycle2CycleEnum(dependentItem.getCycle()); } } @@ -122,6 +127,14 @@ public class DependentProcessDefinition { this.processDefinitionCode = code; } + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + public long getTaskDefinitionCode() { return this.taskDefinitionCode; } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index b17499bb60..2689b6d50f 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -149,6 +149,7 @@ SELECT c.code AS process_definition_code ,c.name AS process_definition_name + ,c.version as process_definition_version ,a.code AS task_definition_code ,a.task_params FROM
