This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.2.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 52d315d3cabbcb3aa27c83b3730b5db77adda4a1
Author: Wenjun Ruan <wen...@apache.org>
AuthorDate: Tue Feb 6 10:09:30 2024 +0800

    Fix Recover WorkflowInstance will casue workflow Instance state is success 
but task insatnce is killed/paused (#15574)
    
    (cherry picked from commit 43a06525a29e01f6769aa0057512884d7441b438)
---
 .../api/service/ProcessInstanceServiceTest.java    |   1 -
 .../dao/mapper/TaskInstanceMapper.java             |  33 -------
 .../dao/repository/TaskInstanceDao.java            |   5 +-
 .../dao/mapper/TaskInstanceMapper.xml              |  99 +------------------
 .../dao/mapper/ProcessInstanceMapperTest.java      |  24 -----
 .../dao/mapper/TaskInstanceMapperTest.java         | 110 ---------------------
 .../service/process/ProcessService.java            |   3 -
 .../service/process/ProcessServiceImpl.java        |  61 ++++--------
 8 files changed, 26 insertions(+), 310 deletions(-)

diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index 800f8bda18..9fb4d83052 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -769,7 +769,6 @@ public class ProcessInstanceServiceTest {
                 processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion())).thenReturn(new 
ProcessDefinitionLog());
         
when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance);
-        when(taskInstanceMapper.queryByInstanceIdAndName(Mockito.anyInt(), 
Mockito.any())).thenReturn(taskInstance);
         DAG<Long, TaskNode, TaskNodeRelation> graph = new DAG<>();
         for (long i = 1; i <= 7; ++i) {
             graph.addNode(i, new TaskNode());
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index 04e818f5c3..f80dac826c 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -39,23 +39,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
  */
 public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
 
-    List<Integer> queryTaskByProcessIdAndState(@Param("processInstanceId") 
Integer processInstanceId,
-                                               @Param("state") Integer state);
-
     List<TaskInstance> 
findValidTaskListByProcessId(@Param("processInstanceId") Integer 
processInstanceId,
                                                     @Param("flag") Flag flag,
                                                     @Param("testFlag") int 
testFlag);
 
-    List<TaskInstance> queryByHostAndStatus(@Param("host") String host,
-                                            @Param("states") int[] stateArray);
-
-    int setFailoverByHostAndStateArray(@Param("host") String host,
-                                       @Param("states") int[] stateArray,
-                                       @Param("destStatus") 
TaskExecutionStatus destStatus);
-
-    TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int 
processInstanceId,
-                                          @Param("name") String name);
-
     TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int 
processInstanceId,
                                           @Param("taskCode") Long taskCode);
 
@@ -66,9 +53,6 @@ public interface TaskInstanceMapper extends 
BaseMapper<TaskInstance> {
     List<TaskInstance> 
queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") 
List<Integer> processInstanceIds,
                                                              
@Param("taskCodes") List<Long> taskCodes);
 
