This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 36ebfe0  [Implement][server]TaskProcessor code optimization (#7146)
36ebfe0 is described below

commit 36ebfe0984bd4018c38c6cffebc0292bc94d7117
Author: Kerwin <[email protected]>
AuthorDate: Mon Dec 6 14:15:09 2021 +0800

    [Implement][server]TaskProcessor code optimization (#7146)
    
    * TaskProcessor code optimization
    
    * fix code style
    
    * update Collectors.toMap
---
 .../master/runner/MasterSchedulerService.java      |  6 +++-
 .../master/runner/WorkflowExecuteThread.java       | 15 ++++++--
 .../master/runner/task/BaseTaskProcessor.java      |  9 +++--
 .../runner/task/CommonTaskProcessFactory.java      | 36 -------------------
 .../master/runner/task/CommonTaskProcessor.java    |  8 ++---
 .../runner/task/ConditionTaskProcessFactory.java   | 35 -------------------
 .../master/runner/task/ConditionTaskProcessor.java |  9 ++---
 .../runner/task/DependentTaskProcessFactory.java   | 36 -------------------
 .../master/runner/task/DependentTaskProcessor.java |  8 +++--
 .../master/runner/task/ITaskProcessFactory.java    | 25 --------------
 .../master/runner/task/SubTaskProcessFactory.java  | 35 -------------------
 .../master/runner/task/SubTaskProcessor.java       |  8 +++--
 .../runner/task/SwitchTaskProcessFactory.java      | 36 -------------------
 .../master/runner/task/SwitchTaskProcessor.java    |  8 +++--
 .../master/runner/task/TaskProcessorFactory.java   | 40 +++++++++++++---------
 .../server/master/WorkflowExecuteThreadTest.java   |  6 +++-
 .../runner/task/TaskProcessorFactoryTest.java      |  3 +-
 17 files changed, 78 insertions(+), 245 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 6e05da9..00eab78 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -33,6 +33,7 @@ import 
org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import 
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -68,6 +69,8 @@ public class MasterSchedulerService extends Thread {
      */
     @Autowired
     private ProcessService processService;
+    @Autowired
+    private TaskProcessorFactory taskProcessorFactory;
 
     /**
      * zookeeper master client
@@ -205,7 +208,8 @@ public class MasterSchedulerService extends Thread {
                     , nettyExecutorManager
                     , processAlertManager
                     , masterConfig
-                    , taskTimeoutCheckList);
+                    , taskTimeoutCheckList
+                    , taskProcessorFactory);
 
             
this.processInstanceExecCacheManager.cache(processInstance.getId(), 
workflowExecuteThread);
             if (processInstance.getTimeout() > 0) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index fa047a0..8a59409 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -131,6 +131,11 @@ public class WorkflowExecuteThread implements Runnable {
     private ProcessDefinition processDefinition;
 
     /**
+     * task processor
+     */
+    private TaskProcessorFactory taskProcessorFactory;
+
+    /**
      * the object of DAG
      */
     private DAG<String, TaskNode, TaskNodeRelation> dag;
@@ -216,13 +221,18 @@ public class WorkflowExecuteThread implements Runnable {
      * @param processInstance processInstance
      * @param processService processService
      * @param nettyExecutorManager nettyExecutorManager
+     * @param processAlertManager processAlertManager
+     * @param masterConfig masterConfig
+     * @param taskTimeoutCheckList taskTimeoutCheckList
+     * @param taskProcessorFactory taskProcessorFactory
      */
     public WorkflowExecuteThread(ProcessInstance processInstance
             , ProcessService processService
             , NettyExecutorManager nettyExecutorManager
             , ProcessAlertManager processAlertManager
             , MasterConfig masterConfig
-            , ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList) {
+            , ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
+            , TaskProcessorFactory taskProcessorFactory) {
         this.processService = processService;
 
         this.processInstance = processInstance;
@@ -230,6 +240,7 @@ public class WorkflowExecuteThread implements Runnable {
         this.nettyExecutorManager = nettyExecutorManager;
         this.processAlertManager = processAlertManager;
         this.taskTimeoutCheckList = taskTimeoutCheckList;
+        this.taskProcessorFactory = taskProcessorFactory;
     }
 
     @Override
@@ -791,7 +802,7 @@ public class WorkflowExecuteThread implements Runnable {
      */
     private TaskInstance submitTaskExec(TaskInstance taskInstance) {
         try {
-            ITaskProcessor taskProcessor = 
TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+            ITaskProcessor taskProcessor = 
taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
             if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
                     && 
taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
                 notifyProcessHostUpdate(taskInstance);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 07ae812..b4951f4 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -62,6 +61,7 @@ import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Strings;
@@ -80,7 +80,8 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
 
     protected ProcessInstance processInstance;
 
-    protected ProcessService processService = 
SpringApplicationContext.getBean(ProcessService.class);
+    @Autowired
+    protected ProcessService processService;
 
     /**
      * pause task, common tasks donot need this.
@@ -107,7 +108,6 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
     public void run() {
     }
 
-
     @Override
     public boolean action(TaskAction taskAction) {
 
@@ -119,7 +119,7 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
             case TIMEOUT:
                 return timeout();
             default:
-                logger.error("unknown task action: {}", taskAction.toString());
+                logger.error("unknown task action: {}", taskAction);
 
         }
         return false;
@@ -305,7 +305,6 @@ public abstract class BaseTaskProcessor implements 
ITaskProcessor {
         }
     }
 
-
     /**
      * set SQL task relation
      *
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
deleted file mode 100644
index 4884650..0000000
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.Constants;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class CommonTaskProcessFactory implements ITaskProcessFactory {
-    @Override
-    public String type() {
-        return Constants.COMMON_TASK_TYPE;
-
-    }
-
-    @Override
-    public ITaskProcessor create() {
-        return new CommonTaskProcessor();
-    }
-}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 60c7de7..d44315a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import 
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -40,20 +39,19 @@ import org.apache.commons.lang.StringUtils;
 import java.util.Date;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * common task processor
  */
+@Service
 public class CommonTaskProcessor extends BaseTaskProcessor {
 
     @Autowired
     private TaskPriorityQueue taskUpdateQueue;
 
     @Autowired
-    MasterConfig masterConfig;
-
-    @Autowired
-    NettyExecutorManager nettyExecutorManager = 
SpringApplicationContext.getBean(NettyExecutorManager.class);
+    NettyExecutorManager nettyExecutorManager;
 
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, 
int maxRetryTimes, int commitInterval) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
deleted file mode 100644
index 3028c56..0000000
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class ConditionTaskProcessFactory implements ITaskProcessFactory {
-    @Override
-    public String type() {
-        return TaskType.CONDITIONS.getDesc();
-    }
-
-    @Override
-    public ITaskProcessor create() {
-        return new ConditionTaskProcessor();
-    }
-}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 2412659..c3c65b3 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -26,14 +26,12 @@ import 
org.apache.dolphinscheduler.common.model.DependentItem;
 import org.apache.dolphinscheduler.common.model.DependentTaskModel;
 import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
 import org.apache.dolphinscheduler.common.utils.DependentUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -41,11 +39,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * condition task processor
  */
+@Service
 public class ConditionTaskProcessor extends BaseTaskProcessor {
 
     /**
@@ -65,7 +65,8 @@ public class ConditionTaskProcessor extends BaseTaskProcessor 
{
      */
     private Map<Long, ExecutionStatus> completeTaskList = new 
ConcurrentHashMap<>();
 
-    MasterConfig masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
+    @Autowired
+    private MasterConfig masterConfig;
 
     private TaskDefinition taskDefinition;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
deleted file mode 100644
index 3f885ed..0000000
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class DependentTaskProcessFactory implements ITaskProcessFactory {
-
-    @Override
-    public String type() {
-        return TaskType.DEPENDENT.getDesc();
-    }
-
-    @Override
-    public ITaskProcessor create() {
-        return new DependentTaskProcessor();
-    }
-}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 0478f7e..28cd0e7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.DependentExecute;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -41,11 +40,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
 import com.fasterxml.jackson.annotation.JsonFormat;
 
 /**
  * dependent task processor
  */
+@Service
 public class DependentTaskProcessor extends BaseTaskProcessor {
 
     private DependentParameters dependentParameters;
@@ -72,7 +75,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor 
{
     ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    MasterConfig masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
+    @Autowired
+    private MasterConfig masterConfig;
 
     boolean allDependentItemFinished;
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
deleted file mode 100644
index ffbbafb..0000000
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-public interface ITaskProcessFactory {
-
-    String type();
-
-    ITaskProcessor create();
-}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
deleted file mode 100644
index 439d8e1..0000000
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class SubTaskProcessFactory implements ITaskProcessFactory {
-    @Override
-    public String type() {
-        return TaskType.SUB_PROCESS.getDesc();
-    }
-
-    @Override
-    public ITaskProcessor create() {
-        return new SubTaskProcessor();
-    }
-}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 32cbce1..10d1b28 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -25,15 +25,18 @@ import 
org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
 /**
  *
  */
+@Service
 public class SubTaskProcessor extends BaseTaskProcessor {
 
     private ProcessInstance processInstance;
@@ -46,7 +49,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
      */
     private final Lock runLock = new ReentrantLock();
 
-    private StateEventCallbackService stateEventCallbackService = 
SpringApplicationContext.getBean(StateEventCallbackService.class);
+    @Autowired
+    private StateEventCallbackService stateEventCallbackService;
 
     @Override
     public boolean submit(TaskInstance task, ProcessInstance processInstance, 
int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
deleted file mode 100644
index d536e65..0000000
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class SwitchTaskProcessFactory implements ITaskProcessFactory {
-
-    @Override
-    public String type() {
-        return TaskType.SWITCH.getDesc();
-    }
-
-    @Override
-    public ITaskProcessor create() {
-        return new SwitchTaskProcessor();
-    }
-}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index b67dc47..5378649 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -43,6 +42,10 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
 public class SwitchTaskProcessor extends BaseTaskProcessor {
 
     protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
@@ -52,7 +55,8 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
     private ProcessInstance processInstance;
     TaskDefinition taskDefinition;
 
-    MasterConfig masterConfig = 
SpringApplicationContext.getBean(MasterConfig.class);
+    @Autowired
+    private MasterConfig masterConfig;
 
     /**
      * switch result
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 61a8ba5..09e8bf2 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -17,37 +17,43 @@
 
 package org.apache.dolphinscheduler.server.master.runner.task;
 
-import org.apache.dolphinscheduler.common.Constants;
+import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
 import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import com.google.common.base.Strings;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * the factory to create task processor
  */
+@Service
 public class TaskProcessorFactory {
 
-    public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = 
new ConcurrentHashMap<>();
+    private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
 
-    private static final String DEFAULT_PROCESSOR = Constants.COMMON_TASK_TYPE;
+    private Map<String, ITaskProcessor> taskProcessorMap;
 
-    static {
-        for (ITaskProcessFactory iTaskProcessor : 
ServiceLoader.load(ITaskProcessFactory.class)) {
-            PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
-        }
+    @Autowired
+    public TaskProcessorFactory(List<ITaskProcessor> taskProcessors) {
+        taskProcessorMap = 
taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType, 
Function.identity(), (v1, v2) -> v2));
     }
 
-    public static ITaskProcessor getTaskProcessor(String type) {
-        if (Strings.isNullOrEmpty(type)) {
-            return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create();
+    public ITaskProcessor getTaskProcessor(String key) {
+        if (StringUtils.isEmpty(key)) {
+            key = DEFAULT_PROCESSOR;
         }
-        if (!PROCESS_FACTORY_MAP.containsKey(type)) {
-            return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create();
+        ITaskProcessor taskProcessor = taskProcessorMap.get(key);
+        if (Objects.isNull(taskProcessor)) {
+            taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR);
         }
-        return PROCESS_FACTORY_MAP.get(type).create();
-    }
 
+        return taskProcessor;
+    }
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index fe9bddc..48c1f84 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import 
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.lang.reflect.Field;
@@ -81,9 +82,12 @@ public class WorkflowExecuteThreadTest {
 
     private ApplicationContext applicationContext;
 
+    private TaskProcessorFactory taskProcessorFactory;
+
     @Before
     public void init() throws Exception {
         processService = mock(ProcessService.class);
+        taskProcessorFactory = mock(TaskProcessorFactory.class);
 
         applicationContext = mock(ApplicationContext.class);
         config = new MasterConfig();
@@ -104,7 +108,7 @@ public class WorkflowExecuteThreadTest {
         
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
 
         ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new 
ConcurrentHashMap<>();
-        workflowExecuteThread = PowerMockito.spy(new 
WorkflowExecuteThread(processInstance, processService, null, null, config, 
taskTimeoutCheckList));
+        workflowExecuteThread = PowerMockito.spy(new 
WorkflowExecuteThread(processInstance, processService, null, null, config, 
taskTimeoutCheckList, taskProcessorFactory));
         // prepareProcess init dag
         Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
         dag.setAccessible(true);
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index a49719a..2dd349c 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -26,13 +26,14 @@ import org.junit.Test;
 @Ignore
 public class TaskProcessorFactoryTest {
 
+    private TaskProcessorFactory taskProcessorFactory;
     @Test
     public void testFactory() {
 
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setTaskType("shell");
 
-        ITaskProcessor iTaskProcessor = 
TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+        ITaskProcessor iTaskProcessor = 
taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
 
         Assert.assertNotNull(iTaskProcessor);
     }

Reply via email to