This is an automated email from the ASF dual-hosted git repository. zhongjiajie pushed a commit to branch 3.2.1-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 52d315d3cabbcb3aa27c83b3730b5db77adda4a1 Author: Wenjun Ruan <wen...@apache.org> AuthorDate: Tue Feb 6 10:09:30 2024 +0800 Fix Recover WorkflowInstance will cause workflow Instance state is success but task instance is killed/paused (#15574) (cherry picked from commit 43a06525a29e01f6769aa0057512884d7441b438) --- .../api/service/ProcessInstanceServiceTest.java | 1 - .../dao/mapper/TaskInstanceMapper.java | 33 ------- .../dao/repository/TaskInstanceDao.java | 5 +- .../dao/mapper/TaskInstanceMapper.xml | 99 +------------------ .../dao/mapper/ProcessInstanceMapperTest.java | 24 ----- .../dao/mapper/TaskInstanceMapperTest.java | 110 --------------------- .../service/process/ProcessService.java | 3 - .../service/process/ProcessServiceImpl.java | 61 ++++-------- 8 files changed, 26 insertions(+), 310 deletions(-) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 800f8bda18..9fb4d83052 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -769,7 +769,6 @@ public class ProcessInstanceServiceTest { processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog()); when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance); - when(taskInstanceMapper.queryByInstanceIdAndName(Mockito.anyInt(), Mockito.any())).thenReturn(taskInstance); DAG<Long, TaskNode, TaskNodeRelation> graph = new DAG<>(); for (long i = 1; i <= 7; ++i) { graph.addNode(i, new TaskNode()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index 04e818f5c3..f80dac826c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -39,23 +39,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage; */ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> { - List<Integer> queryTaskByProcessIdAndState(@Param("processInstanceId") Integer processInstanceId, - @Param("state") Integer state); - List<TaskInstance> findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId, @Param("flag") Flag flag, @Param("testFlag") int testFlag); - List<TaskInstance> queryByHostAndStatus(@Param("host") String host, - @Param("states") int[] stateArray); - - int setFailoverByHostAndStateArray(@Param("host") String host, - @Param("states") int[] stateArray, - @Param("destStatus") TaskExecutionStatus destStatus); - - TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId, - @Param("name") String name); - TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId, @Param("taskCode") Long taskCode); @@ -66,9 +53,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> { List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds, @Param("taskCodes") List<Long> taskCodes); - Integer countTask(@Param("projectCodes") Long[] projectCodes, - @Param("taskIds") int[] taskIds); - /** * Statistics task instance group by given project codes list by start time * <p> @@ -97,20 +81,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> { @Param("endTime") Date endTime, @Param("projectIds") Set<Integer> projectIds); - /** - * Statistics task instance group by given project codes list by submit time 
- * <p> - * We only need project codes to determine whether the task instance belongs to the user or not. - * - * @param startTime Statistics start time - * @param endTime Statistics end time - * @param projectCodes Project codes list to filter - * @return List of ExecuteStatusCount - */ - List<ExecuteStatusCount> countTaskInstanceStateByProjectCodesAndStatesBySubmitTime(@Param("startTime") Date startTime, - @Param("endTime") Date endTime, - @Param("projectCodes") Long[] projectCodes, - @Param("states") List<TaskExecutionStatus> states); /** * Statistics task instance group by given project codes list by submit time * <p> @@ -159,9 +129,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> { @Param("startTime") Date startTime, @Param("endTime") Date endTime); - List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int processInstanceId, - @Param("status") int status); - void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId); List<TaskInstance> findByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index 0156416fd3..27a0dbb0f6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -100,9 +100,10 @@ public interface TaskInstanceDao extends IDao<TaskInstance> { /** * find last task instance corresponding to taskCode in the date interval + * * @param processInstanceId Task's parent process instance id - * @param depTaskCode taskCode - * @param testFlag test flag + * @param depTaskCode taskCode + * @param testFlag test flag * @return task instance */ TaskInstance 
queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 1544d0ed8f..548baae566 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -30,24 +30,6 @@ ${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type </sql> - <update id="setFailoverByHostAndStateArray"> - update t_ds_task_instance - set state = #{destStatus} - where host = #{host} - <if test="states != null and states.length != 0"> - and state in - <foreach collection="states" index="index" item="i" open="(" separator="," close=")"> - #{i} - </foreach> - </if> - </update> - <select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer"> - select id - from t_ds_task_instance - WHERE process_instance_id = #{processInstanceId} - and state = #{state} - and flag = 1 - </select> <select id="findValidTaskListByProcessId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> select <include refid="baseSql"/> @@ -63,21 +45,6 @@ from t_ds_task_instance WHERE process_instance_id = #{workflowInstanceId} </select> - <select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> - select - <include refid="baseSql"/> - from t_ds_task_instance - where 1 = 1 - <if test="host != null and host != ''"> - and host = #{host} - </if> - <if 
test="states != null and states.length != 0"> - and state in - <foreach collection="states" index="index" item="i" open="(" separator="," close=")"> - #{i} - </foreach> - </if> - </select> <select id="countTaskInstanceStateByProjectCodes" resultType="org.apache.dolphinscheduler.dao.model.TaskInstanceStatusCountDto"> select state, count(0) as count @@ -118,32 +85,7 @@ </if> group by t.state </select> - <select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTime" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount"> - select - state, count(0) as count - from t_ds_task_instance t - left join t_ds_task_definition_log d on d.code=t.task_code and d.version=t.task_definition_version - where 1=1 - <if test="states != null and states.size != 0"> - and t.state in - <foreach collection="states" index="index" item="state" open="(" separator="," close=")"> - #{state.code} - </foreach> - </if> - <if test="projectCodes != null and projectCodes.length != 0"> - and d.project_code in - <foreach collection="projectCodes" index="index" item="i" open="(" separator="," close=")"> - #{i} - </foreach> - </if> - <if test="startTime != null"> - and t.submit_time <![CDATA[ > ]]> #{startTime} - </if> - <if test="endTime != null"> - and t.submit_time <![CDATA[ <= ]]> #{endTime} - </if> - group by t.state - </select> + <select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTimeV2" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount"> select state, count(0) as count @@ -181,15 +123,7 @@ </if> group by t.state </select> - <select id="queryByInstanceIdAndName" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> - select - <include refid="baseSql"/> - from t_ds_task_instance - where process_instance_id = #{processInstanceId} - and name = #{name} - and flag = 1 - limit 1 - </select> + <select id="queryByInstanceIdAndCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> select <include refid="baseSql"/> @@ -229,24 
+163,7 @@ </foreach> </if> </select> - <select id="countTask" resultType="java.lang.Integer"> - select count(1) as count - from t_ds_task_instance task,t_ds_task_definition_log define - where task.task_code=define.code - and task.task_definition_version=define.version - <if test="projectCodes != null and projectCodes.length != 0"> - and define.project_code in - <foreach collection="projectCodes" index="index" item="i" open="(" separator="," close=")"> - #{i} - </foreach> - </if> - <if test="taskIds != null and taskIds.length != 0"> - and task.id in - <foreach collection="taskIds" index="index" item="i" open="(" separator="," close=")"> - #{i} - </foreach> - </if> - </select> + <select id="queryTaskInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> select <include refid="baseSql"/> @@ -330,16 +247,6 @@ </if> order by start_time desc </select> - <select id="loadAllInfosNoRelease" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> - select - <include refid="baseSqlV2"> - <property name="alias" value="instance"/> - </include> - from t_ds_task_instance instance - left join t_ds_task_group_queue que on instance.id = que.task_id - where instance.process_instance_id = #{processInstanceId} - and que.status = #{status} - </select> <select id="findLastTaskInstances" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance"> select <include refid="baseSqlV2"> diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java index 5d454c2206..102110c18f 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java @@ -197,30 +197,6 @@ public class ProcessInstanceMapperTest extends BaseDaoTest { 
processInstanceMapper.deleteById(processInstance.getId()); } - /** - * test set failover by host and state - */ - @Test - public void testSetFailoverByHostAndStateArray() { - - int[] stateArray = new int[]{ - WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(), - WorkflowExecutionStatus.SUCCESS.ordinal()}; - - ProcessInstance processInstance = insertOne(); - - processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); - processInstance.setHost("192.168.2.220"); - processInstanceMapper.updateById(processInstance); - String host = processInstance.getHost(); - int update = processInstanceMapper.setFailoverByHostAndStateArray(host, stateArray); - Assertions.assertNotEquals(0, update); - - processInstance = processInstanceMapper.selectById(processInstance.getId()); - Assertions.assertNull(processInstance.getHost()); - processInstanceMapper.deleteById(processInstance.getId()); - } - /** * test update process instance by state */ diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index 03a9e69ff6..d48dde831a 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -143,25 +143,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest { Assertions.assertNotEquals(0, taskInstances.size()); } - /** - * test query task instance by process instance id and state - */ - @Test - public void testQueryTaskByProcessIdAndState() { - // insert ProcessInstance - ProcessInstance processInstance = insertProcessInstance(); - - // insert taskInstance - TaskInstance task = insertTaskInstance(processInstance.getId()); - task.setProcessInstanceId(processInstance.getId()); - taskInstanceMapper.updateById(task); - List<Integer> taskInstances = 
taskInstanceMapper.queryTaskByProcessIdAndState( - task.getProcessInstanceId(), - TaskExecutionStatus.RUNNING_EXECUTION.getCode()); - taskInstanceMapper.deleteById(task.getId()); - Assertions.assertNotEquals(0, taskInstances.size()); - } - /** * test find valid task list by process instance id */ @@ -194,66 +175,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest { Assertions.assertNotEquals(0, taskInstances1.size()); } - /** - * test query by host and status - */ - @Test - public void testQueryByHostAndStatus() { - // insert ProcessInstance - ProcessInstance processInstance = insertProcessInstance(); - - // insert taskInstance - TaskInstance task = insertTaskInstance(processInstance.getId()); - task.setHost("111.111.11.11"); - taskInstanceMapper.updateById(task); - - List<TaskInstance> taskInstances = taskInstanceMapper.queryByHostAndStatus( - task.getHost(), new int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()}); - taskInstanceMapper.deleteById(task.getId()); - Assertions.assertNotEquals(0, taskInstances.size()); - } - - /** - * test set failover by host and state array - */ - @Test - public void testSetFailoverByHostAndStateArray() { - // insert ProcessInstance - ProcessInstance processInstance = insertProcessInstance(); - - // insert taskInstance - TaskInstance task = insertTaskInstance(processInstance.getId()); - task.setHost("111.111.11.11"); - taskInstanceMapper.updateById(task); - - int setResult = taskInstanceMapper.setFailoverByHostAndStateArray( - task.getHost(), - new int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()}, - TaskExecutionStatus.NEED_FAULT_TOLERANCE); - taskInstanceMapper.deleteById(task.getId()); - Assertions.assertNotEquals(0, setResult); - } - - /** - * test query by task instance id and name - */ - @Test - public void testQueryByInstanceIdAndName() { - // insert ProcessInstance - ProcessInstance processInstance = insertProcessInstance(); - - // insert taskInstance - TaskInstance task = 
insertTaskInstance(processInstance.getId()); - task.setHost("111.111.11.11"); - taskInstanceMapper.updateById(task); - - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName( - task.getProcessInstanceId(), - task.getName()); - taskInstanceMapper.deleteById(task.getId()); - Assertions.assertNotEquals(null, taskInstance); - } - /** * test query by task instance id and code */ @@ -294,37 +215,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest { Assertions.assertEquals(1, taskInstances.size()); } - /** - * test count task instance - */ - @Test - public void testCountTask() { - // insert ProcessInstance - ProcessInstance processInstance = insertProcessInstance(); - - // insert taskInstance - TaskInstance task = insertTaskInstance(processInstance.getId()); - ProcessDefinition definition = new ProcessDefinition(); - definition.setCode(1L); - definition.setProjectCode(1111L); - definition.setCreateTime(new Date()); - definition.setUpdateTime(new Date()); - processDefinitionMapper.insert(definition); - taskInstanceMapper.updateById(task); - - int countTask = taskInstanceMapper.countTask( - new Long[0], - new int[0]); - int countTask2 = taskInstanceMapper.countTask( - new Long[]{definition.getProjectCode()}, - new int[]{task.getId()}); - taskInstanceMapper.deleteById(task.getId()); - processDefinitionMapper.deleteById(definition.getId()); - Assertions.assertEquals(0, countTask); - Assertions.assertEquals(0, countTask2); - - } - /** * test count task instance state by user */ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index ba4def7e06..101350e90d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ 
-45,7 +45,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.model.TaskNode; @@ -98,8 +97,6 @@ public interface ProcessService { void updateTaskDefinitionResources(TaskDefinition taskDefinition); - List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state); - int deleteWorkProcessMapByParentId(int parentWorkProcessId); ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 130a1c0993..1f27857513 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -763,22 +763,27 @@ public class ProcessServiceImpl implements ProcessService { case DYNAMIC_GENERATION: break; case START_FAILURE_TASK_PROCESS: - // find failed tasks and init these tasks - List<Integer> failedList = - this.findTaskIdByInstanceState(processInstance.getId(), TaskExecutionStatus.FAILURE); - List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(), - TaskExecutionStatus.NEED_FAULT_TOLERANCE); - List<Integer> killedList = - this.findTaskIdByInstanceState(processInstance.getId(), TaskExecutionStatus.KILL); - cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING); - - failedList.addAll(killedList); - 
failedList.addAll(toleranceList); - for (Integer taskId : failedList) { - initTaskInstance(taskInstanceDao.queryById(taskId)); + case RECOVER_SUSPENDED_PROCESS: + List<TaskInstance> needToStartTaskInstances = taskInstanceDao + .queryValidTaskListByWorkflowInstanceId(processInstance.getId(), processInstance.getTestFlag()) + .stream() + .filter(taskInstance -> { + TaskExecutionStatus state = taskInstance.getState(); + return state == TaskExecutionStatus.FAILURE + || state == TaskExecutionStatus.PAUSE + || state == TaskExecutionStatus.NEED_FAULT_TOLERANCE + || state == TaskExecutionStatus.KILL; + }) + .collect(Collectors.toList()); + + for (TaskInstance taskInstance : needToStartTaskInstances) { + initTaskInstance(taskInstance); } - cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(failedList))); + String startTaskInstanceIds = needToStartTaskInstances.stream() + .map(TaskInstance::getId) + .map(String::valueOf) + .collect(Collectors.joining(Constants.COMMA)); + cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, startTaskInstanceIds); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setRunTimes(runTime + 1); break; @@ -786,20 +791,6 @@ public class ProcessServiceImpl implements ProcessService { break; case RECOVER_WAITING_THREAD: break; - case RECOVER_SUSPENDED_PROCESS: - // find pause tasks and init task's state - cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING); - List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(), - TaskExecutionStatus.KILL); - for (Integer taskId : stopNodeList) { - // initialize the pause state - initTaskInstance(taskInstanceDao.queryById(taskId)); - } - cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(stopNodeList))); - processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); - 
processInstance.setRunTimes(runTime + 1); - break; case RECOVER_TOLERANCE_FAULT_PROCESS: // recover tolerance fault process processInstance.setRecovery(Flag.YES); @@ -1312,18 +1303,6 @@ public class ProcessServiceImpl implements ProcessService { return resourceInfo; } - /** - * get id list by task state - * - * @param instanceId instanceId - * @param state state - * @return task instance states - */ - @Override - public List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state) { - return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.getCode()); - } - /** * delete work process map by parent process id *