caishunfeng commented on a change in pull request #8496:
URL: https://github.com/apache/dolphinscheduler/pull/8496#discussion_r812578966
##########
File path:
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -608,15 +611,19 @@ private int createCommand(CommandType commandType, long
processDefineCode,
/**
* create complement command
- * close left open right
+ * close left and close right
*
* @param start
* @param end
* @param runMode
* @return
*/
- private int createComplementCommandList(Date start, Date end, RunMode
runMode, Command command, Integer expectedParallelismNumber) {
+ @SuppressWarnings("checkstyle:OperatorWrap")
Review comment:
Is it necenarry to add this annotation?
##########
File path:
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -673,14 +691,80 @@ private int createComplementCommandList(Date start, Date
end, RunMode runMode, C
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
+
+ if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent
in off mode or schedule's size is 0, skip "
+ + "dependent complement data",
command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ }
}
}
break;
}
default:
break;
}
- logger.info("create complement command count: {}", createCount);
+ logger.info("create complement command count: {}, create dependent
complement command count: {}", createCount
+ , dependentProcessDefinitionCreateCount);
return createCount;
}
+
+ /**
+ * create complement dependent command
+ */
+ @SuppressWarnings("checkstyle:OperatorWrap")
+ private int createComplementDependentCommand(List<Schedule> schedules,
Command command) {
+ int dependentProcessDefinitionCreateCount = 0;
+ Command dependentCommand;
+
+ try {
+ dependentCommand = (Command) BeanUtils.cloneBean(command);
+ } catch (Exception e) {
+ logger.error("copy dependent command error: ", e);
+ return dependentProcessDefinitionCreateCount;
+ }
+
+ List<DependentProcessDefinition> dependentProcessDefinitionList =
+
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
+ CronUtils.getMaxCycle(schedules.get(0).getCrontab()),
+ dependentCommand.getWorkerGroup());
+
+ dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
+ for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
+
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
+ Map<String, String> cmdParam =
JSONUtils.toMap(dependentCommand.getCommandParam());
+ cmdParam.put(CMD_PARAM_START_NODES,
String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
+ dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ dependentProcessDefinitionCreateCount +=
processService.createCommand(dependentCommand);
+ }
+
+ return dependentProcessDefinitionCreateCount;
+ }
+
+ /**
+ * get complement dependent process definition list
+ */
+ @SuppressWarnings("checkstyle:OperatorWrap")
+ private List<DependentProcessDefinition>
getComplementDependentDefinitionList(long processDefinitionCode,
+
CycleEnum processDefinitionCycle,
+
String workerGroup) {
+ List<DependentProcessDefinition> validDependentProcessDefinitionList =
new ArrayList<>();
+
+ List<DependentProcessDefinition> dependentProcessDefinitionList =
+
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
+
+ for (DependentProcessDefinition dependentProcessDefinition :
dependentProcessDefinitionList) {
+ if (dependentProcessDefinition.getDependentCycle() ==
processDefinitionCycle) {
Review comment:
Please add some notes here.
##########
File path:
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
##########
@@ -88,4 +88,23 @@
where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
</select>
+
+ <select id="queryDependentProcessDefinitionByProcessDefinitionCode"
resultType="DependentProcessDefinition">
+ SELECT
+ c.code AS process_definition_code
+ ,c.name AS process_definition_name
+ ,a.code AS task_definition_code
+ ,a.task_params
+ ,d.worker_group
+ FROM
+ t_ds_task_definition a
+ JOIN t_ds_process_task_relation b ON a.code = b.pre_task_code and
a.version = b.pre_task_version
+ JOIN t_ds_process_definition c ON c.code = b.process_definition_code
AND c.version = b.process_definition_version AND c.project_code = b.project_code
+ JOIN t_ds_schedules d ON d.process_definition_code = c.code
Review comment:
I think the worker group logic should be independent, for two reasons:
- reduce the amount of data to join
- do it explicitly in code, which can show the logic more clear
what do you think?
##########
File path:
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -673,14 +691,80 @@ private int createComplementCommandList(Date start, Date
end, RunMode runMode, C
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
+
+ if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
+ logger.info("process code: {} complement dependent
in off mode or schedule's size is 0, skip "
+ + "dependent complement data",
command.getProcessDefinitionCode());
+ } else {
+ dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
+ }
}
}
break;
}
default:
break;
}
- logger.info("create complement command count: {}", createCount);
+ logger.info("create complement command count: {}, create dependent
complement command count: {}", createCount
+ , dependentProcessDefinitionCreateCount);
return createCount;
}
+
+ /**
+ * create complement dependent command
+ */
+ @SuppressWarnings("checkstyle:OperatorWrap")
+ private int createComplementDependentCommand(List<Schedule> schedules,
Command command) {
+ int dependentProcessDefinitionCreateCount = 0;
+ Command dependentCommand;
+
+ try {
+ dependentCommand = (Command) BeanUtils.cloneBean(command);
Review comment:
Should dependentCommand set id = 0?
##########
File path:
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
##########
@@ -88,4 +88,23 @@
where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
</select>
+
+ <select id="queryDependentProcessDefinitionByProcessDefinitionCode"
resultType="DependentProcessDefinition">
+ SELECT
+ c.code AS process_definition_code
+ ,c.name AS process_definition_name
+ ,a.code AS task_definition_code
+ ,a.task_params
+ ,d.worker_group
+ FROM
+ t_ds_task_definition a
+ JOIN t_ds_process_task_relation b ON a.code = b.pre_task_code and
a.version = b.pre_task_version
+ JOIN t_ds_process_definition c ON c.code = b.process_definition_code
AND c.version = b.process_definition_version AND c.project_code = b.project_code
Review comment:
Maybe it should remove the project_code check, because DEPENDENT task
support cross projects.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]