-    Integer countTask(@Param("projectCodes") Long[] projectCodes,
-                      @Param("taskIds") int[] taskIds);
-
     /**
      * Statistics task instance group by given project codes list by start time
      * <p>
@@ -97,20 +81,6 @@ public interface TaskInstanceMapper extends 
BaseMapper<TaskInstance> {
                                                                   
@Param("endTime") Date endTime,
                                                                   
@Param("projectIds") Set<Integer> projectIds);
 
-    /**
-     * Statistics task instance group by given project codes list by submit 
time
-     * <p>
-     * We only need project codes to determine whether the task instance 
belongs to the user or not.
-     *
-     * @param startTime    Statistics start time
-     * @param endTime      Statistics end time
-     * @param projectCodes Project codes list to filter
-     * @return List of ExecuteStatusCount
-     */
-    List<ExecuteStatusCount> 
countTaskInstanceStateByProjectCodesAndStatesBySubmitTime(@Param("startTime") 
Date startTime,
-                                                                               
        @Param("endTime") Date endTime,
-                                                                               
        @Param("projectCodes") Long[] projectCodes,
-                                                                               
        @Param("states") List<TaskExecutionStatus> states);
     /**
      * Statistics task instance group by given project codes list by submit 
time
      * <p>
@@ -159,9 +129,6 @@ public interface TaskInstanceMapper extends 
BaseMapper<TaskInstance> {
                                                           @Param("startTime") 
Date startTime,
                                                           @Param("endTime") 
Date endTime);
 
-    List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int 
processInstanceId,
-                                             @Param("status") int status);
-
     void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int 
workflowInstanceId);
 
     List<TaskInstance> findByWorkflowInstanceId(@Param("workflowInstanceId") 
Integer workflowInstanceId);
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index 0156416fd3..27a0dbb0f6 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -100,9 +100,10 @@ public interface TaskInstanceDao extends 
IDao<TaskInstance> {
 
     /**
      * find last task instance corresponding to taskCode in the date interval
+     *
      * @param processInstanceId Task's parent process instance id
-     * @param depTaskCode taskCode
-     * @param testFlag test flag
+     * @param depTaskCode       taskCode
+     * @param testFlag          test flag
      * @return task instance
      */
     TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer 
