This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 24a97fd [Bug] [dolphinscheduler-api]support workflow instance to
definition (#7930)
24a97fd is described below
commit 24a97fda75c362d4acf97867d7dfcb4cccbb26a3
Author: JinYong Li <[email protected]>
AuthorDate: Tue Jan 11 14:41:45 2022 +0800
[Bug] [dolphinscheduler-api]support workflow instance to definition (#7930)
* add task save and binds workflow
* add task update with upstream
* support workflow instance to definition
* fix ut
---
.../api/controller/ProcessInstanceController.java | 13 ++-
.../service/impl/ProcessDefinitionServiceImpl.java | 19 +++--
.../service/impl/ProcessInstanceServiceImpl.java | 99 ++++++++++------------
.../service/impl/TaskDefinitionServiceImpl.java | 6 +-
.../api/service/ProcessDefinitionServiceTest.java | 12 +--
.../api/service/ProcessInstanceServiceTest.java | 5 +-
.../api/service/TaskDefinitionServiceImplTest.java | 2 +-
.../main/resources/sql/dolphinscheduler_mysql.sql | 8 +-
.../2.1.0_schema/mysql/dolphinscheduler_ddl.sql | 6 +-
.../master/runner/WorkflowExecuteThread.java | 2 +-
.../service/process/ProcessService.java | 82 ++++++++++--------
.../service/process/ProcessServiceTest.java | 8 +-
12 files changed, 134 insertions(+), 128 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index da48b6d..c888ff7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -177,13 +177,13 @@ public class ProcessInstanceController extends
BaseController {
@ApiImplicitParams({
@ApiImplicitParam(name = "taskRelationJson", value =
"TASK_RELATION_JSON", type = "String"),
@ApiImplicitParam(name = "taskDefinitionJson", value =
"TASK_DEFINITION_JSON", type = "String"),
- @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required
= true, dataType = "Int", example = "100"),
+ @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required
= true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type
= "String"),
- @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required
= true, type = "Boolean"),
- @ApiImplicitParam(name = "globalParams", value =
"PROCESS_GLOBAL_PARAMS", type = "String"),
+ @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required
= true, type = "Boolean", example = "false"),
+ @ApiImplicitParam(name = "globalParams", value =
"PROCESS_GLOBAL_PARAMS", type = "String", example = "[]"),
@ApiImplicitParam(name = "locations", value =
"PROCESS_INSTANCE_LOCATIONS", type = "String"),
- @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type =
"String"),
- @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type =
"Int", example = "0")
+ @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type =
"Int", example = "0"),
+ @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type =
"String", example = "default")
})
@PutMapping(value = "/{id}")
@ResponseStatus(HttpStatus.OK)
@@ -199,8 +199,7 @@ public class ProcessInstanceController extends
BaseController {
@RequestParam(value = "globalParams",
required = false, defaultValue = "[]") String globalParams,
@RequestParam(value = "locations",
required = false) String locations,
@RequestParam(value = "timeout",
required = false, defaultValue = "0") int timeout,
- @RequestParam(value = "tenantCode",
required = true) String tenantCode,
- @RequestParam(value = "flag", required
= false) Flag flag) {
+ @RequestParam(value = "tenantCode",
required = true) String tenantCode) {
Map<String, Object> result =
processInstanceService.updateProcessInstance(loginUser, projectCode, id,
taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine,
globalParams, locations, timeout, tenantCode);
return returnDataList(result);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 957d3ac..ed87844 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -263,7 +263,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessDefinition
processDefinition,
List<TaskDefinitionLog>
taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>();
- int saveTaskResult = processService.saveTaskDefine(loginUser,
processDefinition.getProjectCode(), taskDefinitionLogs);
+ int saveTaskResult = processService.saveTaskDefine(loginUser,
processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
logger.info("The task has not changed, so skip");
}
@@ -271,12 +271,13 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
- int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, true);
+ int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
}
- int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion,
taskRelationList, taskDefinitionLogs);
+ int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(), processDefinition.getCode(),
+ insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@@ -590,7 +591,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessDefinition
processDefinitionDeepCopy,
List<TaskDefinitionLog>
taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>();
- int saveTaskResult = processService.saveTaskDefine(loginUser,
processDefinition.getProjectCode(), taskDefinitionLogs);
+ int saveTaskResult = processService.saveTaskDefine(loginUser,
processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
logger.info("The task has not changed, so skip");
}
@@ -603,14 +604,14 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
insertVersion = processDefinitionDeepCopy.getVersion();
} else {
processDefinition.setUpdateTime(new Date());
- insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, true);
+ insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, Boolean.TRUE, Boolean.TRUE);
}
if (insertVersion == 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
- processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs);
+ processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@@ -748,7 +749,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
switch (releaseState) {
case ONLINE:
- List<ProcessTaskRelation> relationList =
processService.findRelationByCode(projectCode, code);
+ List<ProcessTaskRelation> relationList =
processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
@@ -1899,7 +1900,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
private Map<String, Object> createEmptyDagDefine(User loginUser,
ProcessDefinition processDefinition) {
Map<String, Object> result = new HashMap<>();
- int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, true);
+ int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
@@ -2103,7 +2104,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
switch (releaseState) {
case ONLINE:
- List<ProcessTaskRelation> relationList =
processService.findRelationByCode(projectCode, code);
+ List<ProcessTaskRelation> relationList =
processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 179f361..61af8f7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -470,64 +470,55 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
return result;
}
setProcessInstance(processInstance, tenantCode, scheduleTime,
globalParams, timeout);
- if (Boolean.TRUE.equals(syncDefine)) {
- List<TaskDefinitionLog> taskDefinitionLogs =
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
- if (taskDefinitionLogs.isEmpty()) {
- putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+ List<TaskDefinitionLog> taskDefinitionLogs =
JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
+ if (taskDefinitionLogs.isEmpty()) {
+ putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
+ return result;
+ }
+ for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
+ if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
+ putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID,
taskDefinitionLog.getName());
return result;
}
- for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
- if
(!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
- putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID,
taskDefinitionLog.getName());
- return result;
- }
- }
- int saveTaskResult = processService.saveTaskDefine(loginUser,
projectCode, taskDefinitionLogs);
- if (saveTaskResult == Constants.DEFINITION_FAILURE) {
- putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
- throw new
ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
- }
- ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
- List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
- //check workflow json is valid
- result =
processDefinitionService.checkProcessNodeList(taskRelationJson);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ }
+ int saveTaskResult = processService.saveTaskDefine(loginUser,
projectCode, taskDefinitionLogs, syncDefine);
+ if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+ putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
+ throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
+ }
+ ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+ //check workflow json is valid
+ result =
processDefinitionService.checkProcessNodeList(taskRelationJson);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ int tenantId = -1;
+ if (!Constants.DEFAULT.equals(tenantCode)) {
+ Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+ if (tenant == null) {
+ putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
- int tenantId = -1;
- if (!Constants.DEFAULT.equals(tenantCode)) {
- Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
- if (tenant == null) {
- putMsg(result, Status.TENANT_NOT_EXIST);
- return result;
- }
- tenantId = tenant.getId();
- }
- ProcessDefinition processDefinitionDeepCopy =
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition),
ProcessDefinition.class);
- processDefinition.set(projectCode, processDefinition.getName(),
processDefinition.getDescription(), globalParams, locations, timeout, tenantId);
- processDefinition.setUpdateTime(new Date());
- int insertVersion;
- if (processDefinition.equals(processDefinitionDeepCopy)) {
- insertVersion = processDefinitionDeepCopy.getVersion();
- } else {
- processDefinition.setUpdateTime(new Date());
- insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, false);
- }
- if (insertVersion == 0) {
- putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
- throw new
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
- int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
- processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs);
- if (insertResult == Constants.EXIT_CODE_SUCCESS) {
- putMsg(result, Status.SUCCESS);
- result.put(Constants.DATA_LIST, processDefinition);
- } else {
- putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
- throw new
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
- processInstance.setProcessDefinitionVersion(insertVersion);
+ tenantId = tenant.getId();
+ }
+ processDefinition.set(projectCode, processDefinition.getName(),
processDefinition.getDescription(), globalParams, locations, timeout, tenantId);
+ processDefinition.setUpdateTime(new Date());
+ int insertVersion = processService.saveProcessDefine(loginUser,
processDefinition, syncDefine, Boolean.FALSE);
+ if (insertVersion == 0) {
+ putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ }
+ int insertResult = processService.saveTaskRelation(loginUser,
processDefinition.getProjectCode(),
+ processDefinition.getCode(), insertVersion, taskRelationList,
taskDefinitionLogs, syncDefine);
+ if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+ putMsg(result, Status.SUCCESS);
+ result.put(Constants.DATA_LIST, processDefinition);
+ } else {
+ putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
+ throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
+ processInstance.setProcessDefinitionVersion(insertVersion);
int update = processService.updateProcessInstance(processInstance);
if (update == 0) {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
@@ -745,7 +736,7 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()
);
- if (processDefinition != null && projectCode !=
processDefinition.getProjectCode()) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 094ce9f..fc06beb 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -137,7 +137,7 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
}
- int saveTaskResult = processService.saveTaskDefine(loginUser,
projectCode, taskDefinitionLogs);
+ int saveTaskResult = processService.saveTaskDefine(loginUser,
projectCode, taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
@@ -230,13 +230,13 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
}
int insertResult = processService.saveTaskRelation(loginUser,
projectCode, processDefinition.getCode(), processDefinition.getVersion(),
- processTaskRelationLogList, null);
+ processTaskRelationLogList, Lists.newArrayList(),
Boolean.TRUE);
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new
ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
- int saveTaskResult = processService.saveTaskDefine(loginUser,
projectCode, Lists.newArrayList(taskDefinition));
+ int saveTaskResult = processService.saveTaskDefine(loginUser,
projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 8bdcc1d..84c7f68 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -298,7 +298,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition);
Set<Long> definitionCodes =
Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
- Mockito.when(processService.saveProcessDefine(loginUser, definition,
true)).thenReturn(2);
+ Mockito.when(processService.saveProcessDefine(loginUser, definition,
Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Map<String, Object> map3 =
processDefinitionService.batchCopyProcessDefinition(
loginUser, projectCode, "46", 1L);
Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
@@ -330,7 +330,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition);
Set<Long> definitionCodes =
Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
- Mockito.when(processService.saveProcessDefine(loginUser, definition,
true)).thenReturn(2);
+ Mockito.when(processService.saveProcessDefine(loginUser, definition,
Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode,
46L)).thenReturn(getProcessTaskRelation(projectCode));
putMsg(result, Status.SUCCESS);
@@ -442,7 +442,7 @@ public class ProcessDefinitionServiceTest {
processTaskRelation.setProcessDefinitionCode(46L);
processTaskRelation.setPostTaskCode(123L);
processTaskRelationList.add(processTaskRelation);
- Mockito.when(processService.findRelationByCode(projectCode,
46L)).thenReturn(processTaskRelationList);
+ Mockito.when(processService.findRelationByCode(46L,
1)).thenReturn(processTaskRelationList);
Map<String, Object> onlineRes =
processDefinitionService.releaseProcessDefinition(
loginUser, projectCode, 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
@@ -692,10 +692,10 @@ public class ProcessDefinitionServiceTest {
Mockito.when(dataSourceMapper.queryDataSourceByNameAndUserId(userId,
"mysql_1")).thenReturn(dataSource);
long projectCode = 1001;
- Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser),
Mockito.eq(projectCode), Mockito.notNull())).thenReturn(2);
- Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser),
Mockito.notNull(), Mockito.notNull())).thenReturn(1);
+ Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser),
Mockito.eq(projectCode), Mockito.notNull(),
Mockito.anyBoolean())).thenReturn(2);
+ Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser),
Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(1);
Mockito.when(processService.saveTaskRelation(Mockito.same(loginUser),
Mockito.eq(projectCode), Mockito.anyLong(),
- Mockito.eq(1), Mockito.notNull(),
Mockito.notNull())).thenReturn(0);
+ Mockito.eq(1), Mockito.notNull(), Mockito.notNull(),
Mockito.anyBoolean())).thenReturn(0);
Map<String, Object> result =
processDefinitionService.importSqlProcessDefinition(loginUser, projectCode,
mockMultipartFile);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index c7c0996..d2c669d 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -424,7 +424,7 @@ public class ProcessInstanceServiceTest {
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
when(processService.getTenantForProcess(Mockito.anyInt(),
Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1);
- when(processService.saveProcessDefine(loginUser, processDefinition,
false)).thenReturn(1);
+ when(processService.saveProcessDefine(loginUser, processDefinition,
Boolean.TRUE, Boolean.FALSE)).thenReturn(1);
when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceFinishRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
@@ -435,8 +435,9 @@ public class ProcessInstanceServiceTest {
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
+ when(processService.saveProcessDefine(loginUser, processDefinition,
Boolean.FALSE, Boolean.FALSE)).thenReturn(1);
Map<String, Object> successRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
- shellJson, taskJson,"2020-02-21 00:00:00", false, "", "", 0,
"root");
+ shellJson, taskJson,"2020-02-21 00:00:00", Boolean.FALSE, "", "",
0, "root");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index d8852b4..266f95e 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -97,7 +97,7 @@ public class TaskDefinitionServiceImplTest {
+
"\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+
"\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitions =
JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
- Mockito.when(processService.saveTaskDefine(loginUser, projectCode,
taskDefinitions)).thenReturn(1);
+ Mockito.when(processService.saveTaskDefine(loginUser, projectCode,
taskDefinitions, Boolean.TRUE)).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode,
createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 2155618..b7dfc38 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -478,7 +478,7 @@ CREATE TABLE `t_ds_task_definition` (
`delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
`resource_ids` text COMMENT 'resource id, separated by comma',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
- `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
+ `task_group_priority` tinyint(4) DEFAULT '0' COMMENT 'task group priority',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`)
@@ -511,7 +511,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
- `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
+ `task_group_priority` tinyint(4) DEFAULT 0 COMMENT 'task group priority',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
@@ -538,7 +538,7 @@ CREATE TABLE `t_ds_process_task_relation` (
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`),
- KEY `idx_project_code_project_code`
(`project_code`,`process_definition_code`)
+ KEY `idx_code` (`project_code`,`process_definition_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
@@ -562,7 +562,7 @@ CREATE TABLE `t_ds_process_task_relation_log` (
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`),
- KEY `idx_project_code_project_code`
(`project_code`,`process_definition_code`)
+ KEY `idx_process_code_version`
(`process_definition_code`,`process_definition_version`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
index 0a5c91a..e0d6673 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -16,11 +16,11 @@
*/
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT
'job custom parameters' AFTER `app_link`;
-ALTER TABLE `t_ds_process_task_relation` ADD INDEX
`idx_project_code_process_definition_code` (`project_code`,
`process_definition_code`) USING BTREE;
-ALTER TABLE `t_ds_process_task_relation_log` ADD INDEX
`idx_project_code_process_definition_code` (`project_code`,
`process_definition_code`) USING BTREE;
+ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`,
`process_definition_code`) USING BTREE;
+ALTER TABLE `t_ds_process_task_relation_log` ADD KEY
`idx_process_code_version`
(`process_definition_code`,`process_definition_version`) USING BTREE;
ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version`
(`code`,`version`) USING BTREE;
alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL
COMMENT 'task group id' AFTER `resource_ids`;
alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT
NULL COMMENT 'task group id' AFTER `task_group_id`;
alter table t_ds_task_definition add `task_group_id` int(11) DEFAULT NULL
COMMENT 'task group id' AFTER `resource_ids`;
-alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT
NULL COMMENT 'task group id' AFTER `task_group_id`;
\ No newline at end of file
+alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT '0'
COMMENT 'task group id' AFTER `task_group_id`;
\ No newline at end of file
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 5ca705d..2d83f9a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -716,7 +716,7 @@ public class WorkflowExecuteThread {
List<TaskInstance> recoverNodeList =
getStartTaskInstanceList(processInstance.getCommandParam());
- List<ProcessTaskRelation> processTaskRelations =
processService.findRelationByCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+ List<ProcessTaskRelation> processTaskRelations =
processService.findRelationByCode(processDefinition.getCode(),
processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogs =
processService.getTaskDefineLogListByRelation(processTaskRelations);
List<TaskNode> taskNodeList =
processService.transformTask(processTaskRelations, taskDefinitionLogs);
forbiddenTaskMap.clear();
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 4441bfb..ea09aeb 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2254,7 +2254,7 @@ public class ProcessService {
return StringUtils.join(resourceIds, ",");
}
- public int saveTaskDefine(User operator, long projectCode,
List<TaskDefinitionLog> taskDefinitionLogs) {
+ public int saveTaskDefine(User operator, long projectCode,
List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine) {
Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
@@ -2299,13 +2299,21 @@ public class ProcessService {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
insertResult +=
taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
- taskDefinitionToUpdate.setId(task.getId());
- updateResult +=
taskDefinitionMapper.updateById(taskDefinitionToUpdate);
+ if (Boolean.TRUE.equals(syncDefine)) {
+ taskDefinitionToUpdate.setId(task.getId());
+ updateResult +=
taskDefinitionMapper.updateById(taskDefinitionToUpdate);
+ } else {
+ updateResult++;
+ }
}
}
if (!newTaskDefinitionLogs.isEmpty()) {
- updateResult +=
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
insertResult +=
taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
+ if (Boolean.TRUE.equals(syncDefine)) {
+ updateResult +=
taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
+ } else {
+ updateResult += newTaskDefinitionLogs.size();
+ }
}
return (insertResult & updateResult) > 0 ? 1 :
Constants.EXIT_CODE_SUCCESS;
}
@@ -2313,7 +2321,7 @@ public class ProcessService {
/**
* save processDefinition (including create or update processDefinition)
*/
- public int saveProcessDefine(User operator, ProcessDefinition
processDefinition, Boolean isFromProcessDefine) {
+ public int saveProcessDefine(User operator, ProcessDefinition
processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) {
ProcessDefinitionLog processDefinitionLog = new
ProcessDefinitionLog(processDefinition);
Integer version =
processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
int insertVersion = version == null || version == 0 ?
Constants.VERSION_FIRST : version + 1;
@@ -2322,12 +2330,14 @@ public class ProcessService {
processDefinitionLog.setOperator(operator.getId());
processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
int insertLog = processDefineLogMapper.insert(processDefinitionLog);
- int result;
- if (0 == processDefinition.getId()) {
- result = processDefineMapper.insert(processDefinitionLog);
- } else {
- processDefinitionLog.setId(processDefinition.getId());
- result = processDefineMapper.updateById(processDefinitionLog);
+ int result = 1;
+ if (Boolean.TRUE.equals(syncDefine)) {
+ if (0 == processDefinition.getId()) {
+ result = processDefineMapper.insert(processDefinitionLog);
+ } else {
+ processDefinitionLog.setId(processDefinition.getId());
+ result = processDefineMapper.updateById(processDefinitionLog);
+ }
}
return (insertLog & result) > 0 ? insertVersion : 0;
}
@@ -2336,7 +2346,8 @@ public class ProcessService {
* save task relations
*/
public int saveTaskRelation(User operator, long projectCode, long
processDefinitionCode, int processDefinitionVersion,
- List<ProcessTaskRelationLog> taskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
+ List<ProcessTaskRelationLog> taskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs,
+ Boolean syncDefine) {
if (taskRelationList.isEmpty()) {
return Constants.EXIT_CODE_SUCCESS;
}
@@ -2365,19 +2376,22 @@ public class ProcessService {
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
}
- List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
- if (!processTaskRelationList.isEmpty()) {
- Set<Integer> processTaskRelationSet =
processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
- Set<Integer> taskRelationSet =
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
- boolean result =
CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
- if (result) {
- return Constants.EXIT_CODE_SUCCESS;
+ int insert = taskRelationList.size();
+ if (Boolean.TRUE.equals(syncDefine)) {
+ List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ if (!processTaskRelationList.isEmpty()) {
+ Set<Integer> processTaskRelationSet =
processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
+ Set<Integer> taskRelationSet =
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
+ boolean result =
CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
+ if (result) {
+ return Constants.EXIT_CODE_SUCCESS;
+ }
+ processTaskRelationMapper.deleteByCode(projectCode,
processDefinitionCode);
}
- processTaskRelationMapper.deleteByCode(projectCode,
processDefinitionCode);
+ insert = processTaskRelationMapper.batchInsert(taskRelationList);
}
- int result = processTaskRelationMapper.batchInsert(taskRelationList);
int resultLog =
processTaskRelationLogMapper.batchInsert(taskRelationList);
- return (result & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS :
Constants.EXIT_CODE_FAILURE;
+ return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS :
Constants.EXIT_CODE_FAILURE;
}
public boolean isTaskOnline(long taskCode) {
@@ -2400,14 +2414,15 @@ public class ProcessService {
/**
* Generate the DAG Graph based on the process definition id
+ * Use temporarily before refactoring taskNode
*
* @param processDefinition process definition
* @return dag graph
*/
public DAG<String, TaskNode, TaskNodeRelation>
genDagGraph(ProcessDefinition processDefinition) {
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
- List<TaskNode> taskNodeList = transformTask(processTaskRelations,
Lists.newArrayList());
- ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new
ArrayList<>(processTaskRelations));
+ List<ProcessTaskRelation> taskRelations =
this.findRelationByCode(processDefinition.getCode(),
processDefinition.getVersion());
+ List<TaskNode> taskNodeList = transformTask(taskRelations,
Lists.newArrayList());
+ ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new
ArrayList<>(taskRelations));
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
}
@@ -2416,12 +2431,10 @@ public class ProcessService {
* generate DagData
*/
public DagData genDagData(ProcessDefinition processDefinition) {
- List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
- List<TaskDefinitionLog> taskDefinitionLogList =
genTaskDefineList(processTaskRelations);
- List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream()
- .map(taskDefinitionLog ->
JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog),
TaskDefinition.class))
- .collect(Collectors.toList());
- return new DagData(processDefinition, processTaskRelations,
taskDefinitions);
+ List<ProcessTaskRelation> taskRelations =
this.findRelationByCode(processDefinition.getCode(),
processDefinition.getVersion());
+ List<TaskDefinitionLog> taskDefinitionLogList =
genTaskDefineList(taskRelations);
+ List<TaskDefinition> taskDefinitions =
taskDefinitionLogList.stream().map(t -> (TaskDefinition)
t).collect(Collectors.toList());
+ return new DagData(processDefinition, taskRelations, taskDefinitions);
}
public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation>
processTaskRelations) {
@@ -2465,10 +2478,11 @@ public class ProcessService {
}
/**
- * find process task relation list by projectCode and processDefinitionCode
+ * find process task relation list by process
*/
- public List<ProcessTaskRelation> findRelationByCode(long projectCode, long
processDefinitionCode) {
- return processTaskRelationMapper.queryByProcessCode(projectCode,
processDefinitionCode);
+ public List<ProcessTaskRelation> findRelationByCode(long
processDefinitionCode, int processDefinitionVersion) {
+ List<ProcessTaskRelationLog> processTaskRelationLogList =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinitionCode,
processDefinitionVersion);
+ return processTaskRelationLogList.stream().map(r ->
(ProcessTaskRelation) r).collect(Collectors.toList());
}
/**
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 510e3a0..76b5bad 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -566,7 +566,7 @@ public class ProcessServiceTest {
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(),
taskDefinition.getVersion())).thenReturn(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1);
Mockito.when(taskDefinitionMapper.queryByCode(taskDefinition.getCode())).thenReturn(taskDefinition);
- int result = processService.saveTaskDefine(operator, projectCode,
taskDefinitionLogs);
+ int result = processService.saveTaskDefine(operator, projectCode,
taskDefinitionLogs, Boolean.TRUE);
Assert.assertEquals(0, result);
}
@@ -579,7 +579,7 @@ public class ProcessServiceTest {
processDefinition.setVersion(1);
processDefinition.setCode(11L);
- ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ ProcessTaskRelationLog processTaskRelation = new
ProcessTaskRelationLog();
processTaskRelation.setName("def 1");
processTaskRelation.setProcessDefinitionVersion(1);
processTaskRelation.setProjectCode(1L);
@@ -588,7 +588,7 @@ public class ProcessServiceTest {
processTaskRelation.setPreTaskCode(2L);
processTaskRelation.setUpdateTime(new Date());
processTaskRelation.setCreateTime(new Date());
- List<ProcessTaskRelation> list = new ArrayList<>();
+ List<ProcessTaskRelationLog> list = new ArrayList<>();
list.add(processTaskRelation);
TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
@@ -616,7 +616,7 @@ public class ProcessServiceTest {
taskDefinitionLogs.add(td2);
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
-
Mockito.when(processTaskRelationMapper.queryByProcessCode(Mockito.anyLong(),
Mockito.anyLong())).thenReturn(list);
+
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(),
Mockito.anyInt())).thenReturn(list);
DAG<String, TaskNode, TaskNodeRelation>
stringTaskNodeTaskNodeRelationDAG =
processService.genDagGraph(processDefinition);
Assert.assertEquals(1,
stringTaskNodeTaskNodeRelationDAG.getNodesCount());