lenboo commented on a change in pull request #4708:
URL:
https://github.com/apache/incubator-dolphinscheduler/pull/4708#discussion_r571561391
##########
File path:
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
##########
@@ -2055,4 +2111,199 @@ public String changeJson(ProcessData processData,
String oldJson) {
}
return JSONUtils.toJsonString(processData);
}
+
+ /**
+ * switch process definition version to process definition log version
+ *
+ * @param processDefinition
+ * @param processDefinitionLog
+ * @return
+ */
+ public int switchVersion(ProcessDefinition processDefinition,
ProcessDefinitionLog processDefinitionLog) {
+ if (null == processDefinition || null == processDefinitionLog) {
+ return Constants.EXIT_CODE_FAILURE;
+ }
+
+ ProcessDefinition tmpDefinition =
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog),
+ ProcessDefinition.class);
+ tmpDefinition.setId(processDefinition.getId());
+ tmpDefinition.setReleaseState(ReleaseState.OFFLINE);
+ tmpDefinition.setFlag(Flag.YES);
+
+ int switchResult = 0;
+ if (0 == processDefinition.getId()) {
+
+ switchResult = processDefineMapper.insert(tmpDefinition);
+ } else {
+ switchResult = processDefineMapper.updateById(tmpDefinition);
+ }
+ //TODO... switch task relations
+ return switchResult;
+ }
+
+ /**
+ * update task definition
+ *
+ * @param operator
+ * @param projectCode
+ * @param taskNode
+ * @param taskDefinition
+ * @return
+ */
+ public int updateTaskDefinition(User operator, Long projectCode, TaskNode
taskNode, TaskDefinition taskDefinition) {
+
+ List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionLogMapper.queryByDefinitionCode(taskDefinition.getCode());
+ int version = taskDefinitionLogs
+ .stream()
+ .map(TaskDefinitionLog::getVersion)
+ .max((x, y) -> x > y ? x : y)
+ .orElse(0) + 1;
+ Date now = new Date();
+ taskDefinition.setVersion(version);
+ taskDefinition.setCode(taskDefinition.getCode());
+ taskDefinition.setName(taskNode.getName());
+ taskDefinition.setDescription(taskNode.getDesc());
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setUserId(operator.getId());
+ taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
+ taskDefinition.setTaskParams(taskNode.getParams());
+ taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
+ taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
+ taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
+ taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
+ taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
+
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ?
TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
+
taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
+
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
+ taskDefinition.setUpdateTime(now);
+ taskDefinition.setResourceIds(getResourceIds(taskDefinition));
+ int update = taskDefinitionMapper.updateById(taskDefinition);
+ // save task definition log
+ TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
+ taskDefinitionLog.set(taskDefinition);
+ taskDefinitionLog.setOperator(operator.getId());
+ taskDefinitionLog.setOperateTime(now);
+ int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
+ return insert & update;
+ }
+
+ /**
+  * Collect the ids of the resource files referenced by a task definition's parameters.
+  *
+  * @param taskDefinition task definition whose task type and task params are parsed
+  * @return the non-zero resource ids joined with ",", or an empty string when the
+  *         parameters cannot be obtained or reference no resource with a non-zero id
+  */
+ public String getResourceIds(TaskDefinition taskDefinition) {
+     // TODO modify taskDefinition.getTaskType()
+     AbstractParameters params = TaskParametersUtils.getParameters(
+             taskDefinition.getTaskType().getDescp(),
+             taskDefinition.getTaskParams());
+
+     if (params == null || !CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
+         return StringUtils.EMPTY;
+     }
+
+     // An id of 0 is filtered out; the set removes duplicate ids.
+     Set<Integer> ids = params.getResourceFilesList()
+             .stream()
+             .filter(resource -> resource.getId() != 0)
+             .map(ResourceInfo::getId)
+             .collect(Collectors.toSet());
+
+     return CollectionUtils.isEmpty(ids)
+             ? StringUtils.EMPTY
+             : StringUtils.join(ids, ",");
+ }
+
+ /**
+  * Save a process definition: refresh each task definition from its task node, create
+  * tasks and relations, write a new process definition log entry, and switch the
+  * definition to that entry.
+  *
+  * @param operator          user performing the save
+  * @param project           project owning the definition; its code and name are used
+  * @param name              process definition name
+  * @param desc              process definition description
+  * @param locations         locations value stored on the log entry
+  * @param connects          connects value stored on the log entry
+  * @param processData       process data; a null task list is treated as empty
+  * @param processDefinition process definition to switch
+  * @return the result of {@code switchVersion} for the definition and the new log entry
+  */
+ public int saveProcessDefinition(User operator, Project project, String name, String desc,
+                                  String locations, String connects, ProcessData processData,
+                                  ProcessDefinition processDefinition) {
+     List<TaskNode> nodes = processData.getTasks();
+     if (nodes == null) {
+         nodes = new ArrayList<>();
+     }
+     for (TaskNode node : nodes) {
+         // TODO update by code directly
+         TaskDefinition current =
+                 taskDefinitionMapper.queryByDefinitionName(project.getCode(), node.getName());
+         updateTaskDefinition(operator, project.getCode(), node, current);
+     }
+     createTaskAndRelation(operator, project.getName(), "", processDefinition, processData);
+     ProcessDefinitionLog newLog = updateProcessLog(operator, processDefinition.getCode(),
+             name, processData, project, desc, locations, connects);
+     // updateProcessLog may return null; switchVersion checks its arguments for null.
+     return switchVersion(processDefinition, newLog);
+ }
+
+ /**
+ * @param operator
+ * @param processDefinitionCode
+ * @param processDefinitionName
+ * @param processData
+ * @param project
+ * @param desc
+ * @param locations
+ * @param connects
+ * @return
+ */
+ public ProcessDefinitionLog updateProcessLog(User operator, Long
processDefinitionCode, String processDefinitionName,
+ ProcessData processData,
Project project,
+ String desc, String
locations, String connects) {
+ ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
+ int version =
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode());
+ processDefinitionLog.setCode(processDefinitionCode);
+ processDefinitionLog.setVersion(version);
+ processDefinitionLog.setName(processDefinitionName);
+ processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
+ processDefinitionLog.setProjectCode(project.getCode());
+ processDefinitionLog.setDescription(desc);
+ processDefinitionLog.setLocations(locations);
+ processDefinitionLog.setConnects(connects);
+ processDefinitionLog.setTimeout(processData.getTimeout());
+ processDefinitionLog.setTenantId(processData.getTenantId());
+ processDefinitionLog.setOperator(operator.getId());
+ Date updateTime = new Date();
+ processDefinitionLog.setOperateTime(updateTime);
+
+ //custom global params
+ List<Property> globalParamsList = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
+ Set<Property> userDefParamsSet = new
HashSet<>(processData.getGlobalParams());
+ globalParamsList = new ArrayList<>(userDefParamsSet);
+ }
+ processDefinitionLog.setGlobalParamList(globalParamsList);
+ processDefinitionLog.setUpdateTime(updateTime);
+ processDefinitionLog.setFlag(Flag.YES);
+
+ processDefinitionLog.setOperator(operator.getId());
+
processDefinitionLog.setOperateTime(processDefinitionLog.getUpdateTime());
+ int insert = processDefinitionLogMapper.insert(processDefinitionLog);
+ if (insert > 0) {
+ return processDefinitionLog;
+ }
+ return null;
Review comment:
A null check already exists in `switchVersion`; maybe you missed it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]