processInstanceId,
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 1544d0ed8f..548baae566 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -30,24 +30,6 @@
         ${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, 
${alias}.retry_interval, ${alias}.max_retry_times, 
${alias}.task_instance_priority, 
${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
         ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, 
${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, 
${alias}.task_group_id, ${alias}.task_execute_type
     </sql>
-    <update id="setFailoverByHostAndStateArray">
-        update t_ds_task_instance
-        set state = #{destStatus}
-        where host = #{host}
-        <if test="states != null and states.length != 0">
-            and state in
-            <foreach collection="states" index="index" item="i" open="(" 
separator="," close=")">
-                #{i}
-            </foreach>
-        </if>
-    </update>
-    <select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer">
-        select id
-        from t_ds_task_instance
-        WHERE  process_instance_id = #{processInstanceId}
-        and state = #{state}
-        and flag = 1
-    </select>
     <select id="findValidTaskListByProcessId" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
@@ -63,21 +45,6 @@
         from t_ds_task_instance
         WHERE process_instance_id = #{workflowInstanceId}
     </select>
-    <select id="queryByHostAndStatus" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
-        select
-        <include refid="baseSql"/>
-        from t_ds_task_instance
-        where 1 = 1
-        <if test="host != null and host != ''">
-            and host = #{host}
-        </if>
-        <if test="states != null and states.length != 0">
-            and state in
-            <foreach collection="states" index="index" item="i" open="(" 
separator="," close=")">
-                #{i}
-            </foreach>
-        </if>
-    </select>
 
     <select id="countTaskInstanceStateByProjectCodes" 
resultType="org.apache.dolphinscheduler.dao.model.TaskInstanceStatusCountDto">
         select state, count(0) as count
@@ -118,32 +85,7 @@
         </if>
         group by t.state
     </select>
-    <select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTime" 
resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
-        select
-        state, count(0) as count
-        from t_ds_task_instance t
-        left join t_ds_task_definition_log d on d.code=t.task_code and 
d.version=t.task_definition_version
-        where 1=1
-        <if test="states != null and states.size != 0">
-            and t.state in
-            <foreach collection="states" index="index" item="state" open="(" 
separator="," close=")">
-                #{state.code}
-            </foreach>
-        </if>
-        <if test="projectCodes != null and projectCodes.length != 0">
-            and d.project_code in
-            <foreach collection="projectCodes" index="index" item="i" open="(" 
separator="," close=")">
-                #{i}
-            </foreach>
-        </if>
-        <if test="startTime != null">
-            and t.submit_time <![CDATA[ > ]]> #{startTime}
-        </if>
-        <if test="endTime != null">
-            and t.submit_time <![CDATA[ <= ]]> #{endTime}
-        </if>
-        group by t.state
-    </select>
+
     <select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTimeV2" 
resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
         select
         state, count(0) as count
@@ -181,15 +123,7 @@
         </if>
         group by t.state
     </select>
-    <select id="queryByInstanceIdAndName" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
-        select
-        <include refid="baseSql"/>
-        from t_ds_task_instance
-        where process_instance_id = #{processInstanceId}
-        and name = #{name}
-        and flag = 1
-        limit 1
-    </select>
+
     <select id="queryByInstanceIdAndCode" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
@@ -229,24 +163,7 @@
             </foreach>
         </if>
     </select>
-    <select id="countTask" resultType="java.lang.Integer">
-        select count(1) as count
-        from t_ds_task_instance task,t_ds_task_definition_log define
-        where task.task_code=define.code
-        and task.task_definition_version=define.version
-        <if test="projectCodes != null and projectCodes.length != 0">
-            and define.project_code in
-            <foreach collection="projectCodes" index="index" item="i" open="(" 
separator="," close=")">
-                #{i}
-            </foreach>
-        </if>
-        <if test="taskIds != null and taskIds.length != 0">
-            and task.id in
-            <foreach collection="taskIds" index="index" item="i" open="(" 
separator="," close=")">
-                #{i}
-            </foreach>
-        </if>
-    </select>
+
     <select id="queryTaskInstanceListPaging" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
@@ -330,16 +247,6 @@
         </if>
         order by start_time desc
     </select>
-    <select id="loadAllInfosNoRelease" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
-        select
-        <include refid="baseSqlV2">
-            <property name="alias" value="instance"/>
-        </include>
-        from t_ds_task_instance instance
-        left join t_ds_task_group_queue que on instance.id = que.task_id
-        where instance.process_instance_id = #{processInstanceId}
-        and que.status = #{status}
-    </select>
     <select id="findLastTaskInstances" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSqlV2">
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
index 5d454c2206..102110c18f 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
@@ -197,30 +197,6 @@ public class ProcessInstanceMapperTest extends BaseDaoTest 
{
         processInstanceMapper.deleteById(processInstance.getId());
     }
 
-    /**
-     * test set failover by host and state
-     */
-    @Test
-    public void testSetFailoverByHostAndStateArray() {
-
-        int[] stateArray = new int[]{
-                WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
-                WorkflowExecutionStatus.SUCCESS.ordinal()};
-
-        ProcessInstance processInstance = insertOne();
-
-        processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
-        processInstance.setHost("192.168.2.220");
-        processInstanceMapper.updateById(processInstance);
-        String host = processInstance.getHost();
-        int update = 
processInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
-        Assertions.assertNotEquals(0, update);
-
-        processInstance = 
processInstanceMapper.selectById(processInstance.getId());
-        Assertions.assertNull(processInstance.getHost());
-        processInstanceMapper.deleteById(processInstance.getId());
-    }
-
     /**
      * test update process instance by state
      */
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index 03a9e69ff6..d48dde831a 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -143,25 +143,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
         Assertions.assertNotEquals(0, taskInstances.size());
     }
 
-    /**
-     * test query task instance by process instance id and state
-     */
-    @Test
-    public void testQueryTaskByProcessIdAndState() {
-        // insert ProcessInstance
-        ProcessInstance processInstance = insertProcessInstance();
-
-        // insert taskInstance
-        TaskInstance task = insertTaskInstance(processInstance.getId());
-        task.setProcessInstanceId(processInstance.getId());
-        taskInstanceMapper.updateById(task);
-        List<Integer> taskInstances = 
taskInstanceMapper.queryTaskByProcessIdAndState(
-                task.getProcessInstanceId(),
-                TaskExecutionStatus.RUNNING_EXECUTION.getCode());
-        taskInstanceMapper.deleteById(task.getId());
-        Assertions.assertNotEquals(0, taskInstances.size());
-    }
-
     /**
      * test find valid task list by process instance id
      */
@@ -194,66 +175,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
         Assertions.assertNotEquals(0, taskInstances1.size());
     }
 
-    /**
-     * test query by host and status
-     */
-    @Test
-    public void testQueryByHostAndStatus() {
-        // insert ProcessInstance
-        ProcessInstance processInstance = insertProcessInstance();
-
-        // insert taskInstance
-        TaskInstance task = insertTaskInstance(processInstance.getId());
-        task.setHost("111.111.11.11");
-        taskInstanceMapper.updateById(task);
-
-        List<TaskInstance> taskInstances = 
taskInstanceMapper.queryByHostAndStatus(
-                task.getHost(), new 
int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()});
-        taskInstanceMapper.deleteById(task.getId());
-        Assertions.assertNotEquals(0, taskInstances.size());
-    }
-
-    /**
-     * test set failover by host and state array
-     */
-    @Test
-    public void testSetFailoverByHostAndStateArray() {
-        // insert ProcessInstance
-        ProcessInstance processInstance = insertProcessInstance();
-
-        // insert taskInstance
-        TaskInstance task = insertTaskInstance(processInstance.getId());
-        task.setHost("111.111.11.11");
-        taskInstanceMapper.updateById(task);
-
-        int setResult = taskInstanceMapper.setFailoverByHostAndStateArray(
-                task.getHost(),
-                new int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()},
-                TaskExecutionStatus.NEED_FAULT_TOLERANCE);
-        taskInstanceMapper.deleteById(task.getId());
-        Assertions.assertNotEquals(0, setResult);
-    }
-
-    /**
-     * test query by task instance id and name
-     */
-    @Test
-    public void testQueryByInstanceIdAndName() {
-        // insert ProcessInstance
-        ProcessInstance processInstance = insertProcessInstance();
-
-        // insert taskInstance
-        TaskInstance task = insertTaskInstance(processInstance.getId());
-        task.setHost("111.111.11.11");
-        taskInstanceMapper.updateById(task);
-
-        TaskInstance taskInstance = 
taskInstanceMapper.queryByInstanceIdAndName(
-                task.getProcessInstanceId(),
-                task.getName());
-        taskInstanceMapper.deleteById(task.getId());
-        Assertions.assertNotEquals(null, taskInstance);
-    }
-
     /**
      * test query by task instance id and code
      */
