This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 36ebfe0 [Implement][server]TaskProcessor code optimization (#7146)
36ebfe0 is described below
commit 36ebfe0984bd4018c38c6cffebc0292bc94d7117
Author: Kerwin <[email protected]>
AuthorDate: Mon Dec 6 14:15:09 2021 +0800
[Implement][server]TaskProcessor code optimization (#7146)
* TaskProcessor code optimization
* fix code style
* update Collectors.toMap
---
.../master/runner/MasterSchedulerService.java | 6 +++-
.../master/runner/WorkflowExecuteThread.java | 15 ++++++--
.../master/runner/task/BaseTaskProcessor.java | 9 +++--
.../runner/task/CommonTaskProcessFactory.java | 36 -------------------
.../master/runner/task/CommonTaskProcessor.java | 8 ++---
.../runner/task/ConditionTaskProcessFactory.java | 35 -------------------
.../master/runner/task/ConditionTaskProcessor.java | 9 ++---
.../runner/task/DependentTaskProcessFactory.java | 36 -------------------
.../master/runner/task/DependentTaskProcessor.java | 8 +++--
.../master/runner/task/ITaskProcessFactory.java | 25 --------------
.../master/runner/task/SubTaskProcessFactory.java | 35 -------------------
.../master/runner/task/SubTaskProcessor.java | 8 +++--
.../runner/task/SwitchTaskProcessFactory.java | 36 -------------------
.../master/runner/task/SwitchTaskProcessor.java | 8 +++--
.../master/runner/task/TaskProcessorFactory.java | 40 +++++++++++++---------
.../server/master/WorkflowExecuteThreadTest.java | 6 +++-
.../runner/task/TaskProcessorFactoryTest.java | 3 +-
17 files changed, 78 insertions(+), 245 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 6e05da9..00eab78 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -33,6 +33,7 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+import
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -68,6 +69,8 @@ public class MasterSchedulerService extends Thread {
*/
@Autowired
private ProcessService processService;
+ @Autowired
+ private TaskProcessorFactory taskProcessorFactory;
/**
* zookeeper master client
@@ -205,7 +208,8 @@ public class MasterSchedulerService extends Thread {
, nettyExecutorManager
, processAlertManager
, masterConfig
- , taskTimeoutCheckList);
+ , taskTimeoutCheckList
+ , taskProcessorFactory);
this.processInstanceExecCacheManager.cache(processInstance.getId(),
workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index fa047a0..8a59409 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -131,6 +131,11 @@ public class WorkflowExecuteThread implements Runnable {
private ProcessDefinition processDefinition;
/**
+ * task processor
+ */
+ private TaskProcessorFactory taskProcessorFactory;
+
+ /**
* the object of DAG
*/
private DAG<String, TaskNode, TaskNodeRelation> dag;
@@ -216,13 +221,18 @@ public class WorkflowExecuteThread implements Runnable {
* @param processInstance processInstance
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
+ * @param processAlertManager processAlertManager
+ * @param masterConfig masterConfig
+ * @param taskTimeoutCheckList taskTimeoutCheckList
+ * @param taskProcessorFactory taskProcessorFactory
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
- , ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList) {
+ , ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
+ , TaskProcessorFactory taskProcessorFactory) {
this.processService = processService;
this.processInstance = processInstance;
@@ -230,6 +240,7 @@ public class WorkflowExecuteThread implements Runnable {
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.taskTimeoutCheckList = taskTimeoutCheckList;
+ this.taskProcessorFactory = taskProcessorFactory;
}
@Override
@@ -791,7 +802,7 @@ public class WorkflowExecuteThread implements Runnable {
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
try {
- ITaskProcessor taskProcessor =
TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+ ITaskProcessor taskProcessor =
taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&&
taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 07ae812..b4951f4 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -62,6 +61,7 @@ import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.base.Enums;
import com.google.common.base.Strings;
@@ -80,7 +80,8 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
protected ProcessInstance processInstance;
- protected ProcessService processService =
SpringApplicationContext.getBean(ProcessService.class);
+ @Autowired
+ protected ProcessService processService;
/**
* pause task, common tasks donot need this.
@@ -107,7 +108,6 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
public void run() {
}
-
@Override
public boolean action(TaskAction taskAction) {
@@ -119,7 +119,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
case TIMEOUT:
return timeout();
default:
- logger.error("unknown task action: {}", taskAction.toString());
+ logger.error("unknown task action: {}", taskAction);
}
return false;
@@ -305,7 +305,6 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
}
}
-
/**
* set SQL task relation
*
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
deleted file mode 100644
index 4884650..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.Constants;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class CommonTaskProcessFactory implements ITaskProcessFactory {
- @Override
- public String type() {
- return Constants.COMMON_TASK_TYPE;
-
- }
-
- @Override
- public ITaskProcessor create() {
- return new CommonTaskProcessor();
- }
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 60c7de7..d44315a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -40,20 +39,19 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
/**
* common task processor
*/
+@Service
public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
private TaskPriorityQueue taskUpdateQueue;
@Autowired
- MasterConfig masterConfig;
-
- @Autowired
- NettyExecutorManager nettyExecutorManager =
SpringApplicationContext.getBean(NettyExecutorManager.class);
+ NettyExecutorManager nettyExecutorManager;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance,
int maxRetryTimes, int commitInterval) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
deleted file mode 100644
index 3028c56..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class ConditionTaskProcessFactory implements ITaskProcessFactory {
- @Override
- public String type() {
- return TaskType.CONDITIONS.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new ConditionTaskProcessor();
- }
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index 2412659..c3c65b3 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -26,14 +26,12 @@ import
org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@@ -41,11 +39,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
/**
* condition task processor
*/
+@Service
public class ConditionTaskProcessor extends BaseTaskProcessor {
/**
@@ -65,7 +65,8 @@ public class ConditionTaskProcessor extends BaseTaskProcessor
{
*/
private Map<Long, ExecutionStatus> completeTaskList = new
ConcurrentHashMap<>();
- MasterConfig masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
+ @Autowired
+ private MasterConfig masterConfig;
private TaskDefinition taskDefinition;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
deleted file mode 100644
index 3f885ed..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class DependentTaskProcessFactory implements ITaskProcessFactory {
-
- @Override
- public String type() {
- return TaskType.DEPENDENT.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new DependentTaskProcessor();
- }
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index 0478f7e..28cd0e7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@@ -41,11 +40,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* dependent task processor
*/
+@Service
public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
@@ -72,7 +75,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor
{
ProcessInstance processInstance;
TaskDefinition taskDefinition;
- MasterConfig masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
+ @Autowired
+ private MasterConfig masterConfig;
boolean allDependentItemFinished;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
deleted file mode 100644
index ffbbafb..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-public interface ITaskProcessFactory {
-
- String type();
-
- ITaskProcessor create();
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
deleted file mode 100644
index 439d8e1..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class SubTaskProcessFactory implements ITaskProcessFactory {
- @Override
- public String type() {
- return TaskType.SUB_PROCESS.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new SubTaskProcessor();
- }
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 32cbce1..10d1b28 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -25,15 +25,18 @@ import
org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
/**
*
*/
+@Service
public class SubTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
@@ -46,7 +49,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
*/
private final Lock runLock = new ReentrantLock();
- private StateEventCallbackService stateEventCallbackService =
SpringApplicationContext.getBean(StateEventCallbackService.class);
+ @Autowired
+ private StateEventCallbackService stateEventCallbackService;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance,
int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
deleted file mode 100644
index d536e65..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.task;
-
-import org.apache.dolphinscheduler.common.enums.TaskType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(ITaskProcessFactory.class)
-public class SwitchTaskProcessFactory implements ITaskProcessFactory {
-
- @Override
- public String type() {
- return TaskType.SWITCH.getDesc();
- }
-
- @Override
- public ITaskProcessor create() {
- return new SwitchTaskProcessor();
- }
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index b67dc47..5378649 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.lang.StringUtils;
@@ -43,6 +42,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
@@ -52,7 +55,8 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
TaskDefinition taskDefinition;
- MasterConfig masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
+ @Autowired
+ private MasterConfig masterConfig;
/**
* switch result
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 61a8ba5..09e8bf2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -17,37 +17,43 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import org.apache.dolphinscheduler.common.Constants;
+import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
-import com.google.common.base.Strings;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
/**
* the factory to create task processor
*/
+@Service
public class TaskProcessorFactory {
- public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP =
new ConcurrentHashMap<>();
+ private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
- private static final String DEFAULT_PROCESSOR = Constants.COMMON_TASK_TYPE;
+ private Map<String, ITaskProcessor> taskProcessorMap;
- static {
- for (ITaskProcessFactory iTaskProcessor :
ServiceLoader.load(ITaskProcessFactory.class)) {
- PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
- }
+ @Autowired
+ public TaskProcessorFactory(List<ITaskProcessor> taskProcessors) {
+ taskProcessorMap =
taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType,
Function.identity(), (v1, v2) -> v2));
}
- public static ITaskProcessor getTaskProcessor(String type) {
- if (Strings.isNullOrEmpty(type)) {
- return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create();
+ public ITaskProcessor getTaskProcessor(String key) {
+ if (StringUtils.isEmpty(key)) {
+ key = DEFAULT_PROCESSOR;
}
- if (!PROCESS_FACTORY_MAP.containsKey(type)) {
- return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create();
+ ITaskProcessor taskProcessor = taskProcessorMap.get(key);
+ if (Objects.isNull(taskProcessor)) {
+ taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR);
}
- return PROCESS_FACTORY_MAP.get(type).create();
- }
+ return taskProcessor;
+ }
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
index fe9bddc..48c1f84 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import
org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Field;
@@ -81,9 +82,12 @@ public class WorkflowExecuteThreadTest {
private ApplicationContext applicationContext;
+ private TaskProcessorFactory taskProcessorFactory;
+
@Before
public void init() throws Exception {
processService = mock(ProcessService.class);
+ taskProcessorFactory = mock(TaskProcessorFactory.class);
applicationContext = mock(ApplicationContext.class);
config = new MasterConfig();
@@ -104,7 +108,7 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new
ConcurrentHashMap<>();
- workflowExecuteThread = PowerMockito.spy(new
WorkflowExecuteThread(processInstance, processService, null, null, config,
taskTimeoutCheckList));
+ workflowExecuteThread = PowerMockito.spy(new
WorkflowExecuteThread(processInstance, processService, null, null, config,
taskTimeoutCheckList, taskProcessorFactory));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index a49719a..2dd349c 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -26,13 +26,14 @@ import org.junit.Test;
@Ignore
public class TaskProcessorFactoryTest {
+ private TaskProcessorFactory taskProcessorFactory;
@Test
public void testFactory() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");
- ITaskProcessor iTaskProcessor =
TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
+ ITaskProcessor iTaskProcessor =
taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
Assert.assertNotNull(iTaskProcessor);
}