This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f88def8  [IMPROVEMENT-8178] Add Netty processor in Spring container 
(#8179)
f88def8 is described below

commit f88def8ef740702442f0e3ae1a9bd4a7a919aabb
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jan 25 21:33:09 2022 +0800

    [IMPROVEMENT-8178] Add Netty processor in Spring container (#8179)
    
    * Add Netty processor in Spring container
---
 .gitignore                                         |  1 +
 .../server/log/LoggerRequestProcessor.java         |  2 ++
 .../server/master/MasterServer.java                |  9 +++--
 .../dispatch/executor/NettyExecutorManager.java    |  9 +++--
 .../processor/TaskKillResponseProcessor.java       |  2 ++
 .../service/alert/AlertClientService.java          |  3 +-
 .../server/worker/WorkerServer.java                | 34 ++++++++++++------
 .../server/worker/config/BeanConfig.java           | 34 ++++++++++++++++++
 .../worker/processor/DBTaskAckProcessor.java       |  3 +-
 .../worker/processor/DBTaskResponseProcessor.java  |  2 ++
 .../worker/processor/HostUpdateProcessor.java      |  2 ++
 .../worker/processor/TaskExecuteProcessor.java     | 29 +++++++---------
 .../server/worker/processor/TaskKillProcessor.java | 24 +++++--------
 .../server/worker/config/BeanConfigTest.java       | 40 ++++++++++++++++++++++
 .../processor/TaskCallbackServiceTestConfig.java   |  0
 .../processor/TaskExecuteProcessorTest.java        |  2 --
 .../registry/WorkerRegistryClientTest.java         |  0
 17 files changed, 142 insertions(+), 54 deletions(-)

diff --git a/.gitignore b/.gitignore
index 3ddb824..edc8066 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@ dolphinscheduler-ui/dist
 dolphinscheduler-ui/node
 dolphinscheduler-common/sql
 dolphinscheduler-common/test
+dolphinscheduler-worker/logs
 
 # ------------------
 # pydolphinscheduler
diff --git 
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
 
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index f6e23f0..6e85b21 100644
--- 
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ 
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -48,12 +48,14 @@ import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 import io.netty.channel.Channel;
 
 /**
  * logger request process logic
  */
+@Component
 public class LoggerRequestProcessor implements NettyRequestProcessor {
 
     private final Logger logger = 
LoggerFactory.getLogger(LoggerRequestProcessor.class);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 3021c02..22b4a69 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -90,11 +90,17 @@ public class MasterServer implements IStoppable {
     private CacheProcessor cacheProcessor;
 
     @Autowired
+    private TaskKillResponseProcessor taskKillResponseProcessor;
+
+    @Autowired
     private EventExecuteService eventExecuteService;
 
     @Autowired
     private FailoverExecuteThread failoverExecuteThread;
 
+    @Autowired
+    private LoggerRequestProcessor loggerRequestProcessor;
+
     public static void main(String[] args) {
         Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
         SpringApplication.run(MasterServer.class);
@@ -111,14 +117,13 @@ public class MasterServer implements IStoppable {
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
taskResponseProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
taskAckProcessor);
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
+        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, 
taskKillResponseProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, 
stateEventProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST,
 taskEventProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST,
 taskEventProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, 
cacheProcessor);
 
         // logger server
-        LoggerRequestProcessor loggerRequestProcessor = new 
LoggerRequestProcessor();
         
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, 
loggerRequestProcessor);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index b0b8d25..521d0c4 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -63,6 +63,9 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean>{
     private TaskAckProcessor taskAckProcessor;
 
     @Autowired
+    private TaskKillResponseProcessor taskKillResponseProcessor;
+
+    @Autowired
     private TaskResponseProcessor taskResponseProcessor;
 
     /**
@@ -80,13 +83,9 @@ public class NettyExecutorManager extends 
AbstractExecutorManager<Boolean>{
 
     @PostConstruct
     public void init(){
-        /**
-         * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
-         * register EXECUTE_TASK_ACK command type TaskAckProcessor
-         */
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, 
taskResponseProcessor);
         
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, 
taskAckProcessor);
-        
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new 
TaskKillResponseProcessor());
+        
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, 
taskKillResponseProcessor);
     }
 
     /**
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 28f18fe..135257c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -25,6 +25,7 @@ import 
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -33,6 +34,7 @@ import io.netty.channel.Channel;
 /**
  *  task response processor
  */
