wu-sheng closed pull request #1505: Feature/oap/remote
URL: https://github.com/apache/incubator-skywalking/pull/1505
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
after the merge, so the full diff is reproduced below:

diff --git 
a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
 
b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
index 2e6913030..55671bce0 100644
--- 
a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
+++ 
b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java
@@ -19,15 +19,14 @@
 package org.apache.skywalking.oap.server.cluster.plugin.standalone;
 
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.*;
 
 public class StandaloneManagerTest {
     @Test
     public void test() {
         StandaloneManager standaloneManager = new StandaloneManager();
-        RemoteInstance remote1 = new RemoteInstance();
-        RemoteInstance remote2 = new RemoteInstance();
+        RemoteInstance remote1 = new RemoteInstance("A", 100, true);
+        RemoteInstance remote2 = new RemoteInstance("B", 100, false);
 
         standaloneManager.registerRemote(remote1);
         Assert.assertEquals(remote1, 
standaloneManager.queryRemoteNodes().get(0));
diff --git 
a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
 
b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
index bbffa9ffc..a318481ed 100644
--- 
a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
+++ 
b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java
@@ -21,16 +21,9 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.curator.test.TestingServer;
-import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.junit.*;
 
 /**
  * @author peng-yongsheng
@@ -59,9 +52,7 @@ public void testStart() throws ServiceNotProvidedException, 
ModuleStartException
         ClusterRegister moduleRegister = 
provider.getService(ClusterRegister.class);
         ClusterNodesQuery clusterNodesQuery = 
provider.getService(ClusterNodesQuery.class);
 
-        RemoteInstance remoteInstance = new RemoteInstance();
-        remoteInstance.setHost("ProviderAHost");
-        remoteInstance.setPort(1000);
+        RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 
1000, true);
 
         moduleRegister.registerRemote(remoteInstance);
 
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index 767e1c8bd..ca6ac7abe 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -41,6 +41,11 @@
             <artifactId>library-util</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>library-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.skywalking</groupId>
             <artifactId>library-server</artifactId>
@@ -57,4 +62,49 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
+
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.4.1.Final</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>2.4.3</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.5.0</version>
+                <configuration>
+                    <!--
+                      The version of protoc must match protobuf-java. If you 
don't depend on
+                      protobuf-java directly, you will be transitively 
depending on the
+                      protobuf-java version that grpc depends on.
+                    -->
+                    
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
+                    </protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
+                    </pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 6f3de4875..b03152fd6 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -19,7 +19,11 @@
 package org.apache.skywalking.oap.server.core;
 
 import java.util.*;
+import 
org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
 import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import org.apache.skywalking.oap.server.core.server.*;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
@@ -38,6 +42,7 @@
         List<Class> classes = new ArrayList<>();
         addServerInterface(classes);
         addReceiverInterface(classes);
+        addInsideService(classes);
 
         return classes.toArray(new Class[] {});
     }
@@ -47,6 +52,13 @@ private void addServerInterface(List<Class> classes) {
         classes.add(JettyHandlerRegister.class);
     }
 
+    private void addInsideService(List<Class> classes) {
+        classes.add(IndicatorMapper.class);
+        classes.add(WorkerMapper.class);
+        classes.add(RemoteClientManager.class);
+        classes.add(RemoteSenderService.class);
+    }
+
     private void addReceiverInterface(List<Class> classes) {
         classes.add(SourceReceiver.class);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 1af3873de..757c34a76 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -18,24 +18,18 @@
 
 package org.apache.skywalking.oap.server.core;
 
-import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
-import org.apache.skywalking.oap.server.core.receiver.SourceReceiverImpl;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
-import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.*;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.receiver.*;
+import org.apache.skywalking.oap.server.core.remote.*;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
+import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -47,17 +41,21 @@
     private final CoreModuleConfig moduleConfig;
     private GRPCServer grpcServer;
     private JettyServer jettyServer;
+    private final IndicatorMapper indicatorMapper;
+    private final WorkerMapper workerMapper;
 
     public CoreModuleProvider() {
         super();
         this.moduleConfig = new CoreModuleConfig();
+        this.indicatorMapper = new IndicatorMapper();
+        this.workerMapper = new WorkerMapper(getManager());
     }
 
     @Override public String name() {
         return "default";
     }
 
-    @Override public Class module() {
+    @Override public Class<? extends ModuleDefine> module() {
         return CoreModule.class;
     }
 
@@ -75,11 +73,24 @@ public CoreModuleProvider() {
         this.registerServiceImplementation(GRPCHandlerRegister.class, new 
GRPCHandlerRegisterImpl(grpcServer));
         this.registerServiceImplementation(JettyHandlerRegister.class, new 
JettyHandlerRegisterImpl(jettyServer));
 
-        this.registerServiceImplementation(SourceReceiver.class, new 
SourceReceiverImpl());
+        this.registerServiceImplementation(SourceReceiver.class, new 
SourceReceiverImpl(getManager()));
+
+        this.registerServiceImplementation(IndicatorMapper.class, 
indicatorMapper);
+        this.registerServiceImplementation(WorkerMapper.class, workerMapper);
+
+        this.registerServiceImplementation(RemoteClientManager.class, new 
RemoteClientManager(getManager()));
+        this.registerServiceImplementation(RemoteSenderService.class, new 
RemoteSenderService(getManager()));
     }
 
-    @Override public void start() {
+    @Override public void start() throws ModuleStartException {
+        grpcServer.addHandler(new RemoteServiceHandler(getManager()));
 
+        try {
+            indicatorMapper.load();
+            workerMapper.load();
+        } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
+            throw new ModuleStartException(e.getMessage(), e);
+        }
     }
 
     @Override public void notifyAfterCompleted() throws ModuleStartException {
@@ -90,9 +101,7 @@ public CoreModuleProvider() {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        RemoteInstance gRPCServerInstance = new RemoteInstance();
-        gRPCServerInstance.setHost(moduleConfig.getGRPCHost());
-        gRPCServerInstance.setPort(moduleConfig.getGRPCPort());
+        RemoteInstance gRPCServerInstance = new 
RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
         
this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
     }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index de8da4e4e..931738273 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -21,6 +21,7 @@
 import java.util.*;
 import 
org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher;
 import org.apache.skywalking.oap.server.core.receiver.Scope;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
 /**
@@ -32,9 +33,9 @@
 
     private Map<Scope, SourceDispatcher> dispatcherMap;
 
-    public DispatcherManager() {
+    public DispatcherManager(ModuleManager moduleManager) {
         this.dispatcherMap = new HashMap<>();
-        this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
+        this.dispatcherMap.put(Scope.Endpoint, new 
EndpointDispatcher(moduleManager));
     }
 
     public SourceDispatcher getDispatcher(Scope scope) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
index 63f149f7b..57c75d449 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java
@@ -18,10 +18,12 @@
 
 package org.apache.skywalking.oap.server.core.analysis.data;
 
+import org.apache.skywalking.oap.server.core.remote.*;
+
 /**
  * @author peng-yongsheng
  */
-public abstract class StreamData implements QueueData {
+public abstract class StreamData implements QueueData, Serializable, 
Deserializable {
 
     private EndOfBatchContext endOfBatchContext;
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
index 152ac5eb9..bcb3a2c66 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
@@ -18,18 +18,22 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
 import org.apache.skywalking.oap.server.core.receiver.Endpoint;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
 
-    private final EndpointLatencyAvgAggregator avgAggregator;
+    private final ModuleManager moduleManager;
+    private EndpointLatencyAvgAggregateWorker avgAggregator;
 
-    public EndpointDispatcher() {
-        this.avgAggregator = new EndpointLatencyAvgAggregator();
+    public EndpointDispatcher(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
     }
 
     @Override public void dispatch(Endpoint source) {
@@ -37,7 +41,14 @@ public EndpointDispatcher() {
     }
 
     private void avg(Endpoint source) {
-        EndpointLatencyAvgIndicator indicator = new 
EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId());
+        if (avgAggregator == null) {
+            WorkerMapper workerMapper = 
moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+            avgAggregator = 
(EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class);
+        }
+
+        EndpointLatencyAvgIndicator indicator = new 
EndpointLatencyAvgIndicator();
+        indicator.setId(source.getId());
+        indicator.setTimeBucket(source.getTimeBucket());
         indicator.combine(source.getLatency(), 1);
         avgAggregator.in(indicator);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
new file mode 100644
index 000000000..62d51341e
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.endpoint;
+
+import 
org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointLatencyAvgAggregateWorker extends 
AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
+
+    private final EndpointLatencyAvgRemoteWorker remoter;
+
+    public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
+        super(moduleManager);
+        this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
+    }
+
+    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+        remoter.in(data);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index a200aeba7..e339afa73 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,19 +18,16 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
+import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointLatencyAvgIndicator extends AvgIndicator {
 
-    private final int id;
-
-    public EndpointLatencyAvgIndicator(long timeBucket, int id) {
-        super(timeBucket);
-        this.id = id;
-    }
+    @Setter @Getter private int id;
 
     @Override public int hashCode() {
         int result = 17;
@@ -55,4 +52,22 @@ public EndpointLatencyAvgIndicator(long timeBucket, int id) {
 
         return true;
     }
+
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        remoteBuilder.setDataIntegers(0, getId());
+        remoteBuilder.setDataIntegers(1, getCount());
+
+        remoteBuilder.setDataLongs(0, getTimeBucket());
+        remoteBuilder.setDataLongs(1, getSummation());
+        return remoteBuilder;
+    }
+
+    @Override public void deserialize(RemoteData remoteData) {
+        setId(remoteData.getDataIntegers(0));
+        setCount(remoteData.getDataIntegers(1));
+
+        setTimeBucket(remoteData.getDataLongs(0));
+        setSummation(remoteData.getDataLongs(1));
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
similarity index 70%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
index f40990976..e3c5c2306 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
@@ -18,17 +18,15 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
-import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
-import org.slf4j.*;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public class EndpointLatencyAvgAggregator extends 
AbstractAggregator<EndpointLatencyAvgIndicator> {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
-
-    @Override protected void onNext(EndpointLatencyAvgIndicator data) {
+public class EndpointLatencyAvgPersistentWorker extends 
AbstractPersistentWorker<EndpointLatencyAvgIndicator> {
 
+    public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
+        super(moduleManager);
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
new file mode 100644
index 000000000..47f75215a
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.endpoint;
+
+import 
org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointLatencyAvgRemoteWorker extends 
AbstractRemoteWorker<EndpointLatencyAvgIndicator> {
+
+    public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) {
+        super(moduleManager);
+    }
+
+    @Override public Selector selector() {
+        return Selector.HashCode;
+    }
+
+    @Override public Class nextWorkerClass() {
+        return EndpointLatencyAvgPersistentWorker.class;
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index 54ca9d68c..da065f37f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -18,28 +18,26 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
+import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 
 /**
  * @author peng-yongsheng
  */
-@IndicatorType
+@IndicatorType(selector = Selector.HashCode)
 public abstract class AvgIndicator extends Indicator {
 
-    private long summation;
-    private int count;
-
-    public AvgIndicator(long timeBucket) {
-        super(timeBucket);
-    }
+    @Getter @Setter private long summation;
+    @Getter @Setter private int count;
 
     @Entrance
-    public void combine(@SourceFrom long summation, @ConstOne int count) {
+    public final void combine(@SourceFrom long summation, @ConstOne int count) 
{
         this.summation += summation;
         this.count += count;
     }
 
-    @Override public void combine(Indicator indicator) {
+    @Override public final void combine(Indicator indicator) {
         AvgIndicator avgIndicator = (AvgIndicator)indicator;
         combine(avgIndicator.summation, avgIndicator.count);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 66bb57d58..533379e7d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
-import lombok.Getter;
+import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
 
 /**
@@ -26,11 +26,7 @@
  */
 public abstract class Indicator extends StreamData {
 
-    @Getter private final long timeBucket;
-
-    public Indicator(long timeBucket) {
-        this.timeBucket = timeBucket;
-    }
+    @Getter @Setter private long timeBucket;
 
     public abstract void combine(Indicator indicator);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index 7e8b8e1a6..46f03450f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
 
 import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 
 /**
  * @author peng-yongsheng
@@ -26,4 +27,5 @@
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.SOURCE)
 public @interface IndicatorType {
+    Selector selector();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
new file mode 100644
index 000000000..ca0521a97
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IndicatorDefineLoadException extends Exception {
+
+    public IndicatorDefineLoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
new file mode 100644
index 000000000..8513fa2cc
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import java.io.*;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.*;
+
+/**
+ * Assigns a numeric id to every indicator class named in the
+ * {@code META-INF/defines/indicator.def} resources visible to this class' class loader,
+ * and keeps the mapping in both directions (class to id, id to class).
+ *
+ * <p>Ids are handed out sequentially, starting at 1, in the order in which the property
+ * names are enumerated while loading.
+ *
+ * @author peng-yongsheng
+ */
+public class IndicatorMapper implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(IndicatorMapper.class);
+
+    private int id = 0;
+    private final Map<Class<Indicator>, Integer> classKeyMapping;
+    private final Map<Integer, Class<Indicator>> idKeyMapping;
+
+    public IndicatorMapper() {
+        this.classKeyMapping = new HashMap<>();
+        this.idKeyMapping = new HashMap<>();
+    }
+
+    /**
+     * Reads every {@code META-INF/defines/indicator.def} resource, treats each property
+     * name as a fully qualified class name, loads the class and registers it under the
+     * next free id.
+     *
+     * @throws IndicatorDefineLoadException if a definition file cannot be read or a listed
+     * class cannot be found; the original exception is attached as the cause
+     */
+    @SuppressWarnings(value = "unchecked")
+    public void load() throws IndicatorDefineLoadException {
+        try {
+            List<String> indicatorClasses = new LinkedList<>();
+
+            Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/indicator.def");
+            while (urlEnumeration.hasMoreElements()) {
+                URL definitionFileURL = urlEnumeration.nextElement();
+                logger.info("Load indicator definition file url: {}", definitionFileURL.getPath());
+
+                Properties properties = new Properties();
+                // try-with-resources: the stream must be released even when parsing fails.
+                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()))) {
+                    properties.load(bufferedReader);
+                }
+
+                Enumeration defineItem = properties.propertyNames();
+                while (defineItem.hasMoreElements()) {
+                    String fullNameClass = (String)defineItem.nextElement();
+                    indicatorClasses.add(fullNameClass);
+                }
+            }
+
+            for (String indicatorClassName : indicatorClasses) {
+                Class<Indicator> indicatorClass = (Class<Indicator>)Class.forName(indicatorClassName);
+                id++;
+                classKeyMapping.put(indicatorClass, id);
+                idKeyMapping.put(id, indicatorClass);
+            }
+        } catch (IOException | ClassNotFoundException e) {
+            throw new IndicatorDefineLoadException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @param indicatorClass a class registered by {@link #load()}
+     * @return the id assigned to the class
+     * @throws NullPointerException if the class was never registered (the missing map
+     * entry is unboxed to {@code int})
+     */
+    public int findIdByClass(Class indicatorClass) {
+        return classKeyMapping.get(indicatorClass);
+    }
+
+    /**
+     * @param id an id assigned by {@link #load()}
+     * @return the class registered under the id, or {@code null} when the id is unknown
+     */
+    public Class<Indicator> findClassById(int id) {
+        return idKeyMapping.get(id);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
similarity index 84%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
index f4e769ade..65d68b440 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
@@ -16,35 +16,36 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis;
+package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractAggregator<INPUT extends Indicator> {
+public abstract class AbstractAggregatorWorker<INPUT extends Indicator> 
extends Worker<INPUT> {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractAggregator.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractAggregatorWorker.class);
 
     private final DataCarrier<INPUT> dataCarrier;
     private final MergeDataCache<INPUT> mergeDataCache;
     private int messageNum;
 
-    public AbstractAggregator() {
+    public AbstractAggregatorWorker(ModuleManager moduleManager) {
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>(1, 10000);
         this.dataCarrier.consume(new AggregatorConsumer(this), 1);
     }
 
-    public void in(INPUT message) {
-        message.setEndOfBatchContext(new EndOfBatchContext(false));
-        dataCarrier.produce(message);
+    @Override public final void in(INPUT input) {
+        input.setEndOfBatchContext(new EndOfBatchContext(false));
+        dataCarrier.produce(input);
     }
 
     private void onWork(INPUT message) {
@@ -91,9 +92,9 @@ private void aggregate(INPUT message) {
 
     private class AggregatorConsumer implements IConsumer<INPUT> {
 
-        private final AbstractAggregator<INPUT> aggregator;
+        private final AbstractAggregatorWorker<INPUT> aggregator;
 
-        private AggregatorConsumer(AbstractAggregator<INPUT> aggregator) {
+        private AggregatorConsumer(AbstractAggregatorWorker<INPUT> aggregator) 
{
             this.aggregator = aggregator;
         }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
new file mode 100644
index 000000000..bcead6792
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * Skeleton base class for workers meant to persist indicators.
+ *
+ * <p>At this stage it is only a placeholder: the constructor accepts a
+ * {@link ModuleManager} but does not keep it, and {@link #in(Indicator)} has an
+ * empty body, so every input is currently ignored.
+ *
+ * @author peng-yongsheng
+ */
+public abstract class AbstractPersistentWorker<INPUT extends Indicator> 
extends Worker<INPUT> {
+
+    public AbstractPersistentWorker(ModuleManager moduleManager) {
+    }
+
+    @Override public final void in(INPUT input) {
+
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
new file mode 100644
index 000000000..e3fb89d0c
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * Worker that does not process its input locally: every indicator is handed to the
+ * {@link RemoteSenderService}, addressed to the worker class returned by
+ * {@link #nextWorkerClass()} and routed with the strategy returned by {@link #selector()}.
+ *
+ * @author peng-yongsheng
+ */
+public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends Worker<INPUT> {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRemoteWorker.class);
+
+    private final ModuleManager moduleManager;
+    private RemoteSenderService remoteSender;
+    private WorkerMapper workerMapper;
+
+    public AbstractRemoteWorker(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    /**
+     * Forwards the input to the remote sender. Any failure raised while resolving the
+     * target worker id or sending is logged and not rethrown.
+     *
+     * @param input the indicator to forward
+     */
+    @Override public final void in(INPUT input) {
+        resolveServices();
+
+        try {
+            remoteSender.send(workerMapper.findIdByClass(nextWorkerClass()), input, selector());
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Looks up the sender and the worker mapper in the core module on first use and
+     * caches them in the instance fields.
+     */
+    private void resolveServices() {
+        if (remoteSender == null) {
+            remoteSender = moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+        }
+        if (workerMapper == null) {
+            workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+        }
+    }
+
+    /**
+     * @return the class of the worker that should receive the indicator
+     */
+    public abstract Class nextWorkerClass();
+
+    /**
+     * @return the selector strategy passed to the remote sender
+     */
+    public abstract Selector selector();
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
new file mode 100644
index 000000000..53010eedf
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+
+/**
+ * Base type of the analysis workers. A worker accepts one {@link Indicator} at a
+ * time through {@link #in(Indicator)}; what happens with it is defined by the subclass.
+ *
+ * @param <INPUT> the indicator type this worker accepts
+ * @author peng-yongsheng
+ */
+public abstract class Worker<INPUT extends Indicator> {
+
+    public abstract void in(INPUT input);
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
new file mode 100644
index 000000000..a24a0a7a7
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
+
+/**
+ * Checked exception signalling that the worker definitions could not be loaded.
+ * {@link WorkerMapper#load()} raises it to wrap whatever exception interrupted
+ * loading or instantiating the workers.
+ *
+ * <p>The single constructor forwards the message and the cause to {@link Exception}.
+ *
+ * @author peng-yongsheng
+ */
+public class WorkerDefineLoadException extends Exception {
+
+    public WorkerDefineLoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
new file mode 100644
index 000000000..973243056
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker.define;
+
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.worker.Worker;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * Registry of the workers named in the {@code META-INF/defines/worker.def} resources
+ * visible to this class' class loader. Each listed class receives a sequential id
+ * (starting at 1) and is instantiated once through its {@code (ModuleManager)}
+ * constructor; classes and instances can then be looked up by class or by id.
+ *
+ * @author peng-yongsheng
+ */
+public class WorkerMapper implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class);
+
+    private int id = 0;
+    private final ModuleManager moduleManager;
+    private final Map<Class<Worker>, Integer> classKeyMapping;
+    private final Map<Integer, Class<Worker>> idKeyMapping;
+    private final Map<Class<Worker>, Worker> classKeyInstanceMapping;
+    private final Map<Integer, Worker> idKeyInstanceMapping;
+
+    public WorkerMapper(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.classKeyMapping = new HashMap<>();
+        this.idKeyMapping = new HashMap<>();
+        this.classKeyInstanceMapping = new HashMap<>();
+        this.idKeyInstanceMapping = new HashMap<>();
+    }
+
+    /**
+     * Reads every {@code META-INF/defines/worker.def} resource, treats each property name
+     * as a fully qualified class name, then loads, numbers and instantiates each class.
+     *
+     * @throws WorkerDefineLoadException if anything fails while reading the definitions or
+     * creating the workers; the original exception is attached as the cause
+     */
+    @SuppressWarnings(value = "unchecked")
+    public void load() throws WorkerDefineLoadException {
+        try {
+            List<String> workerClasses = new LinkedList<>();
+
+            Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/worker.def");
+            while (urlEnumeration.hasMoreElements()) {
+                URL definitionFileURL = urlEnumeration.nextElement();
+                logger.info("Load worker definition file url: {}", definitionFileURL.getPath());
+
+                Properties properties = new Properties();
+                // try-with-resources: the stream must be released even when parsing fails.
+                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()))) {
+                    properties.load(bufferedReader);
+                }
+
+                Enumeration defineItem = properties.propertyNames();
+                while (defineItem.hasMoreElements()) {
+                    String fullNameClass = (String)defineItem.nextElement();
+                    workerClasses.add(fullNameClass);
+                }
+            }
+
+            for (String workerClassName : workerClasses) {
+                Class<Worker> workerClass = (Class<Worker>)Class.forName(workerClassName);
+                id++;
+                classKeyMapping.put(workerClass, id);
+                idKeyMapping.put(id, workerClass);
+
+                Constructor<Worker> constructor = workerClass.getDeclaredConstructor(ModuleManager.class);
+                Worker worker = constructor.newInstance(moduleManager);
+                classKeyInstanceMapping.put(workerClass, worker);
+                idKeyInstanceMapping.put(id, worker);
+            }
+        } catch (Exception e) {
+            throw new WorkerDefineLoadException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @param workerClass a class registered by {@link #load()}
+     * @return the id assigned to the class
+     * @throws NullPointerException if the class was never registered (the missing map
+     * entry is unboxed to {@code int})
+     */
+    public int findIdByClass(Class workerClass) {
+        return classKeyMapping.get(workerClass);
+    }
+
+    /**
+     * @param id an id assigned by {@link #load()}
+     * @return the class registered under the id, or {@code null} when the id is unknown
+     */
+    public Class<Worker> findClassById(int id) {
+        return idKeyMapping.get(id);
+    }
+
+    /**
+     * @param workerClass a class registered by {@link #load()}
+     * @return the instance created for the class, or {@code null} when it is unknown
+     */
+    public Worker findInstanceByClass(Class workerClass) {
+        return classKeyInstanceMapping.get(workerClass);
+    }
+
+    /**
+     * @param id an id assigned by {@link #load()}
+     * @return the instance registered under the id, or {@code null} when the id is unknown
+     */
+    public Worker findInstanceById(int id) {
+        return idKeyInstanceMapping.get(id);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 53c4ea0cc..1e85b3252 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -19,48 +19,29 @@
 package org.apache.skywalking.oap.server.core.cluster;
 
 import java.util.Objects;
+import lombok.*;
 
 /**
  * @author peng-yongsheng
  */
-public class RemoteInstance {
+public class RemoteInstance implements Comparable<RemoteInstance> {
 
-    private String host;
-    private int port;
-    private boolean self = false;
+    @Getter private final String host;
+    @Getter private final int port;
+    @Getter @Setter private boolean isSelf = false;
 
-    public RemoteInstance() {
-
-    }
-
-    public RemoteInstance(String host, int port, boolean self) {
+    public RemoteInstance(String host, int port, boolean isSelf) {
         this.host = host;
         this.port = port;
-        this.self = self;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
+        this.isSelf = isSelf;
     }
 
-    public boolean isSelf() {
-        return self;
+    /**
+     * Orders instances by their {@link #toString()} form (host immediately followed by port).
+     *
+     * @param o the instance to compare against
+     * @return a negative value, zero or a positive value when this instance sorts before,
+     * equal to or after {@code o}
+     */
+    @Override public int compareTo(RemoteInstance o) {
+        // Compare against the argument; comparing toString() with itself always yields 0.
+        return toString().compareTo(o.toString());
     }
 
-    public void setSelf(boolean self) {
-        this.self = self;
+    @Override public String toString() {
+        return host + String.valueOf(port);
     }
 
     @Override public boolean equals(Object o) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
index 4d04a31dd..a6cf211ec 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.receiver;
 
 import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
@@ -27,8 +28,8 @@
 
     private final DispatcherManager dispatcherManager;
 
-    public SourceReceiverImpl() {
-        this.dispatcherManager = new DispatcherManager();
+    public SourceReceiverImpl(ModuleManager moduleManager) {
+        this.dispatcherManager = new DispatcherManager(moduleManager);
     }
 
     @Override public void receive(Source source) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
new file mode 100644
index 000000000..b2b3a7ecb
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+
+/**
+ * Implemented by types that can be populated from a {@link RemoteData} message.
+ * {@link #deserialize(RemoteData)} receives the message to read from and returns nothing.
+ *
+ * @author peng-yongsheng
+ */
+public interface Deserializable {
+    void deserialize(RemoteData remoteData);
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
new file mode 100644
index 000000000..b2ce1c45e
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.remote.selector.*;
+import org.apache.skywalking.oap.server.library.module.*;
+
+/**
+ * Pushes an indicator through a {@link RemoteClient}. The client is chosen from what
+ * {@link RemoteClientManager#getRemoteClient()} returns, using the selector strategy
+ * requested by the caller.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteSenderService implements Service {
+
+    private final ModuleManager moduleManager;
+    private final HashCodeSelector hashCodeSelector;
+    private final ForeverFirstSelector foreverFirstSelector;
+    private final RollingSelector rollingSelector;
+
+    public RemoteSenderService(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.hashCodeSelector = new HashCodeSelector();
+        this.foreverFirstSelector = new ForeverFirstSelector();
+        this.rollingSelector = new RollingSelector();
+    }
+
+    /**
+     * Selects one remote client with the given strategy and pushes the indicator to it
+     * exactly once. A selector value not handled below results in no push.
+     *
+     * @param nextWorkId id of the worker that should receive the indicator on the remote side
+     * @param indicator the indicator to send
+     * @param selector the strategy used to choose the remote client
+     */
+    public void send(int nextWorkId, Indicator indicator, Selector selector) {
+        RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).getService(RemoteClientManager.class);
+
+        RemoteClient remoteClient;
+        switch (selector) {
+            case HashCode:
+                remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), indicator);
+                remoteClient.push(nextWorkId, indicator);
+                // Without the break the indicator would also be pushed by the cases below.
+                break;
+            case Rolling:
+                remoteClient = rollingSelector.select(clientManager.getRemoteClient(), indicator);
+                remoteClient.push(nextWorkId, indicator);
+                break;
+            case ForeverFirst:
+                remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), indicator);
+                remoteClient.push(nextWorkId, indicator);
+                break;
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
new file mode 100644
index 000000000..892e9516d
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import 
org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.slf4j.*;
+
+/**
+ * gRPC handler for the remote service: receives a stream of {@link RemoteMessage}s,
+ * resolves the indicator class from the id carried by each message and deserializes the
+ * message payload into a new instance of that class.
+ *
+ * @author peng-yongsheng
+ */
+public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
+
+    private final IndicatorMapper indicatorMapper;
+    // NOTE(review): not read anywhere in this class yet.
+    private final WorkerMapper workerMapper;
+
+    public RemoteServiceHandler(ModuleManager moduleManager) {
+        this.indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
+        this.workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+    }
+
+    /**
+     * Opens the client-streaming call.
+     *
+     * @param responseObserver receives a single {@link Empty} reply once the client
+     * completes its stream
+     * @return the observer that consumes the incoming messages
+     */
+    @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
+        return new StreamObserver<RemoteMessage>() {
+            @Override public void onNext(RemoteMessage message) {
+                int indicatorId = message.getIndicatorId();
+                // NOTE(review): nextWorkerId is read but never used, and the deserialized
+                // indicator below is discarded - presumably it should be handed to the
+                // worker registered under this id; confirm the intended dispatch.
+                int nextWorkerId = message.getNextWorkerId();
+                RemoteData remoteData = message.getRemoteData();
+
+                Class<Indicator> indicatorClass = indicatorMapper.findClassById(indicatorId);
+                try {
+                    indicatorClass.newInstance().deserialize(remoteData);
+                } catch (InstantiationException | IllegalAccessException e) {
+                    // Keep the throwable so the stack trace is not lost.
+                    logger.warn(e.getMessage(), e);
+                }
+            }
+
+            @Override public void onError(Throwable throwable) {
+                logger.error(throwable.getMessage(), throwable);
+            }
+
+            @Override public void onCompleted() {
+                responseObserver.onNext(Empty.newBuilder().build());
+                responseObserver.onCompleted();
+            }
+        };
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
similarity index 80%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
index fe4f1f5b5..1a7dbede3 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java
@@ -16,11 +16,13 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.remote;
+
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
  * @author peng-yongsheng
  */
-public interface RemoteData {
-    String selectKey();
+public interface Serializable {
+    RemoteData.Builder serialize();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
new file mode 100644
index 000000000..75a5952ad
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import 
org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
+import org.slf4j.*;
+
+/**
+ * {@link RemoteClient} that ships indicators to another node through the gRPC {@code RemoteService}.
+ *
+ * <p>{@link #push(int, Indicator)} only wraps the indicator in a {@link RemoteMessage} and hands it to a
+ * {@link DataCarrier}. The {@link RemoteMessageConsumer} registered in the constructor drains the carrier
+ * and streams each batch to the remote node.
+ *
+ * @author peng-yongsheng
+ */
+public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
+
+    private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
+
+    private final GRPCClient client;
+    private final DataCarrier<RemoteMessage> carrier;
+    private final IndicatorMapper indicatorMapper;
+
+    /**
+     * @param indicatorMapper resolves the numeric id that is sent for each indicator class
+     * @param remoteInstance supplies the host and port of the target node
+     * @param channelSize first argument handed to the {@link DataCarrier} constructor
+     * @param bufferSize second argument handed to the {@link DataCarrier} constructor
+     */
+    public GRPCRemoteClient(IndicatorMapper indicatorMapper, RemoteInstance remoteInstance, int channelSize,
+        int bufferSize) {
+        this.indicatorMapper = indicatorMapper;
+        this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
+        this.carrier = new DataCarrier<>(channelSize, bufferSize);
+        this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
+        this.carrier.consume(new RemoteMessageConsumer(), 1);
+    }
+
+    /**
+     * Builds a {@link RemoteMessage} from the worker id, the mapped indicator id and the serialized
+     * indicator, then queues it on the carrier. The network send happens later in the consumer.
+     *
+     * @param nextWorkerId id of the worker that should receive the indicator on the remote node
+     * @param indicator the indicator to transfer
+     */
+    @Override public void push(int nextWorkerId, Indicator indicator) {
+        int indicatorId = indicatorMapper.findIdByClass(indicator.getClass());
+        RemoteMessage.Builder builder = RemoteMessage.newBuilder();
+        builder.setNextWorkerId(nextWorkerId);
+        builder.setIndicatorId(indicatorId);
+        builder.setRemoteData(indicator.serialize());
+
+        this.carrier.produce(builder.build());
+    }
+
+    /**
+     * Carrier consumer that opens one request stream per batch, sends every message of the batch and
+     * then completes the stream.
+     */
+    class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
+        @Override public void init() {
+        }
+
+        @Override public void consume(List<RemoteMessage> remoteMessages) {
+            StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+            for (RemoteMessage remoteMessage : remoteMessages) {
+                streamObserver.onNext(remoteMessage);
+            }
+            streamObserver.onCompleted();
+        }
+
+        @Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+        }
+    }
+
+    /**
+     * Opens a new {@code RemoteService.call} stream on the client's channel.
+     *
+     * <p>The response observer logs errors and flips a {@link StreamStatus} on completion. Nothing in
+     * this class waits on that status.
+     *
+     * @return the observer used to send {@link RemoteMessage}s on the new stream
+     */
+    private StreamObserver<RemoteMessage> createStreamObserver() {
+        RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
+
+        StreamStatus status = new StreamStatus(false);
+        return stub.call(new StreamObserver<Empty>() {
+            @Override public void onNext(Empty empty) {
+            }
+
+            @Override public void onError(Throwable throwable) {
+                logger.error(throwable.getMessage(), throwable);
+            }
+
+            @Override public void onCompleted() {
+                status.finished();
+            }
+        });
+    }
+
+    /**
+     * Completion flag for one stream, with a polling wait helper.
+     */
+    class StreamStatus {
+
+        private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
+
+        private volatile boolean status;
+
+        StreamStatus(boolean status) {
+            this.status = status;
+        }
+
+        public boolean isFinish() {
+            return status;
+        }
+
+        void finished() {
+            this.status = true;
+        }
+
+        /**
+         * Polls every 5 ms until the stream is finished, the timeout is exceeded or the waiting thread
+         * is interrupted.
+         *
+         * @param maxTimeout max wait time, milliseconds.
+         */
+        public void wait4Finish(long maxTimeout) {
+            long time = 0;
+            while (!status) {
+                if (time > maxTimeout) {
+                    break;
+                }
+                if (!try2Sleep(5)) {
+                    // Interrupted: stop waiting instead of polling on with the interrupt pending.
+                    break;
+                }
+                time += 5;
+            }
+        }
+
+        /**
+         * Sleeps for the given time. On interruption the thread's interrupt flag is restored so the
+         * caller's owner can still observe it.
+         *
+         * @param millis the length of time to sleep in milliseconds
+         * @return {@code true} if the full sleep elapsed, {@code false} if the thread was interrupted
+         */
+        private boolean try2Sleep(long millis) {
+            try {
+                Thread.sleep(millis);
+                return true;
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Orders clients by the string form of their underlying {@link GRPCClient}.
+     * NOTE(review): this is only a meaningful order if {@code GRPCClient} overrides {@code toString()} -
+     * confirm.
+     */
+    @Override public int compareTo(GRPCRemoteClient o) {
+        return this.client.toString().compareTo(o.client.toString());
+    }
+
+    @Override public String getHost() {
+        return client.getHost();
+    }
+
+    @Override public int getPort() {
+        return client.getPort();
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
new file mode 100644
index 000000000..98d23f101
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+
+/**
+ * Handle on one node of the cluster through which indicators are handed to a worker on that node.
+ *
+ * @author peng-yongsheng
+ */
+public interface RemoteClient {
+
+    /**
+     * @return host of the node this client targets
+     */
+    String getHost();
+
+    /**
+     * @return port of the node this client targets
+     */
+    int getPort();
+
+    /**
+     * Hands the indicator over for processing by the worker with the given id.
+     *
+     * @param nextWorkerId id of the worker that should receive the indicator
+     * @param indicator the indicator to deliver
+     */
+    void push(int nextWorkerId, Indicator indicator);
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
new file mode 100644
index 000000000..4d2049bab
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import java.util.*;
+import java.util.concurrent.*;
+import 
org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
+import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RemoteClientManager implements Service {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RemoteClientManager.class);
+
+    private final ModuleManager moduleManager;
+    private IndicatorMapper indicatorMapper;
+    private ClusterNodesQuery clusterNodesQuery;
+    private final List<RemoteClient> clientsA;
+    private final List<RemoteClient> clientsB;
+    private List<RemoteClient> usingClients;
+
+    public RemoteClientManager(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+        this.clientsA = new LinkedList<>();
+        this.clientsB = new LinkedList<>();
+        this.usingClients = clientsA;
+    }
+
+    public void start() {
+        this.clusterNodesQuery = 
moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
+        this.indicatorMapper = 
moduleManager.find(ClusterModule.NAME).getService(IndicatorMapper.class);
+        
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 
1, 2, TimeUnit.SECONDS);
+    }
+
+    private void refresh() {
+        List<RemoteInstance> instanceList = 
clusterNodesQuery.queryRemoteNodes();
+        Collections.sort(instanceList);
+
+        if (!compare(instanceList)) {
+            buildNewClients(instanceList);
+        }
+    }
+
+    public List<RemoteClient> getRemoteClient() {
+        return usingClients;
+    }
+
+    private List<RemoteClient> getFreeClients() {
+        if (usingClients.equals(clientsA)) {
+            return clientsB;
+        } else {
+            return clientsA;
+        }
+    }
+
+    private void switchCurrentClients() {
+        if (usingClients.equals(clientsA)) {
+            usingClients = clientsB;
+        } else {
+            usingClients = clientsA;
+        }
+    }
+
+    private void buildNewClients(List<RemoteInstance> remoteInstances) {
+        getFreeClients().clear();
+
+        Map<String, RemoteClient> currentClientsMap = new HashMap<>();
+        this.usingClients.forEach(remoteClient -> {
+            currentClientsMap.put(address(remoteClient.getHost(), 
remoteClient.getPort()), remoteClient);
+        });
+
+        remoteInstances.forEach(remoteInstance -> {
+            String address = address(remoteInstance.getHost(), 
remoteInstance.getPort());
+            RemoteClient client;
+            if (currentClientsMap.containsKey(address)) {
+                client = currentClientsMap.get(address);
+            } else {
+                if (remoteInstance.isSelf()) {
+                    client = new SelfRemoteClient(moduleManager, 
remoteInstance.getHost(), remoteInstance.getPort());
+                } else {
+                    client = new GRPCRemoteClient(indicatorMapper, 
remoteInstance, 1, 3000);
+                }
+            }
+            getFreeClients().add(client);
+        });
+
+        switchCurrentClients();
+    }
+
+    private boolean compare(List<RemoteInstance> remoteInstances) {
+        if (usingClients.size() == remoteInstances.size()) {
+            for (int i = 0; i < usingClients.size(); i++) {
+                if (!address(usingClients.get(i).getHost(), 
usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), 
remoteInstances.get(i).getPort()))) {
+                    return false;
+                }
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private String address(String host, int port) {
+        return host + String.valueOf(port);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
new file mode 100644
index 000000000..109ff7798
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.client;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * {@link RemoteClient} for the local node: indicators are passed straight to the target worker without
+ * any network transfer. Host and port are only stored and reported back.
+ *
+ * @author peng-yongsheng
+ */
+public class SelfRemoteClient implements RemoteClient {
+
+    private final ModuleManager moduleManager;
+    private final String host;
+    private final int port;
+
+    /**
+     * @param moduleManager used on every push to look up the {@link WorkerMapper}
+     * @param host host reported by {@link #getHost()}
+     * @param port port reported by {@link #getPort()}
+     */
+    public SelfRemoteClient(ModuleManager moduleManager, String host, int port) {
+        this.moduleManager = moduleManager;
+        this.host = host;
+        this.port = port;
+    }
+
+    @Override
+    public String getHost() {
+        return this.host;
+    }
+
+    @Override
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * Resolves the worker registered under {@code nextWorkerId} and feeds it the indicator.
+     *
+     * @param nextWorkerId id of the worker to hand the indicator to
+     * @param indicator the indicator to process
+     */
+    @Override
+    public void push(int nextWorkerId, Indicator indicator) {
+        moduleManager.find(CoreModule.NAME)
+            .getService(WorkerMapper.class)
+            .findInstanceById(nextWorkerId)
+            .in(indicator);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
new file mode 100644
index 000000000..e28f2031f
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+import org.slf4j.*;
+
+/**
+ * Selector that always answers with the first client of the supplied list, whatever the indicator is.
+ *
+ * @author peng-yongsheng
+ */
+public class ForeverFirstSelector implements RemoteClientSelector {
+
+    private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
+
+    /**
+     * @param clients candidate clients; index 0 is returned, so the list must not be empty
+     * @param indicator not consulted by this selector
+     * @return the element at index 0 of {@code clients}
+     */
+    @Override
+    public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+        if (logger.isDebugEnabled()) {
+            final int clientCount = clients.size();
+            logger.debug("clients size: {}", clientCount);
+        }
+        final RemoteClient first = clients.get(0);
+        return first;
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
new file mode 100644
index 000000000..3d256b5ea
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+
+/**
+ * Selector that picks a client from the indicator's hash code, so equal hash codes map to the same
+ * index for a given list size.
+ *
+ * @author peng-yongsheng
+ */
+public class HashCodeSelector implements RemoteClientSelector {
+
+    /**
+     * @param clients candidate clients; must not be empty
+     * @param indicator its {@code hashCode()} decides the index
+     * @return the client at index {@code |hashCode % size|}
+     */
+    @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+        int size = clients.size();
+        // Reduce first, then take the absolute value: Math.abs(Integer.MIN_VALUE) is still negative and
+        // would yield a negative index. For every other hash code the result is the same as before.
+        int selectIndex = Math.abs(indicator.hashCode() % size);
+        return clients.get(selectIndex);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
new file mode 100644
index 000000000..438cbad0b
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+
+/**
+ * Strategy that decides which {@link RemoteClient} should receive a given indicator.
+ *
+ * @author peng-yongsheng
+ */
+public interface RemoteClientSelector {
+
+    /**
+     * @param clients the candidate clients to choose from
+     * @param indicator the indicator about to be sent
+     * @return the chosen element of {@code clients}
+     */
+    RemoteClient select(List<RemoteClient> clients, Indicator indicator);
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
new file mode 100644
index 000000000..c74e8c364
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.remote.selector;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+
+/**
+ * Selector that walks through the client list in turn by advancing an internal counter on every call.
+ * The indicator itself is not consulted.
+ *
+ * @author peng-yongsheng
+ */
+public class RollingSelector implements RemoteClientSelector {
+
+    private int index = 0;
+
+    /**
+     * @param clients candidate clients; must not be empty
+     * @param indicator not consulted by this selector
+     * @return the client at position {@code counter % clients.size()} after the counter was advanced
+     */
+    @Override
+    public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
+        final int clientCount = clients.size();
+        final int position = Math.abs(++index) % clientCount;
+
+        // Restart the counter before it can overflow.
+        if (Integer.MAX_VALUE == index) {
+            index = 0;
+        }
+        return clients.get(position);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
similarity index 93%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
index b7ac6ed00..bd5164896 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.remote;
+package org.apache.skywalking.oap.server.core.remote.selector;
 
 /**
  * @author peng-yongsheng
diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto 
b/oap-server/server-core/src/main/proto/RemoteService.proto
new file mode 100644
index 000000000..410b0c31e
--- /dev/null
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = 
"org.apache.skywalking.oap.server.core.remote.grpc.proto";
+
+service RemoteService { // Endpoint that GRPCRemoteClient streams indicators to.
+    rpc call (stream RemoteMessage) returns (Empty) { // Client-streaming: many RemoteMessage in, one Empty back.
+    }
+}
+
+message RemoteMessage { // One serialized indicator addressed to a worker.
+    int32 nextWorkerId = 1; // Worker id, taken from the nextWorkerId argument of GRPCRemoteClient.push.
+    int32 indicatorId = 2; // Id returned by IndicatorMapper.findIdByClass for the indicator's class.
+    RemoteData remoteData = 3; // Payload built by the indicator's serialize().
+}
+
+message RemoteData { // Generic value container, one repeated field per value type.
+    repeated string dataStrings = 1;
+    repeated int64 dataLongs = 2;
+    repeated double dataDoubles = 3;
+    repeated int32 dataIntegers = 4;
+}
+
+message Empty { // Body-less response of RemoteService.call.
+}
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def 
b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
new file mode 100644
index 000000000..ce6b6ddc3
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgIndicator
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/resources/META-INF/defines/worker.def 
b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
new file mode 100644
index 000000000..5afc8db52
--- /dev/null
+++ b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgAggregateWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgRemoteWorker
+org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgPersistentWorker
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
new file mode 100644
index 000000000..4b58eb567
--- /dev/null
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import org.junit.*;
+
+/**
+ * Checks that {@link IndicatorMapper} maps the class listed in the test {@code indicator.def} to id 1
+ * and maps id 1 back to that class.
+ *
+ * @author peng-yongsheng
+ */
+public class IndicatorMapperTestCase {
+
+    @Test
+    public void test() throws IndicatorDefineLoadException {
+        final IndicatorMapper indicatorMapper = new IndicatorMapper();
+        indicatorMapper.load();
+
+        // class -> id
+        final int mappedId = indicatorMapper.findIdByClass(TestAvgIndicator.class);
+        Assert.assertEquals(1, mappedId);
+
+        // id -> class
+        final Class<?> mappedClass = indicatorMapper.findClassById(1);
+        Assert.assertEquals(TestAvgIndicator.class, mappedClass);
+    }
+}
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
new file mode 100644
index 000000000..17d1189ee
--- /dev/null
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.define;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+
+/**
+ * Minimal {@link AvgIndicator} used as a fixture for the mapper test: serialization is stubbed out.
+ *
+ * @author peng-yongsheng
+ */
+public class TestAvgIndicator extends AvgIndicator {
+
+    @Getter
+    @Setter
+    private int id;
+
+    /**
+     * Stub.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public RemoteData.Builder serialize() {
+        return null;
+    }
+
+    /**
+     * Stub that ignores its argument.
+     */
+    @Override
+    public void deserialize(RemoteData remoteData) {
+    }
+}
diff --git 
a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def 
b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
new file mode 100644
index 000000000..97491fffb
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
new file mode 100644
index 000000000..33ebbb1f3
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
\ No newline at end of file
diff --git a/oap-server/server-core/src/test/resources/log4j2.xml b/oap-server/server-core/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..6eb5b3fb9
--- /dev/null
+++ b/oap-server/server-core/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<Configuration status="DEBUG">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="DEBUG">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
index 2416be639..188fae35c 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.library.client.grpc;
 
 import io.grpc.*;
+import lombok.Getter;
 import org.apache.skywalking.oap.server.library.client.Client;
 
 /**
@@ -26,9 +27,9 @@
  */
 public class GRPCClient implements Client {
 
-    private final String host;
+    @Getter private final String host;
 
-    private final int port;
+    @Getter private final int port;
 
     private ManagedChannel channel;
 
diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
index 7596fb60b..9873303c0 100644
--- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
+++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java
@@ -18,8 +18,7 @@
 
 package org.apache.skywalking.oap.server.library.module;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 /**
 * The <code>ModuleProvider</code> is an implementation of a {@link ModuleDefine}.
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
index e9764ce56..889382106 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java
@@ -35,8 +35,6 @@
                 if (logger.isDebugEnabled()) {
                     logger.debug("Received mesh metric: {}", metric);
                 }
-
-
             }
 
             @Override public void onError(Throwable throwable) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to