This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 99d8276be7 Optimizing the scope of RPC base classes (#15946)
99d8276be7 is described below

commit 99d8276be7ff1f0cb47f6d67d7440a9380614073
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 8 11:46:09 2024 +0800

    Optimizing the scope of RPC base classes (#15946)
    
    * Optimizing the scope of RPC base classes
    
    * Fix UT
---
 .../dolphinscheduler/alert/rpc/AlertRpcServer.java |  16 +--
 .../alert/rpc/AlertRpcServerTest.java              |  22 +++--
 .../api/service/LoggerServiceTest.java             |  19 ++--
 ...voker.java => AbstractClientMethodInvoker.java} |   5 +-
 .../base/client/ClientInvocationHandler.java       |   5 +-
 .../extract/base/client/ClientMethodInvoker.java   |   2 +-
 .../base/client/IRpcClientProxyFactory.java        |   2 +-
 .../client/JdkDynamicRpcClientProxyFactory.java    |   5 +-
 .../base/{ => client}/NettyClientHandler.java      |  18 +---
 .../base/{ => client}/NettyRemotingClient.java     | 110 +++------------------
 .../{ => client}/NettyRemotingClientFactory.java   |   2 +-
 .../SingletonJdkDynamicRpcClientProxyFactory.java  |   1 -
 .../base/client/SyncClientMethodInvoker.java       |   5 +-
 .../extract/base/future/ResponseFuture.java        |  76 +-------------
 .../base/server/JdkDynamicServerHandler.java       |  12 +--
 .../base/{ => server}/NettyRemotingServer.java     |  57 +++++------
 .../{ => server}/NettyRemotingServerFactory.java   |   6 +-
 .../extract/base/server/RpcServer.java             |  74 ++++++++++++++
 .../extract/base/server/ServerMethodInvoker.java   |   4 +-
 .../base/server/ServerMethodInvokerImpl.java       |   9 +-
 ...voker.java => ServerMethodInvokerRegistry.java} |  10 +-
 .../server/SpringServerMethodInvokerDiscovery.java |  37 ++-----
 ...ngletonJdkDynamicRpcClientProxyFactoryTest.java |  12 +--
 .../server/master/rpc/MasterRpcServer.java         |  18 +---
 .../MasterRpcServerTest.java}                      |  28 +++---
 .../microbench/rpc/RpcBenchMarkTest.java           |  14 ++-
 .../server/worker/rpc/WorkerRpcServer.java         |  18 +---
 .../server/worker/rpc/WorkerRpcServerTest.java     |  29 +++---
 28 files changed, 233 insertions(+), 383 deletions(-)

diff --git 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
index 3bd368573a..d73e4755dd 100644
--- 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
+++ 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.alert.rpc;
 
 import org.apache.dolphinscheduler.alert.config.AlertConfig;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
 
@@ -31,20 +30,7 @@ import org.springframework.stereotype.Service;
 public class AlertRpcServer extends SpringServerMethodInvokerDiscovery 