+@Component
 public class TaskKillResponseProcessor implements NettyRequestProcessor {
 
     private final Logger logger = 
LoggerFactory.getLogger(TaskKillResponseProcessor.class);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
index 49977fa..0aeb25b 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -28,7 +28,7 @@ import 
org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AlertClientService {
+public class AlertClientService implements AutoCloseable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(AlertClientService.class);
 
@@ -70,6 +70,7 @@ public class AlertClientService {
     /**
      * close
      */
+    @Override
     public void close() {
         this.client.close();
         this.isRunning = false;
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 79e2e82..6296aeb 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -81,6 +81,7 @@ public class WorkerServer implements IStoppable {
     /**
      * alert model netty remote server
      */
+    @Autowired
     private AlertClientService alertClientService;
 
     @Autowired
@@ -98,6 +99,24 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private TaskPluginManager taskPluginManager;
 
+    @Autowired
+    private TaskExecuteProcessor taskExecuteProcessor;
+
+    @Autowired
+    private TaskKillProcessor taskKillProcessor;
+
+    @Autowired
+    private DBTaskAckProcessor dbTaskAckProcessor;
+
+    @Autowired
+    private DBTaskResponseProcessor dbTaskResponseProcessor;
+
+    @Autowired
+    private HostUpdateProcessor hostUpdateProcessor;
+
+    @Autowired
+    private LoggerRequestProcessor loggerRequestProcessor;
+
     /**
      * worker server startup, not use web service
      *
@@ -113,22 +132,17 @@ public class WorkerServer implements IStoppable {
      */
     @PostConstruct
     public void run() {
-        // alert-server client registry
-        alertClientService = new 
AlertClientService(workerConfig.getAlertListenHost(),
-                                                    
workerConfig.getAlertListenPort());
-
         // init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(workerConfig.getListenPort());
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, 
new TaskExecuteProcessor(alertClientService, taskPluginManager));
-        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new 
TaskKillProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, 
new DBTaskAckProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new 
DBTaskResponseProcessor());
-        
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
 new HostUpdateProcessor());
+        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, 
taskExecuteProcessor);
+        
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, 
taskKillProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, 
dbTaskAckProcessor);
+        
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, 
dbTaskResponseProcessor);
+        
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,
 hostUpdateProcessor);
 
         // logger server
-        LoggerRequestProcessor loggerRequestProcessor = new 
LoggerRequestProcessor();
         
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, 
loggerRequestProcessor);
         
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, 
loggerRequestProcessor);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
new file mode 100644
index 0000000..7b4c6d8
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.config;
+
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class BeanConfig {
+
+    @Bean
+    public AlertClientService alertClientService(WorkerConfig workerConfig) {
+        return new AlertClientService(
+                workerConfig.getAlertListenHost(),
+                workerConfig.getAlertListenPort());
+    }
+}
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
index 2c9bfe7..186b99d 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
@@ -25,17 +25,18 @@ import 
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
 /**
  *  db task ack processor
  */
