This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c59b2d5b8c remove sub workflow finish notify (#15057)
c59b2d5b8c is described below
commit c59b2d5b8cd6532b38c37f6284ca034c5b2321c3
Author: caishunfeng <[email protected]>
AuthorDate: Mon Oct 23 10:19:57 2023 +0800
remove sub workflow finish notify (#15057)
Co-authored-by: xiangzihao <[email protected]>
---
.../master/runner/WorkflowExecuteThreadPool.java | 103 ---------------------
1 file changed, 103 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index c0743b5a2c..1aa4232865 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -17,30 +17,15 @@
package org.apache.dolphinscheduler.server.master.runner;
-import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
-import
org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
-import
org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
-import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
-import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -49,8 +34,6 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
-import com.google.common.base.Strings;
-
/**
* Used to execute {@link WorkflowExecuteRunnable}.
*/
@@ -61,9 +44,6 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
@Autowired
private MasterConfig masterConfig;
- @Autowired
- private ProcessService processService;
-
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@@ -122,8 +102,6 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
try {
log.error("Workflow instance events handle failed", ex);
- notifyProcessChanged(
-
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
multiThreadFilterMap.remove(workflowInstanceId);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
@@ -140,8 +118,6 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
.removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext()
.getWorkflowInstance().getId());
processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId);
- notifyProcessChanged(
-
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
log.info("Workflow instance is finished.");
}
} catch (Exception e) {
@@ -155,83 +131,4 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
});
}
- /**
- * notify process change
- */
- private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
- if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
- return;
- }
- Map<ProcessInstance, TaskInstance> fatherMaps =
processService.notifyProcessList(finishProcessInstance.getId());
- for (Map.Entry<ProcessInstance, TaskInstance> entry :
fatherMaps.entrySet()) {
- ProcessInstance processInstance = entry.getKey();
- TaskInstance taskInstance = entry.getValue();
- crossWorkflowParameterPassing(finishProcessInstance, taskInstance);
- String address = NetUtils.getAddr(masterConfig.getListenPort());
- try {
-
LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(),
taskInstance.getId());
- if (processInstance.getHost().equalsIgnoreCase(address)) {
- log.info("Process host is local master, will notify it");
- this.notifyMyself(processInstance, taskInstance);
- } else {
- log.info("Process host is remote master, will notify it");
- this.notifyProcess(finishProcessInstance, processInstance,
taskInstance);
- }
- } finally {
- LogUtils.removeWorkflowAndTaskInstanceIdMDC();
- }
- }
- }
-
- private void crossWorkflowParameterPassing(ProcessInstance
finishProcessInstance, TaskInstance taskInstance) {
- try {
- MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
- masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
- .setVarPool(finishProcessInstance.getVarPool());
- log.info("Cross workflow parameter passing success,
finishProcessInstanceId: {}, taskInstanceId: {}",
- finishProcessInstance.getId(), taskInstance.getId());
- } catch (Exception ex) {
- log.error("Cross workflow parameter passing error,
finishProcessInstanceId: {}, taskInstanceId: {}",
- finishProcessInstance.getId(), taskInstance.getId(), ex);
- }
- }
-
- /**
- * notify myself
- */
- private void notifyMyself(@NonNull ProcessInstance processInstance,
@NonNull TaskInstance taskInstance) {
- if
(!processInstanceExecCacheManager.contains(processInstance.getId())) {
- log.warn("The execute cache manager doesn't contains this workflow
instance");
- return;
- }
- TaskStateEvent stateEvent = TaskStateEvent.builder()
- .processInstanceId(processInstance.getId())
- .taskInstanceId(taskInstance.getId())
- .type(StateEventType.TASK_STATE_CHANGE)
- .status(TaskExecutionStatus.RUNNING_EXECUTION)
- .build();
- this.submitStateEvent(stateEvent);
- }
-
- /**
- * notify process's master
- */
- private void notifyProcess(ProcessInstance finishProcessInstance,
ProcessInstance processInstance,
- TaskInstance taskInstance) {
- String processInstanceHost = processInstance.getHost();
- if (Strings.isNullOrEmpty(processInstanceHost)) {
- log.error("Process {} host is empty, cannot notify task {} now,
taskId: {}", processInstance.getName(),
- taskInstance.getName(), taskInstance.getId());
- return;
- }
- ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(processInstanceHost,
ITaskInstanceExecutionEventListener.class);
-
- WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent =
new WorkflowInstanceStateChangeEvent(
- finishProcessInstance.getId(), 0,
finishProcessInstance.getState(), processInstance.getId(),
- taskInstance.getId());
-
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowInstanceStateChangeEvent);
- }
}