This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new 890faff [Bug][ApiServer] workflow copy (#7694)
890faff is described below
commit 890faffd22247dc2577b4127d06f1ec0d8763033
Author: JinYong Li <[email protected]>
AuthorDate: Wed Dec 29 00:27:54 2021 +0800
[Bug][ApiServer] workflow copy (#7694)
* fix workflow copy
* fix copy
* fix copy
* code style
---
.../apache/dolphinscheduler/api/enums/Status.java | 2 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 46 +++++++++++++++++-----
2 files changed, 38 insertions(+), 10 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 87f658b..413a328 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -293,7 +293,7 @@ public enum Status {
PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not
match", "项目和工作流不匹配"),
DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support
modification", "当前任务不支持修改"),
- NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{}] does not support copy",
"不支持复制的任务类型[{}]"),
+ NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy",
"不支持复制的任务类型[{0}]"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 9c91043..20fa726 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -76,8 +76,6 @@ import
org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.mapred.TaskLog.LogName;
-import org.apache.yetus.audience.InterfaceAudience.Public;
import java.io.BufferedOutputStream;
import java.io.IOException;
@@ -108,6 +106,7 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
@@ -1099,13 +1098,13 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
- HashMap<Long, Project> userProjects = new
HashMap(Constants.DEFAULT_HASH_MAP_SIZE);
+ HashMap<Long, Project> userProjects = new
HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())
- .forEach(userProject ->
userProjects.put(userProject.getCode(), userProject));
+ .forEach(userProject -> userProjects.put(userProject.getCode(),
userProject));
// check processDefinition exist in project
- List<ProcessDefinition> processDefinitionListInProject =
processDefinitionList.stream().
- filter(o ->
userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
+ List<ProcessDefinition> processDefinitionListInProject =
processDefinitionList.stream()
+ .filter(o ->
userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
@@ -1309,6 +1308,9 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
List<String> failedProcessList = new ArrayList<>();
doBatchOperateProcessDefinition(loginUser, targetProjectCode,
failedProcessList, codes, result, true);
+ if (result.get(Constants.STATUS) == Status.NOT_SUPPORT_COPY_TASK_TYPE)
{
+ return result;
+ }
checkBatchOperateResult(projectCode, targetProjectCode, result,
failedProcessList, true);
return result;
}
@@ -1386,18 +1388,35 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
List<TaskDefinitionLog> taskDefinitionLogs =
processService.genTaskDefineList(processTaskRelations);
+ Map<Long, Long> taskCodeMap = new HashMap<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs)
{
if
(TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())
||
TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType())
- ||
TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())) {
+ ||
TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())
+ ||
TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE,
taskDefinitionLog.getTaskType());
- throw new
ServiceException(Status.NOT_SUPPORT_COPY_TASK_TYPE);
+ return;
+ }
+ try {
+ long taskCode =
CodeGenerateUtils.getInstance().genCode();
+ taskCodeMap.put(taskDefinitionLog.getCode(), taskCode);
+ taskDefinitionLog.setCode(taskCode);
+ } catch (CodeGenerateException e) {
+ putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
+ throw new
ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
taskDefinitionLog.setProjectCode(targetProjectCode);
- taskDefinitionLog.setCode(0L);
taskDefinitionLog.setVersion(0);
taskDefinitionLog.setName(taskDefinitionLog.getName() +
"_copy_" + DateUtils.getCurrentTimeStamp());
}
+ for (ProcessTaskRelationLog processTaskRelationLog :
taskRelationList) {
+ if (processTaskRelationLog.getPreTaskCode() > 0) {
+
processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode()));
+ }
+ if (processTaskRelationLog.getPostTaskCode() > 0) {
+
processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode()));
+ }
+ }
try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
@@ -1407,6 +1426,15 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
processDefinition.setId(0);
processDefinition.setUserId(loginUser.getId());
processDefinition.setName(processDefinition.getName() +
"_copy_" + DateUtils.getCurrentTimeStamp());
+ if (StringUtils.isNotBlank(processDefinition.getLocations())) {
+ ArrayNode jsonNodes =
JSONUtils.parseArray(processDefinition.getLocations());
+ for (int i = 0; i < jsonNodes.size(); i++) {
+ ObjectNode node = (ObjectNode) jsonNodes.path(i);
+ node.put("taskCode",
taskCodeMap.get(node.get("taskCode").asLong()));
+ jsonNodes.set(i, node);
+ }
+
processDefinition.setLocations(JSONUtils.toJsonString(jsonNodes));
+ }
try {
result.putAll(createDagDefine(loginUser, taskRelationList,
processDefinition, taskDefinitionLogs));
} catch (Exception e) {