@@ -294,37 +215,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
         Assertions.assertEquals(1, taskInstances.size());
     }
 
-    /**
-     * test count task instance
-     */
-    @Test
-    public void testCountTask() {
-        // insert ProcessInstance
-        ProcessInstance processInstance = insertProcessInstance();
-
-        // insert taskInstance
-        TaskInstance task = insertTaskInstance(processInstance.getId());
-        ProcessDefinition definition = new ProcessDefinition();
-        definition.setCode(1L);
-        definition.setProjectCode(1111L);
-        definition.setCreateTime(new Date());
-        definition.setUpdateTime(new Date());
-        processDefinitionMapper.insert(definition);
-        taskInstanceMapper.updateById(task);
-
-        int countTask = taskInstanceMapper.countTask(
-                new Long[0],
-                new int[0]);
-        int countTask2 = taskInstanceMapper.countTask(
-                new Long[]{definition.getProjectCode()},
-                new int[]{task.getId()});
-        taskInstanceMapper.deleteById(task.getId());
-        processDefinitionMapper.deleteById(definition.getId());
-        Assertions.assertEquals(0, countTask);
-        Assertions.assertEquals(0, countTask2);
-
-    }
-
     /**
      * test count task instance state by user
      */
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index ba4def7e06..101350e90d 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -45,7 +45,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 
@@ -98,8 +97,6 @@ public interface ProcessService {
 
     void updateTaskDefinitionResources(TaskDefinition taskDefinition);
 
-    List<Integer> findTaskIdByInstanceState(int instanceId, 
TaskExecutionStatus state);
-
     int deleteWorkProcessMapByParentId(int parentWorkProcessId);
 
     ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer 
parentTaskId);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 130a1c0993..1f27857513 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -763,22 +763,27 @@ public class ProcessServiceImpl implements ProcessService 
{
             case DYNAMIC_GENERATION:
                 break;
             case START_FAILURE_TASK_PROCESS:
-                // find failed tasks and init these tasks
-                List<Integer> failedList =
-                        
this.findTaskIdByInstanceState(processInstance.getId(), 
TaskExecutionStatus.FAILURE);
-                List<Integer> toleranceList = 
this.findTaskIdByInstanceState(processInstance.getId(),
-                        TaskExecutionStatus.NEED_FAULT_TOLERANCE);
-                List<Integer> killedList =
-                        
this.findTaskIdByInstanceState(processInstance.getId(), 
TaskExecutionStatus.KILL);
-                
cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING);
-
-                failedList.addAll(killedList);
-                failedList.addAll(toleranceList);
-                for (Integer taskId : failedList) {
-                    initTaskInstance(taskInstanceDao.queryById(taskId));
+            case RECOVER_SUSPENDED_PROCESS:
+                List<TaskInstance> needToStartTaskInstances = taskInstanceDao
+                        
.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), 
processInstance.getTestFlag())
+                        .stream()
+                        .filter(taskInstance -> {
+                            TaskExecutionStatus state = 
taskInstance.getState();
+                            return state == TaskExecutionStatus.FAILURE
+                                    || state == TaskExecutionStatus.PAUSE
+                                    || state == 
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+                                    || state == TaskExecutionStatus.KILL;
+                        })
+                        .collect(Collectors.toList());
+
+                for (TaskInstance taskInstance : needToStartTaskInstances) {
+                    initTaskInstance(taskInstance);
                 }
