zhongjiajie commented on a change in pull request #7771:
URL: https://github.com/apache/dolphinscheduler/pull/7771#discussion_r777800386



##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
##########
@@ -138,6 +139,100 @@ public Result startProcessInstance(@ApiIgnore 
@RequestAttribute(value = Constant
         return returnDataList(result);
     }
 
+    /**
+     * batch execute process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processDefinitionCodes process definition codes
+     * @param scheduleTime schedule time
+     * @param failureStrategy failure strategy
+     * @param startNodeList start nodes list
+     * @param taskDependType task depend type
+     * @param execType execute type
+     * @param warningType warning type
+     * @param warningGroupId warning group id
+     * @param runMode run mode
+     * @param processInstancePriority process instance priority
+     * @param workerGroup worker group
+     * @param timeout timeout
+     * @param expectedParallelismNumber the expected parallelism number when 
execute complement in parallel mode
+     * @return start process result code
+     */
+    @ApiOperation(value = "batchStartProcessInstance", notes = 
"BATCH_RUN_PROCESS_INSTANCE_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "processDefinitionCodes", value = 
"PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = 
"1,2,3"),
+            @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", 
required = true, dataType = "String"),
+            @ApiImplicitParam(name = "failureStrategy", value = 
"FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+            @ApiImplicitParam(name = "startNodeList", value = 
"START_NODE_LIST", dataType = "String"),
+            @ApiImplicitParam(name = "taskDependType", value = 
"TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+            @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", 
dataType = "CommandType"),
+            @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", 
required = true, dataType = "WarningType"),
+            @ApiImplicitParam(name = "warningGroupId", value = 
"WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+            @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = 
"RunMode"),
+            @ApiImplicitParam(name = "processInstancePriority", value = 
"PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+            @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", 
dataType = "String", example = "default"),
+            @ApiImplicitParam(name = "environmentCode", value = 
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
+            @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = 
"Int", example = "100"),
+            @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+    })
+    @PostMapping(value = "batch-start-process-instance")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(START_PROCESS_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value 
= Constants.SESSION_USER) User loginUser,
+                                       @ApiParam(name = "projectCode", value = 
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                       @RequestParam(value = 
"processDefinitionCodes") String processDefinitionCodes,
+                                       @RequestParam(value = "scheduleTime", 
required = false) String scheduleTime,
+                                       @RequestParam(value = 
"failureStrategy", required = true) FailureStrategy failureStrategy,
+                                       @RequestParam(value = "startNodeList", 
required = false) String startNodeList,
+                                       @RequestParam(value = "taskDependType", 
required = false) TaskDependType taskDependType,
+                                       @RequestParam(value = "execType", 
required = false) CommandType execType,
+                                       @RequestParam(value = "warningType", 
required = true) WarningType warningType,
+                                       @RequestParam(value = "warningGroupId", 
required = false) int warningGroupId,
+                                       @RequestParam(value = "runMode", 
required = false) RunMode runMode,
+                                       @RequestParam(value = 
"processInstancePriority", required = false) Priority processInstancePriority,
+                                       @RequestParam(value = "workerGroup", 
required = false, defaultValue = "default") String workerGroup,
+                                       @RequestParam(value = 
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
+                                       @RequestParam(value = "timeout", 
required = false) Integer timeout,
+                                       @RequestParam(value = "startParams", 
required = false) String startParams,
+                                       @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber,
+                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun) {
+
+        if (timeout == null) {
+            timeout = Constants.MAX_TASK_TIMEOUT;
+        }
+        Map<String, String> startParamMap = null;
+        if (startParams != null) {
+            startParamMap = JSONUtils.toMap(startParams);
+        }
+
+        Map<String, Object> result = new HashMap<>();
+        String[] processDefinitionCodeArray = 
processDefinitionCodes.split(",");

Review comment:
       We already have a constant for `,` in the constants module.

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
##########
@@ -138,6 +139,100 @@ public Result startProcessInstance(@ApiIgnore 
@RequestAttribute(value = Constant
         return returnDataList(result);
     }
 
+    /**
+     * batch execute process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processDefinitionCodes process definition codes
+     * @param scheduleTime schedule time
+     * @param failureStrategy failure strategy
+     * @param startNodeList start nodes list
+     * @param taskDependType task depend type
+     * @param execType execute type
+     * @param warningType warning type
+     * @param warningGroupId warning group id
+     * @param runMode run mode
+     * @param processInstancePriority process instance priority
+     * @param workerGroup worker group
+     * @param timeout timeout
+     * @param expectedParallelismNumber the expected parallelism number when 
execute complement in parallel mode
+     * @return start process result code
+     */
+    @ApiOperation(value = "batchStartProcessInstance", notes = 
"BATCH_RUN_PROCESS_INSTANCE_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "processDefinitionCodes", value = 
"PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = 
"1,2,3"),
+            @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", 
required = true, dataType = "String"),
+            @ApiImplicitParam(name = "failureStrategy", value = 
"FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+            @ApiImplicitParam(name = "startNodeList", value = 
"START_NODE_LIST", dataType = "String"),
+            @ApiImplicitParam(name = "taskDependType", value = 
"TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+            @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", 
dataType = "CommandType"),
+            @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", 
required = true, dataType = "WarningType"),
+            @ApiImplicitParam(name = "warningGroupId", value = 
"WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+            @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = 
"RunMode"),
+            @ApiImplicitParam(name = "processInstancePriority", value = 
"PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+            @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", 
dataType = "String", example = "default"),
+            @ApiImplicitParam(name = "environmentCode", value = 
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
+            @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = 
"Int", example = "100"),
+            @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+    })
+    @PostMapping(value = "batch-start-process-instance")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(START_PROCESS_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value 
= Constants.SESSION_USER) User loginUser,
+                                       @ApiParam(name = "projectCode", value = 
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                       @RequestParam(value = 
"processDefinitionCodes") String processDefinitionCodes,
+                                       @RequestParam(value = "scheduleTime", 
required = false) String scheduleTime,
+                                       @RequestParam(value = 
"failureStrategy", required = true) FailureStrategy failureStrategy,
+                                       @RequestParam(value = "startNodeList", 
required = false) String startNodeList,
+                                       @RequestParam(value = "taskDependType", 
required = false) TaskDependType taskDependType,
+                                       @RequestParam(value = "execType", 
required = false) CommandType execType,
+                                       @RequestParam(value = "warningType", 
required = true) WarningType warningType,
+                                       @RequestParam(value = "warningGroupId", 
required = false) int warningGroupId,
+                                       @RequestParam(value = "runMode", 
required = false) RunMode runMode,
+                                       @RequestParam(value = 
"processInstancePriority", required = false) Priority processInstancePriority,
+                                       @RequestParam(value = "workerGroup", 
required = false, defaultValue = "default") String workerGroup,
+                                       @RequestParam(value = 
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
+                                       @RequestParam(value = "timeout", 
required = false) Integer timeout,
+                                       @RequestParam(value = "startParams", 
required = false) String startParams,
+                                       @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber,
+                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun) {
+
+        if (timeout == null) {
+            timeout = Constants.MAX_TASK_TIMEOUT;
+        }

Review comment:
       I am not sure about that, but could we just add it to L197 and set 
`defaultValue` for it?

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
##########
@@ -36,7 +37,7 @@
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.util.Map;
+import java.util.*;

Review comment:
       I do not think a wildcard import (`*`) is a good idea.

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
##########
@@ -138,6 +139,100 @@ public Result startProcessInstance(@ApiIgnore 
@RequestAttribute(value = Constant
         return returnDataList(result);
     }
 
+    /**
+     * batch execute process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processDefinitionCodes process definition codes
+     * @param scheduleTime schedule time
+     * @param failureStrategy failure strategy
+     * @param startNodeList start nodes list
+     * @param taskDependType task depend type
+     * @param execType execute type
+     * @param warningType warning type
+     * @param warningGroupId warning group id
+     * @param runMode run mode
+     * @param processInstancePriority process instance priority
+     * @param workerGroup worker group
+     * @param timeout timeout
+     * @param expectedParallelismNumber the expected parallelism number when 
execute complement in parallel mode
+     * @return start process result code
+     */
+    @ApiOperation(value = "batchStartProcessInstance", notes = 
"BATCH_RUN_PROCESS_INSTANCE_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "processDefinitionCodes", value = 
"PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = 
"1,2,3"),
+            @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", 
required = true, dataType = "String"),
+            @ApiImplicitParam(name = "failureStrategy", value = 
"FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+            @ApiImplicitParam(name = "startNodeList", value = 
"START_NODE_LIST", dataType = "String"),
+            @ApiImplicitParam(name = "taskDependType", value = 
"TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+            @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", 
dataType = "CommandType"),
+            @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", 
required = true, dataType = "WarningType"),
+            @ApiImplicitParam(name = "warningGroupId", value = 
"WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+            @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = 
"RunMode"),
+            @ApiImplicitParam(name = "processInstancePriority", value = 
"PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+            @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", 
dataType = "String", example = "default"),
+            @ApiImplicitParam(name = "environmentCode", value = 
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
+            @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = 
"Int", example = "100"),
+            @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+    })
+    @PostMapping(value = "batch-start-process-instance")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(START_PROCESS_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value 
= Constants.SESSION_USER) User loginUser,
+                                       @ApiParam(name = "projectCode", value = 
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                       @RequestParam(value = 
"processDefinitionCodes") String processDefinitionCodes,
+                                       @RequestParam(value = "scheduleTime", 
required = false) String scheduleTime,
+                                       @RequestParam(value = 
"failureStrategy", required = true) FailureStrategy failureStrategy,
+                                       @RequestParam(value = "startNodeList", 
required = false) String startNodeList,
+                                       @RequestParam(value = "taskDependType", 
required = false) TaskDependType taskDependType,
+                                       @RequestParam(value = "execType", 
required = false) CommandType execType,
+                                       @RequestParam(value = "warningType", 
required = true) WarningType warningType,
+                                       @RequestParam(value = "warningGroupId", 
required = false) int warningGroupId,
+                                       @RequestParam(value = "runMode", 
required = false) RunMode runMode,
+                                       @RequestParam(value = 
"processInstancePriority", required = false) Priority processInstancePriority,
+                                       @RequestParam(value = "workerGroup", 
required = false, defaultValue = "default") String workerGroup,
+                                       @RequestParam(value = 
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
+                                       @RequestParam(value = "timeout", 
required = false) Integer timeout,
+                                       @RequestParam(value = "startParams", 
required = false) String startParams,
+                                       @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber,
+                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun) {
+
+        if (timeout == null) {
+            timeout = Constants.MAX_TASK_TIMEOUT;
+        }
+        Map<String, String> startParamMap = null;
+        if (startParams != null) {
+            startParamMap = JSONUtils.toMap(startParams);
+        }
+
+        Map<String, Object> result = new HashMap<>();
+        String[] processDefinitionCodeArray = 
processDefinitionCodes.split(",");
+        Set<Long> processDefinitionCodeSet = new HashSet<>();
+        List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
+
+        for (String StrProcessDefinitionCode :processDefinitionCodeArray) {
+            long processDefinitionCode = 
Long.parseLong(StrProcessDefinitionCode);
+            processDefinitionCodeSet.add(processDefinitionCode);
+        }
+
+        for (long processDefinitionCode : processDefinitionCodeSet) {
+            result = execService.execProcessInstance(loginUser, projectCode, 
processDefinitionCode, scheduleTime, execType, failureStrategy,
+                    startNodeList, taskDependType, warningType, 
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, 
timeout, startParamMap, expectedParallelismNumber, dryRun);
+
+            if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
+                
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
+            }

Review comment:
       I have a question: should we terminate all the other starts if any of 
the codes in `processDefinitionCodeSet` fails to start?

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
##########
@@ -138,6 +139,100 @@ public Result startProcessInstance(@ApiIgnore 
@RequestAttribute(value = Constant
         return returnDataList(result);
     }
 
+    /**
+     * batch execute process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processDefinitionCodes process definition codes
+     * @param scheduleTime schedule time
+     * @param failureStrategy failure strategy
+     * @param startNodeList start nodes list
+     * @param taskDependType task depend type
+     * @param execType execute type
+     * @param warningType warning type
+     * @param warningGroupId warning group id
+     * @param runMode run mode
+     * @param processInstancePriority process instance priority
+     * @param workerGroup worker group
+     * @param timeout timeout
+     * @param expectedParallelismNumber the expected parallelism number when 
execute complement in parallel mode
+     * @return start process result code
+     */
+    @ApiOperation(value = "batchStartProcessInstance", notes = 
"BATCH_RUN_PROCESS_INSTANCE_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "processDefinitionCodes", value = 
"PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = 
"1,2,3"),
+            @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", 
required = true, dataType = "String"),
+            @ApiImplicitParam(name = "failureStrategy", value = 
"FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
+            @ApiImplicitParam(name = "startNodeList", value = 
"START_NODE_LIST", dataType = "String"),
+            @ApiImplicitParam(name = "taskDependType", value = 
"TASK_DEPEND_TYPE", dataType = "TaskDependType"),
+            @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", 
dataType = "CommandType"),
+            @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", 
required = true, dataType = "WarningType"),
+            @ApiImplicitParam(name = "warningGroupId", value = 
"WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
+            @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = 
"RunMode"),
+            @ApiImplicitParam(name = "processInstancePriority", value = 
"PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
+            @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", 
dataType = "String", example = "default"),
+            @ApiImplicitParam(name = "environmentCode", value = 
"ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
+            @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = 
"Int", example = "100"),
+            @ApiImplicitParam(name = "expectedParallelismNumber", value = 
"EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
+    })
+    @PostMapping(value = "batch-start-process-instance")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(START_PROCESS_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value 
= Constants.SESSION_USER) User loginUser,
+                                       @ApiParam(name = "projectCode", value = 
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                       @RequestParam(value = 
"processDefinitionCodes") String processDefinitionCodes,
+                                       @RequestParam(value = "scheduleTime", 
required = false) String scheduleTime,
+                                       @RequestParam(value = 
"failureStrategy", required = true) FailureStrategy failureStrategy,
+                                       @RequestParam(value = "startNodeList", 
required = false) String startNodeList,
+                                       @RequestParam(value = "taskDependType", 
required = false) TaskDependType taskDependType,
+                                       @RequestParam(value = "execType", 
required = false) CommandType execType,
+                                       @RequestParam(value = "warningType", 
required = true) WarningType warningType,
+                                       @RequestParam(value = "warningGroupId", 
required = false) int warningGroupId,
+                                       @RequestParam(value = "runMode", 
required = false) RunMode runMode,
+                                       @RequestParam(value = 
"processInstancePriority", required = false) Priority processInstancePriority,
+                                       @RequestParam(value = "workerGroup", 
required = false, defaultValue = "default") String workerGroup,
+                                       @RequestParam(value = 
"environmentCode", required = false, defaultValue = "-1") Long environmentCode,
+                                       @RequestParam(value = "timeout", 
required = false) Integer timeout,
+                                       @RequestParam(value = "startParams", 
required = false) String startParams,
+                                       @RequestParam(value = 
"expectedParallelismNumber", required = false) Integer 
expectedParallelismNumber,
+                                       @RequestParam(value = "dryRun", 
defaultValue = "0", required = false) int dryRun) {
+
+        if (timeout == null) {
+            timeout = Constants.MAX_TASK_TIMEOUT;
+        }
+        Map<String, String> startParamMap = null;
+        if (startParams != null) {
+            startParamMap = JSONUtils.toMap(startParams);
+        }
+
+        Map<String, Object> result = new HashMap<>();
+        String[] processDefinitionCodeArray = 
processDefinitionCodes.split(",");
+        Set<Long> processDefinitionCodeSet = new HashSet<>();
+        List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
+
+        for (String StrProcessDefinitionCode :processDefinitionCodeArray) {
+            long processDefinitionCode = 
Long.parseLong(StrProcessDefinitionCode);
+            processDefinitionCodeSet.add(processDefinitionCode);
+        }

Review comment:
       Could we use a lambda expression to get `processDefinitionCodeSet` 
instead of using the intermediate variable `processDefinitionCodeArray`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to