This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 3.1.7-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.7-prepare by this push:
new 9b094e980b cherry-pick [Bug-13951 ][dolphinscheduler-service]
StartParams is not applied when task is
failover(RECOVER_TOLERANCE_FAULT_PROCESS CommandType) #13958
9b094e980b is described below
commit 9b094e980bc6f67f0e5b9216cc8f38cf22809a8c
Author: Drake Youngkun Min <[email protected]>
AuthorDate: Wed May 10 14:02:21 2023 +0900
cherry-pick [Bug-13951 ][dolphinscheduler-service] StartParams is not
applied when task is failover(RECOVER_TOLERANCE_FAULT_PROCESS CommandType)
#13958
---
.../service/process/ProcessServiceImpl.java | 19 ++++++++++++----
.../service/process/ProcessServiceTest.java | 25 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 9a5780aaf2..93fc7e2490 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -938,8 +938,9 @@ public class ProcessServiceImpl implements ProcessService {
}
if (cmdParam != null) {
CommandType commandTypeIfComplement =
getCommandTypeIfComplement(processInstance, command);
- // reset global params while repeat running is needed by cmdParam
- if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+ // reset global params while repeat running and recover tolerance
fault process is needed by cmdParam
+ if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
+ commandTypeIfComplement ==
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
@@ -2081,8 +2082,7 @@ public class ProcessServiceImpl implements ProcessService
{
cmd.setProcessDefinitionCode(processDefinition.getCode());
cmd.setProcessDefinitionVersion(processDefinition.getVersion());
cmd.setProcessInstanceId(processInstance.getId());
- cmd.setCommandParam(
- String.format("{\"%s\":%d}",
CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
+
cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
@@ -3202,4 +3202,15 @@ public class ProcessServiceImpl implements
ProcessService {
}
}
}
+
+ private Map<String, Object> createCommandParams(ProcessInstance
processInstance) {
+ Map<String, Object> commandMap =
+ JSONUtils.parseObject(processInstance.getCommandParam(), new
TypeReference<Map<String, Object>>() {
+ });
+ Map<String, Object> recoverFailoverCommandParams = new HashMap<>();
+ Optional.ofNullable(MapUtils.getObject(commandMap,
CMD_PARAM_START_PARAMS))
+ .ifPresent(startParams ->
recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
+ recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING,
processInstance.getId());
+ return recoverFailoverCommandParams;
+ }
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 3e379ebcf4..d44c8c01c5 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -477,6 +477,31 @@ public class ProcessServiceTest {
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(host,
command9);
Assert.assertTrue(processInstance10 != null);
+
+ // build command same as
processService.processNeedFailoverProcessInstances(processInstance);
+ Command command12 = new Command();
+ command12.setId(12);
+ command12.setProcessDefinitionCode(definitionCode);
+ command12.setProcessDefinitionVersion(definitionVersion);
+ command12.setProcessInstanceId(processInstanceId);
+ command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
+ HashMap<String, String> startParams12 = new HashMap<>();
+ startParams12.put("startParam11", "testStartParam11");
+ HashMap<String, String> commandParams12 = new HashMap<>();
+ commandParams12.put(CMD_PARAM_START_PARAMS,
JSONUtils.toJsonString(startParams12));
+ commandParams12.put("ProcessInstanceId", "222");
+ command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
+
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
+ Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
+ Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
+ processInstance.getScheduleTime(),
null)).thenReturn("\"testStartParam11\"");
+ ProcessInstance processInstance13 = processService.handleCommand(host,
command12);
+ Assert.assertNotNull(processInstance13);
+ Assert.assertNotNull(processInstance13.getGlobalParams());
+
Assert.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\""));
}
@Test(expected = ServiceException.class)