implements AutoCloseable {
 
     public AlertRpcServer(AlertConfig alertConfig) {
-        super(NettyRemotingServerFactory.buildNettyRemotingServer(
-                
NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build()));
+        
super(NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build());
     }
 
-    public void start() {
-        log.info("Starting AlertRpcServer...");
-        nettyRemotingServer.start();
-        log.info("Started AlertRpcServer...");
-    }
-
-    @Override
-    public void close() {
-        log.info("Closing AlertRpcServer...");
-        nettyRemotingServer.close();
-        log.info("Closed AlertRpcServer...");
-    }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java
similarity index 67%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
copy to 
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java
index 6bf1b8d31c..75f16848fd 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
+++ 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java
@@ -15,16 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.alert.rpc;
 
-import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import org.apache.dolphinscheduler.alert.config.AlertConfig;
 
-import lombok.experimental.UtilityClass;
+import org.junit.jupiter.api.Test;
 
-@UtilityClass
-public class NettyRemotingServerFactory {
+class AlertRpcServerTest {
 
-    public NettyRemotingServer buildNettyRemotingServer(NettyServerConfig 
nettyServerConfig) {
-        return new NettyRemotingServer(nettyServerConfig);
+    private final AlertRpcServer alertRpcServer = new AlertRpcServer(new 
AlertConfig());
+
+    @Test
+    void testStart() {
+        alertRpcServer.start();
+    }
+
+    @Test
+    void testClose() {
+        alertRpcServer.close();
     }
+
 }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index 4861e1004e..972092602f 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
 import org.apache.dolphinscheduler.extract.common.ILogService;
@@ -91,7 +90,7 @@ public class LoggerServiceTest {
     @Mock
     private TaskDefinitionMapper taskDefinitionMapper;
 
-    private NettyRemotingServer nettyRemotingServer;
+    private SpringServerMethodInvokerDiscovery 
springServerMethodInvokerDiscovery;
 
     private int nettyServerPort = 18080;
 
@@ -103,11 +102,10 @@ public class LoggerServiceTest {
             return;
         }
 
-        nettyRemotingServer = new 
NettyRemotingServer(NettyServerConfig.builder().listenPort(nettyServerPort).build());
-        nettyRemotingServer.start();
-        SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
-                new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
-        springServerMethodInvokerDiscovery.postProcessAfterInitialization(new 
ILogService() {
+        springServerMethodInvokerDiscovery = new 
SpringServerMethodInvokerDiscovery(
+                
NettyServerConfig.builder().serverName("TestLogServer").listenPort(nettyServerPort).build());
+        springServerMethodInvokerDiscovery.start();
+        
springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new 
ILogService() {
 
             @Override
             public TaskInstanceLogFileDownloadResponse 
getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest 
taskInstanceLogFileDownloadRequest) {
@@ -142,13 +140,14 @@ public class LoggerServiceTest {
             public void removeTaskInstanceLog(String 
taskInstanceLogAbsolutePath) {
 
             }
-        }, "iLogServiceImpl");
+        });
+        springServerMethodInvokerDiscovery.start();
     }
 
     @AfterEach
     public void tearDown() {
-        if (nettyRemotingServer != null) {
-            nettyRemotingServer.close();
+        if (springServerMethodInvokerDiscovery != null) {
+            springServerMethodInvokerDiscovery.close();
         }
     }
 
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java
similarity index 83%
rename from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java
rename to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java
index 519dd87199..b753f1efa7 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java
@@ -17,12 +17,11 @@
 
 package org.apache.dolphinscheduler.extract.base.client;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 
 import java.lang.reflect.Method;
 
-public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker {
+abstract class AbstractClientMethodInvoker implements ClientMethodInvoker {
 
     protected final String methodIdentifier;
 
@@ -32,7 +31,7 @@ public abstract class BaseRemoteMethodInvoker implements 
ClientMethodInvoker {
 
     protected final Host serverHost;
 
-    public BaseRemoteMethodInvoker(Host serverHost, Method localMethod, 
NettyRemotingClient nettyRemotingClient) {
+    AbstractClientMethodInvoker(Host serverHost, Method localMethod, 
NettyRemotingClient nettyRemotingClient) {
         this.serverHost = serverHost;
         this.localMethod = localMethod;
         this.nettyRemotingClient = nettyRemotingClient;
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
index d5c9ab73d3..41ec3e056d 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.client;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
 import org.apache.dolphinscheduler.extract.base.RpcMethod;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 
@@ -31,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class ClientInvocationHandler implements InvocationHandler {
+class ClientInvocationHandler implements InvocationHandler {
 
     private final NettyRemotingClient nettyRemotingClient;
 
@@ -39,7 +38,7 @@ public class ClientInvocationHandler implements 
InvocationHandler {
 
     private final Host serverHost;
 
-    public ClientInvocationHandler(Host serverHost, NettyRemotingClient 
nettyRemotingClient) {
+    ClientInvocationHandler(Host serverHost, NettyRemotingClient 
nettyRemotingClient) {
         this.serverHost = checkNotNull(serverHost);
         this.nettyRemotingClient = checkNotNull(nettyRemotingClient);
         this.methodInvokerMap = new ConcurrentHashMap<>();
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
index dcf53b0311..a287fd95ce 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.extract.base.client;
 
 import java.lang.reflect.Method;
 
-public interface ClientMethodInvoker {
+interface ClientMethodInvoker {
 
     Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
 
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
index e60b0f18b0..afd3adf348 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.extract.base.client;
 
-public interface IRpcClientProxyFactory {
+interface IRpcClientProxyFactory {
 
     /**
      * Create the client proxy.
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
index 5635a88f34..bf329ab3fc 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.extract.base.client;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 
 import java.lang.reflect.Proxy;
@@ -34,7 +33,7 @@ import com.google.common.cache.LoadingCache;
 /**
  * This class is used to create a proxy client which will transform local 
method invocation to remove invocation.
  */
-public class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory 
{
+class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory {
 
     private final NettyRemotingClient nettyRemotingClient;
 
@@ -49,7 +48,7 @@ public class JdkDynamicRpcClientProxyFactory implements 
IRpcClientProxyFactory {
                 }
             });
 
-    public JdkDynamicRpcClientProxyFactory(NettyRemotingClient 
nettyRemotingClient) {
+    JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) {
         this.nettyRemotingClient = nettyRemotingClient;
     }
 
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
similarity index 87%
rename from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
rename to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
index b0d998af83..be570eb577 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
@@ -15,16 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.client;
 
+import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
 import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
 import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
 import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
 import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer;
 import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
 
-import java.util.concurrent.ExecutorService;
-
 import lombok.extern.slf4j.Slf4j;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
@@ -38,11 +37,8 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
 
     private final NettyRemotingClient nettyRemotingClient;
 
-    private final ExecutorService callbackExecutor;
-
-    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, 
ExecutorService callbackExecutor) {
+    public NettyClientHandler(NettyRemotingClient nettyRemotingClient) {
         this.nettyRemotingClient = nettyRemotingClient;
-        this.callbackExecutor = callbackExecutor;
     }
 
     @Override
@@ -64,13 +60,7 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
         }
         StandardRpcResponse deserialize = 
JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class);
         future.setIRpcResponse(deserialize);
-        future.release();
-        if (future.getInvokeCallback() != null) {
-            future.removeFuture();
-            this.callbackExecutor.execute(future::executeInvokeCallback);
-        } else {
-            future.putResponse(deserialize);
-        }
+        future.putResponse(deserialize);
     }
 
     @Override
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
similarity index 62%
rename from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
rename to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index e4682f5224..3999f5c9f5 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -15,33 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.client;
 
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.extract.base.IRpcResponse;
 import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
 import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
 import 
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
-import 
org.apache.dolphinscheduler.extract.base.exception.RemotingTooMuchRequestException;
-import org.apache.dolphinscheduler.extract.base.future.InvokeCallback;
-import org.apache.dolphinscheduler.extract.base.future.ReleaseSemaphore;
 import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
 import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import 
org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
 import org.apache.dolphinscheduler.extract.base.utils.Constants;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -71,14 +62,8 @@ public class NettyRemotingClient implements AutoCloseable {
 
     private final NettyClientConfig clientConfig;
 
-    private final Semaphore asyncSemaphore = new Semaphore(1024, true);
-
-    private final ExecutorService callbackExecutor;
-
     private final NettyClientHandler clientHandler;
 
-    private final ScheduledExecutorService responseFutureExecutor;
-
     public NettyRemotingClient(final NettyClientConfig clientConfig) {
         this.clientConfig = clientConfig;
         ThreadFactory nettyClientThreadFactory = 
ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
@@ -87,18 +72,7 @@ public class NettyRemotingClient implements AutoCloseable {
         } else {
             this.workerGroup = new 
NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
         }
-        this.callbackExecutor = new ThreadPoolExecutor(
-                Constants.CPUS,
-                Constants.CPUS,
-                1,
-                TimeUnit.MINUTES,
-                new LinkedBlockingQueue<>(1000),
-                
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
-                new CallerThreadExecutePolicy());
-        this.clientHandler = new NettyClientHandler(this, callbackExecutor);
-
-        this.responseFutureExecutor = 
Executors.newSingleThreadScheduledExecutor(
-                
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
+        this.clientHandler = new NettyClientHandler(this);
 
         this.start();
     }
@@ -127,66 +101,9 @@ public class NettyRemotingClient implements AutoCloseable {
                                 .addLast(new TransporterDecoder(), 
clientHandler, new TransporterEncoder());
                     }
                 });
-        
this.responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable,
 0, 1, TimeUnit.SECONDS);
         isStarted.compareAndSet(false, true);
     }
 
-    public void sendAsync(final Host host,
-                          final Transporter transporter,
-                          final long timeoutMillis,
-                          final InvokeCallback invokeCallback) throws 
InterruptedException, RemotingException {
-        final Channel channel = getChannel(host);
-        if (channel == null) {
-            throw new RemotingException("network error");
-        }
-        /*
-         * request unique identification
-         */
-        final long opaque = transporter.getHeader().getOpaque();
-        /*
-         * control concurrency number
-         */
-        boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, 
TimeUnit.MILLISECONDS);
-        if (acquired) {
-            final ReleaseSemaphore releaseSemaphore = new 
ReleaseSemaphore(this.asyncSemaphore);
-
-            /*
-             * response future
-             */
-            final ResponseFuture responseFuture = new ResponseFuture(opaque,
-                    timeoutMillis,
-                    invokeCallback,
-                    releaseSemaphore);
-            try {
-                channel.writeAndFlush(transporter).addListener(future -> {
-                    if (future.isSuccess()) {
-                        responseFuture.setSendOk(true);
-                        return;
-                    } else {
-                        responseFuture.setSendOk(false);
-                    }
-                    responseFuture.setCause(future.cause());
-                    responseFuture.putResponse(null);
-                    try {
-                        responseFuture.executeInvokeCallback();
-                    } catch (Exception ex) {
-                        log.error("execute callback error", ex);
-                    } finally {
-                        responseFuture.release();
-                    }
-                });
-            } catch (Exception ex) {
-                responseFuture.release();
-                throw new RemotingException(String.format("Send transporter to 
host: %s failed", host), ex);
-            }
-        } else {
-            String message = String.format(
-                    "try to acquire async semaphore timeout: %d, waiting 
thread num: %d, total permits: %d",
-                    timeoutMillis, asyncSemaphore.getQueueLength(), 
asyncSemaphore.availablePermits());
-            throw new RemotingTooMuchRequestException(message);
-        }
-    }
-
     public IRpcResponse sendSync(final Host host, final Transporter 
transporter,
                                  final long timeoutMillis) throws 
InterruptedException, RemotingException {
         final Channel channel = getChannel(host);
@@ -194,7 +111,7 @@ public class NettyRemotingClient implements AutoCloseable {
             throw new RemotingException(String.format("connect to : %s fail", 
host));
         }
         final long opaque = transporter.getHeader().getOpaque();
-        final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis, null, null);
+        final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis);
         channel.writeAndFlush(transporter).addListener(future -> {
             if (future.isSuccess()) {
                 responseFuture.setSendOk(true);
@@ -220,7 +137,7 @@ public class NettyRemotingClient implements AutoCloseable {
         return iRpcResponse;
     }
 
-    public Channel getChannel(Host host) {
+    private Channel getChannel(Host host) {
         Channel channel = channels.get(host);
         if (channel != null && channel.isActive()) {
             return channel;
@@ -235,9 +152,9 @@ public class NettyRemotingClient implements AutoCloseable {
      * @param isSync sync flag
      * @return channel
      */
-    public Channel createChannel(Host host, boolean isSync) {
-        ChannelFuture future;
+    private Channel createChannel(Host host, boolean isSync) {
         try {
+            ChannelFuture future;
             synchronized (bootstrap) {
                 future = bootstrap.connect(new InetSocketAddress(host.getIp(), 
host.getPort()));
             }
@@ -249,10 +166,11 @@ public class NettyRemotingClient implements AutoCloseable 
{
                 channels.put(host, channel);
                 return channel;
             }
-        } catch (Exception ex) {
-            log.warn(String.format("connect to %s error", host), ex);
+            throw new IllegalArgumentException("connect to host: " + host + " 
failed");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Connect to host: " + host + " failed", 
e);
         }
-        return null;
     }
 
     @Override
@@ -263,12 +181,6 @@ public class NettyRemotingClient implements AutoCloseable {
                 if (workerGroup != null) {
                     this.workerGroup.shutdownGracefully();
                 }
-                if (callbackExecutor != null) {
-                    this.callbackExecutor.shutdownNow();
-                }
-                if (this.responseFutureExecutor != null) {
-                    this.responseFutureExecutor.shutdownNow();
-                }
                 log.info("netty client closed");
             } catch (Exception ex) {
                 log.error("netty client close exception", ex);
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java
similarity index 95%
rename from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java
rename to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java
index 7bbebfbf3d..d14a8aa54e 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.client;
 
 import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
 
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
index 28d82532be..44d310e70b 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.extract.base.client;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
 import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
 
 public class SingletonJdkDynamicRpcClientProxyFactory {
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
index b5fdf3fb71..4731a22d0a 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.extract.base.client;
 
 import org.apache.dolphinscheduler.extract.base.IRpcResponse;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
 import org.apache.dolphinscheduler.extract.base.RpcMethod;
 import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
 import 
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
@@ -29,9 +28,9 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
 
 import java.lang.reflect.Method;
 
-public class SyncClientMethodInvoker extends BaseRemoteMethodInvoker {
+class SyncClientMethodInvoker extends AbstractClientMethodInvoker {
 
-    public SyncClientMethodInvoker(Host serverHost, Method localMethod, 
NettyRemotingClient nettyRemotingClient) {
+    SyncClientMethodInvoker(Host serverHost, Method localMethod, 
NettyRemotingClient nettyRemotingClient) {
         super(serverHost, localMethod, nettyRemotingClient);
     }
 
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
index 35405c5578..1fbbd9ed6c 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
@@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.extract.base.future;
 
 import org.apache.dolphinscheduler.extract.base.IRpcResponse;
 
-import java.util.Iterator;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -34,17 +32,13 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class ResponseFuture {
 
-    private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE 
= new ConcurrentHashMap<>(256);
+    private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE 
= new ConcurrentHashMap<>();
 
     private final long opaque;
 
     // remove the timeout
     private final long timeoutMillis;
 
-    private final InvokeCallback invokeCallback;
-
-    private final ReleaseSemaphore releaseSemaphore;
-
     private final CountDownLatch latch = new CountDownLatch(1);
 
     private final long beginTimestamp = System.currentTimeMillis();
@@ -57,14 +51,9 @@ public class ResponseFuture {
 
     private Throwable cause;
 
-    public ResponseFuture(long opaque,
-                          long timeoutMillis,
-                          InvokeCallback invokeCallback,
-                          ReleaseSemaphore releaseSemaphore) {
+    public ResponseFuture(long opaque, long timeoutMillis) {
         this.opaque = opaque;
         this.timeoutMillis = timeoutMillis;
-        this.invokeCallback = invokeCallback;
-        this.releaseSemaphore = releaseSemaphore;
         FUTURE_TABLE.put(opaque, this);
     }
 
@@ -90,10 +79,6 @@ public class ResponseFuture {
         return FUTURE_TABLE.get(opaque);
     }
 
-    public void removeFuture() {
-        FUTURE_TABLE.remove(opaque);
-    }
-
     /**
      * whether timeout
      *
@@ -104,15 +89,6 @@ public class ResponseFuture {
         return diff > this.timeoutMillis;
     }
 
-    /**
-     * execute invoke callback
-     */
-    public void executeInvokeCallback() {
-        if (invokeCallback != null) {
-            invokeCallback.operationComplete(this);
-        }
-    }
-
     public boolean isSendOK() {
         return sendOk;
     }
@@ -129,52 +105,4 @@ public class ResponseFuture {
         return cause;
     }
 
-    public long getOpaque() {
-        return opaque;
-    }
-
-    public long getTimeoutMillis() {
-        return timeoutMillis;
-    }
-
-    public long getBeginTimestamp() {
-        return beginTimestamp;
-    }
-
-    public InvokeCallback getInvokeCallback() {
-        return invokeCallback;
-    }
-
-    /**
-     * release
-     */
-    public void release() {
-        if (this.releaseSemaphore != null) {
-            this.releaseSemaphore.release();
-        }
-    }
-
-    /**
-     * scan future table
-     */
-    public static void scanFutureTable() {
-        Iterator<Map.Entry<Long, ResponseFuture>> it = 
FUTURE_TABLE.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<Long, ResponseFuture> next = it.next();
-            ResponseFuture future = next.getValue();
-            if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 
1000) > System.currentTimeMillis()) {
-                continue;
-            }
-            try {
-                // todo: use thread pool to execute the async callback, 
otherwise will block the scan thread
-                future.release();
-                future.executeInvokeCallback();
-            } catch (Exception ex) {
-                log.error("ScanFutureTable, execute callback error, requestId: 
{}", future.getOpaque(), ex);
-            }
-            it.remove();
-            log.debug("Remove timeout request: {}", future);
-        }
-    }
-
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index b4978172f1..f57ff0b609 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.server;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
 import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
 import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
 import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
@@ -30,6 +29,7 @@ import 
org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
 import lombok.extern.slf4j.Slf4j;
@@ -42,14 +42,14 @@ import io.netty.handler.timeout.IdleStateEvent;
 
 @Slf4j
 @ChannelHandler.Sharable
-public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter {
+class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter {
 
-    private final NettyRemotingServer nettyRemotingServer;
+    private final ExecutorService methodInvokeExecutor;
 
     private final Map<String, ServerMethodInvoker> methodInvokerMap;
 
-    public JdkDynamicServerHandler(NettyRemotingServer nettyRemotingServer) {
-        this.nettyRemotingServer = nettyRemotingServer;
+    JdkDynamicServerHandler(ExecutorService methodInvokeExecutor) {
+        this.methodInvokeExecutor = methodInvokeExecutor;
         this.methodInvokerMap = new ConcurrentHashMap<>();
     }
 
@@ -90,7 +90,7 @@ public class JdkDynamicServerHandler extends 
ChannelInboundHandlerAdapter {
                 channel.writeAndFlush(response);
                 return;
             }
-            nettyRemotingServer.getDefaultExecutor().execute(() -> {
+            methodInvokeExecutor.execute(() -> {
                 StandardRpcResponse iRpcResponse;
                 try {
                     StandardRpcRequest standardRpcRequest =
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
similarity index 75%
rename from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
rename to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
index 365a17dd03..9beeaced3d 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
@@ -15,15 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.server;
 
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler;
-import org.apache.dolphinscheduler.extract.base.server.ServerMethodInvoker;
 import org.apache.dolphinscheduler.extract.base.utils.Constants;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
@@ -32,6 +30,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -48,12 +47,15 @@ import io.netty.handler.timeout.IdleStateHandler;
  * remoting netty server
  */
 @Slf4j
-public class NettyRemotingServer {
+class NettyRemotingServer {
 
     private final ServerBootstrap serverBootstrap = new ServerBootstrap();
 
-    private final ExecutorService defaultExecutor = ThreadUtils
-            .newDaemonFixedThreadExecutor("NettyRemotingServerThread", 
Runtime.getRuntime().availableProcessors() * 2);
+    @Getter
+    private final String serverName;
+
+    @Getter
+    private final ExecutorService methodInvokerExecutor;
 
     private final EventLoopGroup bossGroup;
 
@@ -61,16 +63,20 @@ public class NettyRemotingServer {
 
     private final NettyServerConfig serverConfig;
 
-    private final JdkDynamicServerHandler serverHandler = new 
JdkDynamicServerHandler(this);
+    private final JdkDynamicServerHandler channelHandler;
 
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
-    public NettyRemotingServer(final NettyServerConfig serverConfig) {
+    NettyRemotingServer(final NettyServerConfig serverConfig) {
         this.serverConfig = serverConfig;
+        this.serverName = serverConfig.getServerName();
+        this.methodInvokerExecutor = ThreadUtils.newDaemonFixedThreadExecutor(
+                serverName + "MethodInvoker-%d", 
Runtime.getRuntime().availableProcessors() * 2 + 1);
+        this.channelHandler = new 
JdkDynamicServerHandler(methodInvokerExecutor);
         ThreadFactory bossThreadFactory =
-                
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + 
"BossThread_%s");
+                ThreadUtils.newDaemonThreadFactory(serverName + 
"BossThread-%d");
         ThreadFactory workerThreadFactory =
-                
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + 
"WorkerThread_%s");
+                ThreadUtils.newDaemonThreadFactory(serverName + 
"WorkerThread-%d");
         if (Epoll.isAvailable()) {
             this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
             this.workGroup = new 
EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
@@ -80,7 +86,7 @@ public class NettyRemotingServer {
         }
     }
 
-    public void start() {
+    void start() {
         if (isStarted.compareAndSet(false, true)) {
             this.serverBootstrap
                     .group(this.bossGroup, this.workGroup)
@@ -103,9 +109,9 @@ public class NettyRemotingServer {
             try {
                 future = 
serverBootstrap.bind(serverConfig.getListenPort()).sync();
             } catch (Exception e) {
-                log.error("{} bind fail {}, exit", 
serverConfig.getServerName(), e.getMessage(), e);
                 throw new RemoteException(
-                        String.format("%s bind %s fail", 
serverConfig.getServerName(), serverConfig.getListenPort()));
+                        String.format("%s bind %s fail", 
serverConfig.getServerName(), serverConfig.getListenPort()),
+                        e);
             }
 
             if (future.isSuccess()) {
@@ -113,14 +119,9 @@ public class NettyRemotingServer {
                 return;
             }
 
-            if (future.cause() != null) {
-                throw new RemoteException(
-                        String.format("%s bind %s fail", 
serverConfig.getServerName(), serverConfig.getListenPort()),
-                        future.cause());
-            } else {
-                throw new RemoteException(
-                        String.format("%s bind %s fail", 
serverConfig.getServerName(), serverConfig.getListenPort()));
-            }
+            throw new RemoteException(
+                    String.format("%s bind %s fail", 
serverConfig.getServerName(), serverConfig.getListenPort()),
+                    future.cause());
         }
     }
 
@@ -135,18 +136,14 @@ public class NettyRemotingServer {
                 .addLast("decoder", new TransporterDecoder())
                 .addLast("server-idle-handle",
                         new IdleStateHandler(0, 0, 
Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
-                .addLast("handler", serverHandler);
-    }
-
-    public ExecutorService getDefaultExecutor() {
-        return defaultExecutor;
+                .addLast("handler", channelHandler);
     }
 
-    public void registerMethodInvoker(ServerMethodInvoker methodInvoker) {
-        serverHandler.registerMethodInvoker(methodInvoker);
+    void registerMethodInvoker(ServerMethodInvoker methodInvoker) {
+        channelHandler.registerMethodInvoker(methodInvoker);
     }
 
-    public void close() {
+    void close() {
         if (isStarted.compareAndSet(true, false)) {
             try {
                 if (bossGroup != null) {
@@ -155,7 +152,7 @@ public class NettyRemotingServer {
                 if (workGroup != null) {
                     this.workGroup.shutdownGracefully();
                 }
-                defaultExecutor.shutdown();
+                methodInvokerExecutor.shutdown();
             } catch (Exception ex) {
                 log.error("netty server close exception", ex);
             }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java
similarity index 84%
rename from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
rename to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java
index 6bf1b8d31c..70ed0529e8 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.server;
 
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 
 import lombok.experimental.UtilityClass;
 
 @UtilityClass
-public class NettyRemotingServerFactory {
+class NettyRemotingServerFactory {
 
-    public NettyRemotingServer buildNettyRemotingServer(NettyServerConfig 
nettyServerConfig) {
+    NettyRemotingServer buildNettyRemotingServer(NettyServerConfig 
nettyServerConfig) {
         return new NettyRemotingServer(nettyServerConfig);
     }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java
new file mode 100644
index 0000000000..213868ba46
--- /dev/null
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.server;
+
+import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcService;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+
+import java.lang.reflect.Method;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * The RpcServer based on Netty. The server will register the method invoker 
and provide the service to the client.
+ * Once the server is started, it will listen on the port and wait for the 
client to connect.
+ * <pre>
+ *          RpcServer rpcServer = new RpcServer(new NettyServerConfig());
+ *          rpcServer.registerServerMethodInvokerProvider(new 
ServerMethodInvokerProviderImpl());
+ *          rpcServer.start();
+ * </pre>
+ */
+@Slf4j
+public class RpcServer implements ServerMethodInvokerRegistry, AutoCloseable {
+
+    private final NettyRemotingServer nettyRemotingServer;
+
+    public RpcServer(NettyServerConfig nettyServerConfig) {
+        this.nettyRemotingServer = 
NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig);
+    }
+
+    public void start() {
+        nettyRemotingServer.start();
+    }
+
+    @Override
+    public void registerServerMethodInvokerProvider(Object 
serverMethodInvokerProviderBean) {
+        for (Class<?> anInterface : 
serverMethodInvokerProviderBean.getClass().getInterfaces()) {
+            if (anInterface.getAnnotation(RpcService.class) == null) {
+                continue;
+            }
+            for (Method method : anInterface.getDeclaredMethods()) {
+                RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
+                if (rpcMethod == null) {
+                    continue;
+                }
+                ServerMethodInvoker serverMethodInvoker =
+                        new 
ServerMethodInvokerImpl(serverMethodInvokerProviderBean, method);
+                nettyRemotingServer.registerMethodInvoker(serverMethodInvoker);
+                log.debug("Register ServerMethodInvoker: {} to bean: {}",
+                        serverMethodInvoker.getMethodIdentify(), 
serverMethodInvoker.getMethodProviderIdentify());
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        nettyRemotingServer.close();
+    }
+}
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
index ee633217b2..151b54bb97 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
@@ -17,10 +17,12 @@
 
 package org.apache.dolphinscheduler.extract.base.server;
 
-public interface ServerMethodInvoker {
+interface ServerMethodInvoker {
 
     String getMethodIdentify();
 
+    String getMethodProviderIdentify();
+
     Object invoke(final Object... arg) throws Throwable;
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
index eea9da5e14..4c29650aa0 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.extract.base.server;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-public class ServerMethodInvokerImpl implements ServerMethodInvoker {
+class ServerMethodInvokerImpl implements ServerMethodInvoker {
 
     private final Object serviceBean;
 
@@ -28,7 +28,7 @@ public class ServerMethodInvokerImpl implements 
ServerMethodInvoker {
 
     private final String methodIdentify;
 
-    public ServerMethodInvokerImpl(Object serviceBean, Method method) {
+    ServerMethodInvokerImpl(Object serviceBean, Method method) {
         this.serviceBean = serviceBean;
         this.method = method;
         this.methodIdentify = method.toGenericString();
@@ -48,4 +48,9 @@ public class ServerMethodInvokerImpl implements 
ServerMethodInvoker {
     public String getMethodIdentify() {
         return methodIdentify;
     }
+
+    @Override
+    public String getMethodProviderIdentify() {
+        return serviceBean.getClass().getName();
+    }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java
similarity index 68%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
copy to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java
index ee633217b2..4e56be2617 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java
@@ -17,10 +17,12 @@
 
 package org.apache.dolphinscheduler.extract.base.server;
 
-public interface ServerMethodInvoker {
+interface ServerMethodInvokerRegistry {
 
-    String getMethodIdentify();
-
-    Object invoke(final Object... arg) throws Throwable;
+    /**
+     * Register service object, which will be used to invoke the {@link 
ServerMethodInvoker}.
+     * The serverMethodInvokerProviderObject should implement with interface 
which contains {@link org.apache.dolphinscheduler.extract.base.RpcService} 
annotation.
+     */
+    void registerServerMethodInvokerProvider(Object 
serverMethodInvokerProviderObject);
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
index 2b87a70080..de4943990c 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
@@ -17,11 +17,7 @@
 
 package org.apache.dolphinscheduler.extract.base.server;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
-import org.apache.dolphinscheduler.extract.base.RpcMethod;
-import org.apache.dolphinscheduler.extract.base.RpcService;
-
-import java.lang.reflect.Method;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -29,38 +25,21 @@ import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.lang.Nullable;
 
+/**
+ * The RpcServer which will auto discovery the {@link ServerMethodInvoker} 
from Spring container.
+ */
 @Slf4j
-public class SpringServerMethodInvokerDiscovery implements BeanPostProcessor {
+public class SpringServerMethodInvokerDiscovery extends RpcServer implements 
BeanPostProcessor {
 
-    protected final NettyRemotingServer nettyRemotingServer;
-
-    public SpringServerMethodInvokerDiscovery(NettyRemotingServer 
nettyRemotingServer) {
-        this.nettyRemotingServer = nettyRemotingServer;
+    public SpringServerMethodInvokerDiscovery(NettyServerConfig 
nettyServerConfig) {
+        super(nettyServerConfig);
     }
 
     @Nullable
     @Override
     public Object postProcessAfterInitialization(Object bean, String beanName) 
throws BeansException {
-        Class<?>[] interfaces = bean.getClass().getInterfaces();
-        for (Class<?> anInterface : interfaces) {
-            if (anInterface.getAnnotation(RpcService.class) == null) {
-                continue;
-            }
-            registerRpcMethodInvoker(anInterface, bean, beanName);
-        }
+        registerServerMethodInvokerProvider(bean);
         return bean;
     }
 
-    private void registerRpcMethodInvoker(Class<?> anInterface, Object bean, 
String beanName) {
-        Method[] declaredMethods = anInterface.getDeclaredMethods();
-        for (Method method : declaredMethods) {
-            RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
-            if (rpcMethod == null) {
-                continue;
-            }
-            ServerMethodInvoker methodInvoker = new 
ServerMethodInvokerImpl(bean, method);
-            nettyRemotingServer.registerMethodInvoker(methodInvoker);
-            log.debug("Register ServerMethodInvoker: {} to bean: {}", 
methodInvoker.getMethodIdentify(), beanName);
-        }
-    }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
index 521cf7c75a..92ed49934c 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.extract.base.client;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
 import org.apache.dolphinscheduler.extract.base.RpcMethod;
 import org.apache.dolphinscheduler.extract.base.RpcService;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
@@ -37,7 +36,7 @@ import org.junit.jupiter.api.Test;
 
 public class SingletonJdkDynamicRpcClientProxyFactoryTest {
 
-    private NettyRemotingServer nettyRemotingServer;
+    private SpringServerMethodInvokerDiscovery 
springServerMethodInvokerDiscovery;
 
     private String serverAddress;
 
@@ -48,11 +47,10 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
                 .serverName("ApiServer")
                 .listenPort(listenPort)
                 .build();
-        nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
-        nettyRemotingServer.start();
         serverAddress = "localhost:" + listenPort;
-        new SpringServerMethodInvokerDiscovery(nettyRemotingServer)
-                .postProcessAfterInitialization(new IServiceImpl(), 
"iServiceImpl");
+        springServerMethodInvokerDiscovery = new 
SpringServerMethodInvokerDiscovery(nettyServerConfig);
+        
springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new 
IServiceImpl());
+        springServerMethodInvokerDiscovery.start();
     }
 
     @Test
@@ -82,7 +80,7 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
 
     @AfterEach
     public void tearDown() {
-        nettyRemotingServer.close();
+        springServerMethodInvokerDiscovery.close();
     }
 
     @RpcService
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
index 0eaf885d11..ab89b021d6 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.rpc;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -31,21 +30,8 @@ import org.springframework.stereotype.Component;
 public class MasterRpcServer extends SpringServerMethodInvokerDiscovery 
implements AutoCloseable {
 
     public MasterRpcServer(MasterConfig masterConfig) {
-        
super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder()
-                
.serverName("MasterRpcServer").listenPort(masterConfig.getListenPort()).build()));
-    }
-
-    public void start() {
-        log.info("Starting MasterRPCServer...");
-        nettyRemotingServer.start();
-        log.info("Started MasterRPCServer...");
-    }
-
-    @Override
-    public void close() {
-        log.info("Closing MasterRPCServer...");
-        nettyRemotingServer.close();
-        log.info("Closed MasterRPCServer...");
+        
super(NettyServerConfig.builder().serverName("MasterRpcServer").listenPort(masterConfig.getListenPort())
+                .build());
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
similarity index 60%
copy from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
copy to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
index df88de34e3..1e5a77edb3 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
@@ -15,22 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.processor;
+package org.apache.dolphinscheduler.server.master.rpc;
 
-import 
org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 
-import org.mockito.Mockito;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-/**
- * dependency config
- */
-@Configuration
-public class TaskResponseProcessorTestConfig {
+class MasterRpcServerTest {
+
+    private final MasterRpcServer masterRpcServer = new MasterRpcServer(new 
MasterConfig());
+
+    @Test
+    void testStart() {
+        Assertions.assertDoesNotThrow(masterRpcServer::start);
+    }
 
-    @Bean
-    public DataQualityResultOperator dataQualityResultOperator() {
-        return Mockito.mock(DataQualityResultOperator.class);
+    @Test
+    void testClose() {
+        Assertions.assertDoesNotThrow(masterRpcServer::close);
     }
 }
diff --git 
a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
index 1a3e4ab1e2..496983118f 100644
--- 
a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
+++ 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.microbench.rpc;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
 import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
@@ -46,18 +45,17 @@ import org.openjdk.jmh.infra.Blackhole;
 @BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
 public class RpcBenchMarkTest extends AbstractBaseBenchmark {
 
-    private NettyRemotingServer nettyRemotingServer;
+    private SpringServerMethodInvokerDiscovery 
springServerMethodInvokerDiscovery;
 
     private IService iService;
 
     @Setup
     public void before() {
-        nettyRemotingServer = new NettyRemotingServer(
-                
NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build());
-        nettyRemotingServer.start();
-        SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
-                new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
+        NettyServerConfig nettyServerConfig =
+                
NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build();
+        springServerMethodInvokerDiscovery = new 
SpringServerMethodInvokerDiscovery(nettyServerConfig);
         springServerMethodInvokerDiscovery.postProcessAfterInitialization(new 
IServiceImpl(), "iServiceImpl");
+        springServerMethodInvokerDiscovery.start();
         iService =
                 
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", 
IService.class);
     }
@@ -72,6 +70,6 @@ public class RpcBenchMarkTest extends AbstractBaseBenchmark {
 
     @TearDown
     public void after() {
-        nettyRemotingServer.close();
+        springServerMethodInvokerDiscovery.close();
     }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 7733fbba4f..b9f3855cf9 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.rpc;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -33,21 +32,8 @@ import org.springframework.stereotype.Service;
 public class WorkerRpcServer extends SpringServerMethodInvokerDiscovery 
implements Closeable {
 
     public WorkerRpcServer(WorkerConfig workerConfig) {
-        
super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder()
-                
.serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort()).build()));
-    }
-
-    public void start() {
-        log.info("WorkerRpcServer starting...");
-        nettyRemotingServer.start();
-        log.info("WorkerRpcServer started...");
-    }
-
-    @Override
-    public void close() {
-        log.info("WorkerRpcServer closing");
-        nettyRemotingServer.close();
-        log.info("WorkerRpcServer closed");
+        
super(NettyServerConfig.builder().serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort())
+                .build());
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java
similarity index 60%
rename from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
rename to 
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java
index df88de34e3..d27eaeeadf 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java
@@ -15,22 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.processor;
+package org.apache.dolphinscheduler.server.worker.rpc;
 
-import 
org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 
-import org.mockito.Mockito;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-/**
- * dependency config
- */
-@Configuration
-public class TaskResponseProcessorTestConfig {
+class WorkerRpcServerTest {
+
+    private final WorkerRpcServer workerRpcServer = new WorkerRpcServer(new 
WorkerConfig());
 
-    @Bean
-    public DataQualityResultOperator dataQualityResultOperator() {
-        return Mockito.mock(DataQualityResultOperator.class);
+    @Test
+    void testStart() {
+        Assertions.assertDoesNotThrow(workerRpcServer::start);
     }
+
+    @Test
+    void testClose() {
+        Assertions.assertDoesNotThrow(workerRpcServer::close);
+    }
+
 }

Reply via email to