+@Component
 public class DBTaskAckProcessor implements NettyRequestProcessor {
 
     private final Logger logger = 
LoggerFactory.getLogger(DBTaskAckProcessor.class);
 
-
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.DB_TASK_ACK == 
command.getType(),
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index fba6729..07fbf06 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -27,6 +27,7 @@ import 
org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -35,6 +36,7 @@ import io.netty.channel.Channel;
 /**
  *  db task response processor
  */
+@Component
 public class DBTaskResponseProcessor implements NettyRequestProcessor {
 
     private final Logger logger = 
LoggerFactory.getLogger(DBTaskResponseProcessor.class);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index 8928d50..aa12938 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -27,6 +27,7 @@ import 
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -36,6 +37,7 @@ import io.netty.channel.Channel;
  * update process host
  * this used when master failover
  */
+@Component
 public class HostUpdateProcessor implements NettyRequestProcessor {
 
     private final Logger logger = 
LoggerFactory.getLogger(HostUpdateProcessor.class);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 1f9770a..bca87c3 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -40,7 +40,6 @@ import 
org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@@ -49,6 +48,8 @@ import java.util.Date;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -57,6 +58,7 @@ import io.netty.channel.Channel;
 /**
  * worker request processor
  */
+@Component
 public class TaskExecuteProcessor implements NettyRequestProcessor {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TaskExecuteProcessor.class);
@@ -64,30 +66,29 @@ public class TaskExecuteProcessor implements 
NettyRequestProcessor {
     /**
      * worker config
      */
-    private final WorkerConfig workerConfig;
+    @Autowired
+    private WorkerConfig workerConfig;
 
     /**
      * task callback service
      */
-    private final TaskCallbackService taskCallbackService;
+    @Autowired
+    private TaskCallbackService taskCallbackService;
 
     /**
      * alert client service
      */
+    @Autowired
     private AlertClientService alertClientService;
 
+    @Autowired
     private TaskPluginManager taskPluginManager;
 
-    /*
+    /**
      * task execute manager
      */
-    private final WorkerManagerThread workerManager;
-
-    public TaskExecuteProcessor() {
-        this.taskCallbackService = 
SpringApplicationContext.getBean(TaskCallbackService.class);
-        this.workerConfig = 
SpringApplicationContext.getBean(WorkerConfig.class);
-        this.workerManager = 
SpringApplicationContext.getBean(WorkerManagerThread.class);
-    }
+    @Autowired
+    private WorkerManagerThread workerManager;
 
     /**
      * Pre-cache task to avoid extreme situations when kill task. There is no 
such task in the cache
@@ -101,12 +102,6 @@ public class TaskExecuteProcessor implements 
NettyRequestProcessor {
         
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskRequest);
     }
 
-    public TaskExecuteProcessor(AlertClientService alertClientService, 
TaskPluginManager taskPluginManager) {
-        this();
-        this.alertClientService = alertClientService;
-        this.taskPluginManager = taskPluginManager;
-    }
-
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == 
command.getType(),
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 96b51f3..d036110 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -30,9 +30,7 @@ import 
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
@@ -45,6 +43,8 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
 
@@ -53,30 +53,22 @@ import io.netty.channel.Channel;
 /**
  * task kill processor
  */
+@Component
 public class TaskKillProcessor implements NettyRequestProcessor {
 
     private final Logger logger = 
LoggerFactory.getLogger(TaskKillProcessor.class);
 
     /**
-     * worker config
-     */
-    private final WorkerConfig workerConfig;
-
-    /**
      * task callback service
      */
-    private final TaskCallbackService taskCallbackService;
+    @Autowired
+    private TaskCallbackService taskCallbackService;
 
-    /*
+    /**
      * task execute manager
      */
-    private final WorkerManagerThread workerManager;
-
-    public TaskKillProcessor() {
-        this.taskCallbackService = 
SpringApplicationContext.getBean(TaskCallbackService.class);
-        this.workerConfig = 
SpringApplicationContext.getBean(WorkerConfig.class);
-        this.workerManager = 
SpringApplicationContext.getBean(WorkerManagerThread.class);
-    }
+    @Autowired
+    private WorkerManagerThread workerManager;
 
     /**
      * task kill process
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
new file mode 100644
index 0000000..85d208e
--- /dev/null
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.config;
+
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = {BeanConfig.class, WorkerConfig.class})
+public class BeanConfigTest {
+
+    @Autowired
+    private AlertClientService alertClientService;
+
+    @Test
+    public void alertClientService() {
+        Assert.assertNotNull(alertClientService);
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskCallbackServiceTestConfig.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
similarity index 100%
rename from 
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskCallbackServiceTestConfig.java
rename to 
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
similarity index 99%
rename from 
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java
rename to 
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index 5a345ab..b07ffa6 100644
--- 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -49,7 +49,6 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * test task execute processor
@@ -74,7 +73,6 @@ public class TaskExecuteProcessorTest {
 
     private TaskExecuteRequestCommand taskRequestCommand;
 
-
     private AlertClientService alertClientService;
 
     private WorkerManagerThread workerManager;
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/registry/WorkerRegistryClientTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
similarity index 100%
rename from 
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/registry/WorkerRegistryClientTest.java
rename to 
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

Reply via email to