caishunfeng commented on code in PR #13103:
URL:
https://github.com/apache/dolphinscheduler/pull/13103#discussion_r1040372086
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -2045,6 +2046,74 @@ private void setGlobalParamIfCommanded(ProcessDefinition
processDefinition, Map<
}
}
+ /**
+ * clear related data if command of process instance is EXECUTE_TASK
+ * 1. find all task code from sub dag (only contains related task)
+ * 2. set the flag of tasks to Flag.NO
+ * 3. clear varPool data from re-execute task instance in process instance
+ * 4. remove related task instance from taskInstanceMap, completeTaskMap,
validTaskMap
+ *
+ * @return task instance
+ */
+ private void clearDataIfExecuteTask() {
+ // only clear data if command is EXECUTE_TASK
+ if
(!processInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) {
+ return;
+ }
+
+ // Records the key of varPool data to be removed
+ Set<String> taskCodesString = dag.getAllNodesList();
+
+ List<TaskInstance> removeTaskInstances = new ArrayList<>();
+
+ for (String taskCodeString : taskCodesString) {
+ long taskCode = Long.parseLong(taskCodeString);
+ TaskInstance taskInstance;
+ if (validTaskMap.containsKey(taskCode)) {
+ taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode));
+ } else {
+ taskInstance =
taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskCode);
+ }
+ if (taskInstance == null) {
+ continue;
Review Comment:
It would be better to log a warning here.
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -487,6 +491,108 @@ public Map<String, Object> execute(User loginUser,
Integer workflowInstanceId, E
return execute(loginUser, processDefinition.getProjectCode(),
workflowInstanceId, executeType);
}
+ /**
+ * do action to execute task in process instance
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param processInstanceId process instance id
+ * @param startNodeList start node list
+ * @param taskDependType task depend type
+ * @return execute result code
+ */
+ @Override
+ public Map<String, Object> executeTask(User loginUser, long projectCode,
Integer processInstanceId,
Review Comment:
Please don't use Map<String, Object> as the result object, because it is not clear
enough. It's better to throw a ServiceException if the operation does not succeed.
```suggestion
public void executeTask(User loginUser, long projectCode, Integer
processInstanceId,
```
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -487,6 +491,108 @@ public Map<String, Object> execute(User loginUser,
Integer workflowInstanceId, E
return execute(loginUser, processDefinition.getProjectCode(),
workflowInstanceId, executeType);
}
+ /**
+ * do action to execute task in process instance
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param processInstanceId process instance id
+ * @param startNodeList start node list
+ * @param taskDependType task depend type
+ * @return execute result code
+ */
+ @Override
+ public Map<String, Object> executeTask(User loginUser, long projectCode,
Integer processInstanceId,
+ String startNodeList,
TaskDependType taskDependType) {
+
+ Project project = projectMapper.queryByCode(projectCode);
+ // check user access for project
+
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
+
ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK));
+
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
Review Comment:
```suggestion
projectService.checkProjectAndAuthThrowException(loginUser, project,
projectCode,
ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK));
```
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java:
##########
@@ -487,6 +491,108 @@ public Map<String, Object> execute(User loginUser,
Integer workflowInstanceId, E
return execute(loginUser, processDefinition.getProjectCode(),
workflowInstanceId, executeType);
}
+ /**
+ * do action to execute task in process instance
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param processInstanceId process instance id
+ * @param startNodeList start node list
+ * @param taskDependType task depend type
+ * @return execute result code
+ */
+ @Override
+ public Map<String, Object> executeTask(User loginUser, long projectCode,
Integer processInstanceId,
+ String startNodeList,
TaskDependType taskDependType) {
+
+ Project project = projectMapper.queryByCode(projectCode);
+ // check user access for project
+
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
+
ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK));
+
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+
+ // check master exists
+ if (!checkMasterExists(result)) {
+ return result;
+ }
+
+ ProcessInstance processInstance =
processService.findProcessInstanceDetailById(processInstanceId)
Review Comment:
It seems we need to check whether the workflow instance has finished.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -2045,6 +2046,74 @@ private void setGlobalParamIfCommanded(ProcessDefinition
processDefinition, Map<
}
}
+ /**
+ * clear related data if command of process instance is EXECUTE_TASK
+ * 1. find all task code from sub dag (only contains related task)
+ * 2. set the flag of tasks to Flag.NO
+ * 3. clear varPool data from re-execute task instance in process instance
+ * 4. remove related task instance from taskInstanceMap, completeTaskMap,
validTaskMap
+ *
+ * @return task instance
+ */
+ private void clearDataIfExecuteTask() {
+ // only clear data if command is EXECUTE_TASK
+ if
(!processInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) {
+ return;
+ }
+
+ // Records the key of varPool data to be removed
+ Set<String> taskCodesString = dag.getAllNodesList();
+
+ List<TaskInstance> removeTaskInstances = new ArrayList<>();
+
+ for (String taskCodeString : taskCodesString) {
+ long taskCode = Long.parseLong(taskCodeString);
+ TaskInstance taskInstance;
+ if (validTaskMap.containsKey(taskCode)) {
+ taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode));
+ } else {
+ taskInstance =
taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskCode);
+ }
+ if (taskInstance == null) {
+ continue;
+ }
+ removeTaskInstances.add(taskInstance);
+ }
+
+ for (TaskInstance taskInstance : removeTaskInstances) {
+ taskInstance.setFlag(Flag.NO);
+ taskInstanceDao.updateTaskInstance(taskInstance);
+ }
+
+ Set<String> removeSet = new HashSet<>();
+ for (TaskInstance taskInstance : removeTaskInstances) {
+ String taskVarPool = taskInstance.getVarPool();
+ if (StringUtils.isNotEmpty(taskVarPool)) {
+ List<Property> properties = JSONUtils.toList(taskVarPool,
Property.class);
+ List<String> keys = properties.stream()
+ .filter(property ->
property.getDirect().equals(Direct.OUT))
+ .map(property -> String.format("%s_%s",
property.getProp(), property.getType()))
+ .collect(Collectors.toList());
+ removeSet.addAll(keys);
+ }
+ }
+
+ // remove varPool data and update process instance
+ List<Property> processProperties =
JSONUtils.toList(processInstance.getVarPool(), Property.class);
+ processProperties = processProperties.stream()
+ .filter(property -> !(property.getDirect().equals(Direct.IN)
+ && removeSet.contains(String.format("%s_%s",
property.getProp(), property.getType()))))
+ .collect(Collectors.toList());
+
+ processInstance.setVarPool(JSONUtils.toJsonString(processProperties));
+ processInstanceDao.updateProcessInstance(processInstance);
+
+ // remove task instance from taskInstanceMap, completeTaskMap,
validTaskMap
Review Comment:
Should the task instance also be removed from errorTaskMap?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]