-                
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
-                        String.join(Constants.COMMA, 
convertIntListToString(failedList)));
+                String startTaskInstanceIds = needToStartTaskInstances.stream()
+                        .map(TaskInstance::getId)
+                        .map(String::valueOf)
+                        .collect(Collectors.joining(Constants.COMMA));
+                
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, 
startTaskInstanceIds);
                 
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
                 break;
@@ -786,20 +791,6 @@ public class ProcessServiceImpl implements ProcessService {
                 break;
             case RECOVER_WAITING_THREAD:
                 break;
-            case RECOVER_SUSPENDED_PROCESS:
-                // find pause tasks and init task's state
-                
cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING);
-                List<Integer> stopNodeList = 
findTaskIdByInstanceState(processInstance.getId(),
-                        TaskExecutionStatus.KILL);
-                for (Integer taskId : stopNodeList) {
-                    // initialize the pause state
-                    initTaskInstance(taskInstanceDao.queryById(taskId));
-                }
-                
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
-                        String.join(Constants.COMMA, 
convertIntListToString(stopNodeList)));
-                
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                processInstance.setRunTimes(runTime + 1);
-                break;
             case RECOVER_TOLERANCE_FAULT_PROCESS:
                 // recover tolerance fault process
                 processInstance.setRecovery(Flag.YES);
@@ -1312,18 +1303,6 @@ public class ProcessServiceImpl implements 
ProcessService {
         return resourceInfo;
     }
 
-    /**
-     * get id list by task state
-     *
-     * @param instanceId instanceId
-     * @param state      state
-     * @return task instance states
-     */
-    @Override
-    public List<Integer> findTaskIdByInstanceState(int instanceId, 
TaskExecutionStatus state) {
-        return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, 
state.getCode());
-    }
-
     /**
      * delete work process map by parent process id
      *

Reply via email to