caishunfeng commented on a change in pull request #8496:
URL: https://github.com/apache/dolphinscheduler/pull/8496#discussion_r812578966



##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -608,15 +611,19 @@ private int createCommand(CommandType commandType, long 
processDefineCode,
 
     /**
      * create complement command
-     * close left open right
+     * close left and close right
      *
      * @param start
      * @param end
      * @param runMode
      * @return
      */
-    private int createComplementCommandList(Date start, Date end, RunMode 
runMode, Command command, Integer expectedParallelismNumber) {
+    @SuppressWarnings("checkstyle:OperatorWrap")

Review comment:
       Is it necenarry to add this annotation?

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -673,14 +691,80 @@ private int createComplementCommandList(Date start, Date 
end, RunMode runMode, C
                         cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(listDate.get(endDateIndex)));
                         
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                         processService.createCommand(command);
+
+                        if (schedules.isEmpty() || complementDependentMode == 
ComplementDependentMode.OFF_MODE) {
+                            logger.info("process code: {} complement dependent 
in off mode or schedule's size is 0, skip "
+                                    + "dependent complement data", 
command.getProcessDefinitionCode());
+                        } else {
+                            dependentProcessDefinitionCreateCount += 
createComplementDependentCommand(schedules, command);
+                        }
                     }
                 }
                 break;
             }
             default:
                 break;
         }
-        logger.info("create complement command count: {}", createCount);
+        logger.info("create complement command count: {}, create dependent 
complement command count: {}", createCount
+                , dependentProcessDefinitionCreateCount);
         return createCount;
     }
+
+    /**
+     * create complement dependent command
+     */
+    @SuppressWarnings("checkstyle:OperatorWrap")
+    private int createComplementDependentCommand(List<Schedule> schedules, 
Command command) {
+        int dependentProcessDefinitionCreateCount = 0;
+        Command dependentCommand;
+
+        try {
+            dependentCommand = (Command) BeanUtils.cloneBean(command);
+        } catch (Exception e) {
+            logger.error("copy dependent command error: ", e);
+            return dependentProcessDefinitionCreateCount;
+        }
+
+        List<DependentProcessDefinition> dependentProcessDefinitionList =
+                
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
+                        CronUtils.getMaxCycle(schedules.get(0).getCrontab()),
+                        dependentCommand.getWorkerGroup());
+
+        dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
+        for (DependentProcessDefinition dependentProcessDefinition : 
dependentProcessDefinitionList) {
+            
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
+            
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
+            Map<String, String> cmdParam = 
JSONUtils.toMap(dependentCommand.getCommandParam());
+            cmdParam.put(CMD_PARAM_START_NODES, 
String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
+            dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
+            dependentProcessDefinitionCreateCount += 
processService.createCommand(dependentCommand);
+        }
+
+        return dependentProcessDefinitionCreateCount;
+    }
+
+    /**
+     * get complement dependent process definition list
+     */
+    @SuppressWarnings("checkstyle:OperatorWrap")
+    private List<DependentProcessDefinition> 
getComplementDependentDefinitionList(long processDefinitionCode,
+                                                                               
CycleEnum processDefinitionCycle,
+                                                                               
String workerGroup) {
+        List<DependentProcessDefinition> validDependentProcessDefinitionList = 
new ArrayList<>();
+
+        List<DependentProcessDefinition> dependentProcessDefinitionList =
+                
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
+
+        for (DependentProcessDefinition dependentProcessDefinition : 
dependentProcessDefinitionList) {
+            if (dependentProcessDefinition.getDependentCycle() == 
processDefinitionCycle) {

Review comment:
       Please add some notes here.

##########
File path: 
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
##########
@@ -88,4 +88,23 @@
          where project_code = #{projectCode}
                 and process_definition_code = #{processDefinitionCode}
     </select>
+
+    <select id="queryDependentProcessDefinitionByProcessDefinitionCode" 
resultType="DependentProcessDefinition">
+        SELECT
+        c.code AS process_definition_code
+        ,c.name AS process_definition_name
+        ,a.code AS task_definition_code
+        ,a.task_params
+        ,d.worker_group
+        FROM
+        t_ds_task_definition a
+        JOIN t_ds_process_task_relation b ON a.code    = b.pre_task_code and 
a.version = b.pre_task_version
+        JOIN t_ds_process_definition c ON c.code = b.process_definition_code 
AND c.version = b.process_definition_version AND c.project_code = b.project_code
+        JOIN t_ds_schedules d ON d.process_definition_code = c.code

Review comment:
       I think the worker group logic should be independent, for two reasons:
   
   - reduce the amount of data to join
   - do it explicitly in code, which can show the logic more clear
   
   what do you think?

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -673,14 +691,80 @@ private int createComplementCommandList(Date start, Date 
end, RunMode runMode, C
                         cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(listDate.get(endDateIndex)));
                         
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                         processService.createCommand(command);
+
+                        if (schedules.isEmpty() || complementDependentMode == 
ComplementDependentMode.OFF_MODE) {
+                            logger.info("process code: {} complement dependent 
in off mode or schedule's size is 0, skip "
+                                    + "dependent complement data", 
command.getProcessDefinitionCode());
+                        } else {
+                            dependentProcessDefinitionCreateCount += 
createComplementDependentCommand(schedules, command);
+                        }
                     }
                 }
                 break;
             }
             default:
                 break;
         }
-        logger.info("create complement command count: {}", createCount);
+        logger.info("create complement command count: {}, create dependent 
complement command count: {}", createCount
+                , dependentProcessDefinitionCreateCount);
         return createCount;
     }
+
+    /**
+     * create complement dependent command
+     */
+    @SuppressWarnings("checkstyle:OperatorWrap")
+    private int createComplementDependentCommand(List<Schedule> schedules, 
Command command) {
+        int dependentProcessDefinitionCreateCount = 0;
+        Command dependentCommand;
+
+        try {
+            dependentCommand = (Command) BeanUtils.cloneBean(command);

Review comment:
       Should dependentCommand set id = 0?

##########
File path: 
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
##########
@@ -88,4 +88,23 @@
          where project_code = #{projectCode}
                 and process_definition_code = #{processDefinitionCode}
     </select>
+
+    <select id="queryDependentProcessDefinitionByProcessDefinitionCode" 
resultType="DependentProcessDefinition">
+        SELECT
+        c.code AS process_definition_code
+        ,c.name AS process_definition_name
+        ,a.code AS task_definition_code
+        ,a.task_params
+        ,d.worker_group
+        FROM
+        t_ds_task_definition a
+        JOIN t_ds_process_task_relation b ON a.code    = b.pre_task_code and 
a.version = b.pre_task_version
+        JOIN t_ds_process_definition c ON c.code = b.process_definition_code 
AND c.version = b.process_definition_version AND c.project_code = b.project_code

Review comment:
       Maybe it should remove the project_code check, because DEPENDENT task 
support cross projects.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to