This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c59b2d5b8c remove sub workflow finish notify (#15057)
c59b2d5b8c is described below

commit c59b2d5b8cd6532b38c37f6284ca034c5b2321c3
Author: caishunfeng <[email protected]>
AuthorDate: Mon Oct 23 10:19:57 2023 +0800

    remove sub workflow finish notify (#15057)
    
    Co-authored-by: xiangzihao <[email protected]>
---
 .../master/runner/WorkflowExecuteThreadPool.java   | 103 ---------------------
 1 file changed, 103 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index c0743b5a2c..1aa4232865 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -17,30 +17,15 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
-import 
org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
-import 
org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.event.StateEvent;
-import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.PostConstruct;
 
-import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -49,8 +34,6 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.ListenableFutureCallback;
 
-import com.google.common.base.Strings;
-
 /**
  * Used to execute {@link WorkflowExecuteRunnable}.
  */
@@ -61,9 +44,6 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
     @Autowired
     private MasterConfig masterConfig;
 
-    @Autowired
-    private ProcessService processService;
-
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
@@ -122,8 +102,6 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
                 LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
                 try {
                     log.error("Workflow instance events handle failed", ex);
-                    notifyProcessChanged(
-                            
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
                     multiThreadFilterMap.remove(workflowInstanceId);
                 } finally {
                     LogUtils.removeWorkflowInstanceIdMDC();
@@ -140,8 +118,6 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
                                 
.removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext()
                                         .getWorkflowInstance().getId());
                         
processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId);
-                        notifyProcessChanged(
-                                
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
                         log.info("Workflow instance is finished.");
                     }
                 } catch (Exception e) {
@@ -155,83 +131,4 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
         });
     }
 
-    /**
-     * notify process change
-     */
-    private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
-        if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
-            return;
-        }
-        Map<ProcessInstance, TaskInstance> fatherMaps = 
processService.notifyProcessList(finishProcessInstance.getId());
-        for (Map.Entry<ProcessInstance, TaskInstance> entry : 
fatherMaps.entrySet()) {
-            ProcessInstance processInstance = entry.getKey();
-            TaskInstance taskInstance = entry.getValue();
-            crossWorkflowParameterPassing(finishProcessInstance, taskInstance);
-            String address = NetUtils.getAddr(masterConfig.getListenPort());
-            try {
-                
LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), 
taskInstance.getId());
-                if (processInstance.getHost().equalsIgnoreCase(address)) {
-                    log.info("Process host is local master, will notify it");
-                    this.notifyMyself(processInstance, taskInstance);
-                } else {
-                    log.info("Process host is remote master, will notify it");
-                    this.notifyProcess(finishProcessInstance, processInstance, 
taskInstance);
-                }
-            } finally {
-                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
-            }
-        }
-    }
-
-    private void crossWorkflowParameterPassing(ProcessInstance 
finishProcessInstance, TaskInstance taskInstance) {
-        try {
-            MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-                    
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
-            masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
-                    .setVarPool(finishProcessInstance.getVarPool());
-            log.info("Cross workflow parameter passing success, 
finishProcessInstanceId: {}, taskInstanceId: {}",
-                    finishProcessInstance.getId(), taskInstance.getId());
-        } catch (Exception ex) {
-            log.error("Cross workflow parameter passing error, 
finishProcessInstanceId: {}, taskInstanceId: {}",
-                    finishProcessInstance.getId(), taskInstance.getId(), ex);
-        }
-    }
-
-    /**
-     * notify myself
-     */
-    private void notifyMyself(@NonNull ProcessInstance processInstance, 
@NonNull TaskInstance taskInstance) {
-        if 
(!processInstanceExecCacheManager.contains(processInstance.getId())) {
-            log.warn("The execute cache manager doesn't contains this workflow 
instance");
-            return;
-        }
-        TaskStateEvent stateEvent = TaskStateEvent.builder()
-                .processInstanceId(processInstance.getId())
-                .taskInstanceId(taskInstance.getId())
-                .type(StateEventType.TASK_STATE_CHANGE)
-                .status(TaskExecutionStatus.RUNNING_EXECUTION)
-                .build();
-        this.submitStateEvent(stateEvent);
-    }
-
-    /**
-     * notify process's master
-     */
-    private void notifyProcess(ProcessInstance finishProcessInstance, 
ProcessInstance processInstance,
-                               TaskInstance taskInstance) {
-        String processInstanceHost = processInstance.getHost();
-        if (Strings.isNullOrEmpty(processInstanceHost)) {
-            log.error("Process {} host is empty, cannot notify task {} now, 
taskId: {}", processInstance.getName(),
-                    taskInstance.getName(), taskInstance.getId());
-            return;
-        }
-        ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory
-                        .getProxyClient(processInstanceHost, 
ITaskInstanceExecutionEventListener.class);
-
-        WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = 
new WorkflowInstanceStateChangeEvent(
-                finishProcessInstance.getId(), 0, 
finishProcessInstance.getState(), processInstance.getId(),
-                taskInstance.getId());
-        
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowInstanceStateChangeEvent);
-    }
 }

Reply via email to