wu-sheng closed pull request #1505: Feature/oap/remote URL: https://github.com/apache/incubator-skywalking/pull/1505
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java index 2e6913030..55671bce0 100644 --- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java +++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManagerTest.java @@ -19,15 +19,14 @@ package org.apache.skywalking.oap.server.cluster.plugin.standalone; import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; -import org.junit.Assert; -import org.junit.Test; +import org.junit.*; public class StandaloneManagerTest { @Test public void test() { StandaloneManager standaloneManager = new StandaloneManager(); - RemoteInstance remote1 = new RemoteInstance(); - RemoteInstance remote2 = new RemoteInstance(); + RemoteInstance remote1 = new RemoteInstance("A", 100, true); + RemoteInstance remote2 = new RemoteInstance("B", 100, false); standaloneManager.registerRemote(remote1); Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0)); diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java index bbffa9ffc..a318481ed 100644 --- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java +++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProviderTestCase.java @@ -21,16 +21,9 @@ import java.io.IOException; import java.util.List; import org.apache.curator.test.TestingServer; -import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery; -import org.apache.skywalking.oap.server.core.cluster.ClusterRegister; -import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; -import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException; -import org.apache.skywalking.oap.server.library.module.ModuleStartException; -import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.apache.skywalking.oap.server.core.cluster.*; +import org.apache.skywalking.oap.server.library.module.*; +import org.junit.*; /** * @author peng-yongsheng @@ -59,9 +52,7 @@ public void testStart() throws ServiceNotProvidedException, ModuleStartException ClusterRegister moduleRegister = provider.getService(ClusterRegister.class); ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class); - RemoteInstance remoteInstance = new RemoteInstance(); - remoteInstance.setHost("ProviderAHost"); - remoteInstance.setPort(1000); + RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true); moduleRegister.registerRemote(remoteInstance); diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml index 767e1c8bd..ca6ac7abe 100644 --- a/oap-server/server-core/pom.xml +++ b/oap-server/server-core/pom.xml @@ -41,6 +41,11 @@ <artifactId>library-util</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>library-client</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.skywalking</groupId> <artifactId>library-server</artifactId> @@ -57,4 +62,49 @@ <version>${project.version}</version> </dependency> </dependencies> + + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.4.1.Final</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>2.4.3</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.0</version> + <configuration> + <!-- + The version of protoc must match protobuf-java. If you don't depend on + protobuf-java directly, you will be transitively depending on the + protobuf-java version that grpc depends on. + --> + <protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} + </protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier} + </pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java index 6f3de4875..b03152fd6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java @@ -19,7 +19,11 @@ package org.apache.skywalking.oap.server.core; import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper; +import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper; import org.apache.skywalking.oap.server.core.receiver.SourceReceiver; +import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; import org.apache.skywalking.oap.server.core.server.*; import org.apache.skywalking.oap.server.library.module.ModuleDefine; @@ -38,6 +42,7 @@ List<Class> classes = new ArrayList<>(); addServerInterface(classes); addReceiverInterface(classes); + addInsideService(classes); return classes.toArray(new Class[] {}); } @@ -47,6 +52,13 @@ private void addServerInterface(List<Class> classes) { classes.add(JettyHandlerRegister.class); } + private void addInsideService(List<Class> classes) { + classes.add(IndicatorMapper.class); + classes.add(WorkerMapper.class); + classes.add(RemoteClientManager.class); + classes.add(RemoteSenderService.class); + } + private void addReceiverInterface(List<Class> classes) { classes.add(SourceReceiver.class); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 1af3873de..757c34a76 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -18,24 +18,18 @@ package org.apache.skywalking.oap.server.core; -import org.apache.skywalking.oap.server.core.cluster.ClusterModule; -import org.apache.skywalking.oap.server.core.cluster.ClusterRegister; -import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; -import org.apache.skywalking.oap.server.core.receiver.SourceReceiver; -import org.apache.skywalking.oap.server.core.receiver.SourceReceiverImpl; -import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; -import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl; -import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; -import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl; -import org.apache.skywalking.oap.server.library.module.ModuleConfig; -import org.apache.skywalking.oap.server.library.module.ModuleProvider; -import org.apache.skywalking.oap.server.library.module.ModuleStartException; -import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.core.analysis.indicator.define.*; +import org.apache.skywalking.oap.server.core.analysis.worker.define.*; +import org.apache.skywalking.oap.server.core.cluster.*; +import org.apache.skywalking.oap.server.core.receiver.*; +import org.apache.skywalking.oap.server.core.remote.*; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; +import org.apache.skywalking.oap.server.core.server.*; +import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * @author peng-yongsheng @@ -47,17 +41,21 @@ private final CoreModuleConfig moduleConfig; private GRPCServer grpcServer; private JettyServer jettyServer; + private final IndicatorMapper indicatorMapper; + private final WorkerMapper workerMapper; public CoreModuleProvider() { super(); this.moduleConfig = new CoreModuleConfig(); + this.indicatorMapper = new IndicatorMapper(); + this.workerMapper = new WorkerMapper(getManager()); } @Override public String name() { return "default"; } - @Override public Class module() { + @Override public Class<? extends ModuleDefine> module() { return CoreModule.class; } @@ -75,11 +73,24 @@ public CoreModuleProvider() { this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer)); this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer)); - this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl()); + this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl(getManager())); + + this.registerServiceImplementation(IndicatorMapper.class, indicatorMapper); + this.registerServiceImplementation(WorkerMapper.class, workerMapper); + + this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager())); + this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); } - @Override public void start() { + @Override public void start() throws ModuleStartException { + grpcServer.addHandler(new RemoteServiceHandler(getManager())); + try { + indicatorMapper.load(); + workerMapper.load(); + } catch (IndicatorDefineLoadException | WorkerDefineLoadException e) { + throw new ModuleStartException(e.getMessage(), e); + } } @Override public void notifyAfterCompleted() throws ModuleStartException { @@ -90,9 +101,7 @@ public CoreModuleProvider() { throw new ModuleStartException(e.getMessage(), e); } - RemoteInstance gRPCServerInstance = new RemoteInstance(); - gRPCServerInstance.setHost(moduleConfig.getGRPCHost()); - gRPCServerInstance.setPort(moduleConfig.getGRPCPort()); + RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true); this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java index de8da4e4e..931738273 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java @@ -21,6 +21,7 @@ import java.util.*; import org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher; import org.apache.skywalking.oap.server.core.receiver.Scope; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.slf4j.*; /** @@ -32,9 +33,9 @@ private Map<Scope, SourceDispatcher> dispatcherMap; - public DispatcherManager() { + public DispatcherManager(ModuleManager moduleManager) { this.dispatcherMap = new HashMap<>(); - this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher()); + this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher(moduleManager)); } public SourceDispatcher getDispatcher(Scope scope) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java index 63f149f7b..57c75d449 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/StreamData.java @@ -18,10 +18,12 @@ package org.apache.skywalking.oap.server.core.analysis.data; +import org.apache.skywalking.oap.server.core.remote.*; + /** * @author peng-yongsheng */ -public abstract class StreamData implements QueueData { +public abstract class StreamData implements QueueData, Serializable, Deserializable { private EndOfBatchContext endOfBatchContext; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java index 152ac5eb9..bcb3a2c66 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java @@ -18,18 +18,22 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper; import org.apache.skywalking.oap.server.core.receiver.Endpoint; +import org.apache.skywalking.oap.server.library.module.ModuleManager; /** * @author peng-yongsheng */ public class EndpointDispatcher implements SourceDispatcher<Endpoint> { - private final EndpointLatencyAvgAggregator avgAggregator; + private final ModuleManager moduleManager; + private EndpointLatencyAvgAggregateWorker avgAggregator; - public EndpointDispatcher() { - this.avgAggregator = new EndpointLatencyAvgAggregator(); + public EndpointDispatcher(ModuleManager moduleManager) { + this.moduleManager = moduleManager; } @Override public void dispatch(Endpoint source) { @@ -37,7 +41,14 @@ public EndpointDispatcher() { } private void avg(Endpoint source) { - EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId()); + if (avgAggregator == null) { + WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class); + avgAggregator = (EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class); + } + + EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(); + indicator.setId(source.getId()); + indicator.setTimeBucket(source.getTimeBucket()); indicator.combine(source.getLatency(), 1); avgAggregator.in(indicator); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java new file mode 100644 index 000000000..62d51341e --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.endpoint; + +import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> { + + private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class); + + private final EndpointLatencyAvgRemoteWorker remoter; + + public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) { + super(moduleManager); + this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager); + } + + @Override protected void onNext(EndpointLatencyAvgIndicator data) { + remoter.in(data); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java index a200aeba7..e339afa73 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java @@ -18,19 +18,16 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint; +import lombok.*; import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; /** * @author peng-yongsheng */ public class EndpointLatencyAvgIndicator extends AvgIndicator { - private final int id; - - public EndpointLatencyAvgIndicator(long timeBucket, int id) { - super(timeBucket); - this.id = id; - } + @Setter @Getter private int id; @Override public int hashCode() { int result = 17; @@ -55,4 +52,22 @@ public EndpointLatencyAvgIndicator(long timeBucket, int id) { return true; } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + remoteBuilder.setDataIntegers(0, getId()); + remoteBuilder.setDataIntegers(1, getCount()); + + remoteBuilder.setDataLongs(0, getTimeBucket()); + remoteBuilder.setDataLongs(1, getSummation()); + return remoteBuilder; + } + + @Override public void deserialize(RemoteData remoteData) { + setId(remoteData.getDataIntegers(0)); + setCount(remoteData.getDataIntegers(1)); + + setTimeBucket(remoteData.getDataLongs(0)); + setSummation(remoteData.getDataLongs(1)); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java similarity index 70% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java index f40990976..e3c5c2306 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java @@ -18,17 +18,15 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint; -import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator; -import org.slf4j.*; +import org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker; +import org.apache.skywalking.oap.server.library.module.ModuleManager; /** * @author peng-yongsheng */ -public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> { - - private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class); - - @Override protected void onNext(EndpointLatencyAvgIndicator data) { +public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker<EndpointLatencyAvgIndicator> { + public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) { + super(moduleManager); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java new file mode 100644 index 000000000..47f75215a --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.endpoint; + +import org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker; +import org.apache.skywalking.oap.server.core.remote.selector.Selector; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +/** + * @author peng-yongsheng + */ +public class EndpointLatencyAvgRemoteWorker extends AbstractRemoteWorker<EndpointLatencyAvgIndicator> { + + public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) { + super(moduleManager); + } + + @Override public Selector selector() { + return Selector.HashCode; + } + + @Override public Class nextWorkerClass() { + return EndpointLatencyAvgPersistentWorker.class; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java index 54ca9d68c..da065f37f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java @@ -18,28 +18,26 @@ package org.apache.skywalking.oap.server.core.analysis.indicator; +import lombok.*; import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*; +import org.apache.skywalking.oap.server.core.remote.selector.Selector; /** * @author peng-yongsheng */ -@IndicatorType +@IndicatorType(selector = Selector.HashCode) public abstract class AvgIndicator extends Indicator { - private long summation; - private int count; - - public AvgIndicator(long timeBucket) { - super(timeBucket); - } + @Getter @Setter private long summation; + @Getter @Setter private int count; @Entrance - public void combine(@SourceFrom long summation, @ConstOne int count) { + public final void combine(@SourceFrom long summation, @ConstOne int count) { this.summation += summation; this.count += count; } - @Override public void combine(Indicator indicator) { + @Override public final void combine(Indicator indicator) { AvgIndicator avgIndicator = (AvgIndicator)indicator; combine(avgIndicator.summation, avgIndicator.count); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java index 66bb57d58..533379e7d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java @@ -18,7 +18,7 @@ package org.apache.skywalking.oap.server.core.analysis.indicator; -import lombok.Getter; +import lombok.*; import org.apache.skywalking.oap.server.core.analysis.data.StreamData; /** @@ -26,11 +26,7 @@ */ public abstract class Indicator extends StreamData { - @Getter private final long timeBucket; - - public Indicator(long timeBucket) { - this.timeBucket = timeBucket; - } + @Getter @Setter private long timeBucket; public abstract void combine(Indicator indicator); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java index 7e8b8e1a6..46f03450f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.analysis.indicator.annotation; import java.lang.annotation.*; +import org.apache.skywalking.oap.server.core.remote.selector.Selector; /** * @author peng-yongsheng @@ -26,4 +27,5 @@ @Target(ElementType.TYPE) @Retention(RetentionPolicy.SOURCE) public @interface IndicatorType { + Selector selector(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java new file mode 100644 index 000000000..ca0521a97 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorDefineLoadException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.indicator.define; + +/** + * @author peng-yongsheng + */ +public class IndicatorDefineLoadException extends Exception { + + public IndicatorDefineLoadException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java new file mode 100644 index 000000000..8513fa2cc --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.indicator.define; + +import java.io.*; +import java.net.URL; +import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.library.module.Service; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class IndicatorMapper implements Service { + + private static final Logger logger = LoggerFactory.getLogger(IndicatorMapper.class); + + private int id = 0; + private final Map<Class<Indicator>, Integer> classKeyMapping; + private final Map<Integer, Class<Indicator>> idKeyMapping; + + public IndicatorMapper() { + this.classKeyMapping = new HashMap<>(); + this.idKeyMapping = new HashMap<>(); + } + + @SuppressWarnings(value = "unchecked") + public void load() throws IndicatorDefineLoadException { + try { + List<String> indicatorClasses = new LinkedList<>(); + + Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/indicator.def"); + while (urlEnumeration.hasMoreElements()) { + URL definitionFileURL = urlEnumeration.nextElement(); + logger.info("Load indicator definition file url: {}", definitionFileURL.getPath()); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream())); + Properties properties = new Properties(); + properties.load(bufferedReader); + + Enumeration defineItem = properties.propertyNames(); + while (defineItem.hasMoreElements()) { + String fullNameClass = (String)defineItem.nextElement(); + indicatorClasses.add(fullNameClass); + } + } + + for (String indicatorClassName : indicatorClasses) { + Class<Indicator> indicatorClass = (Class<Indicator>)Class.forName(indicatorClassName); + id++; + classKeyMapping.put(indicatorClass, id); + idKeyMapping.put(id, indicatorClass); + } + } catch (IOException | ClassNotFoundException e) { + throw new IndicatorDefineLoadException(e.getMessage(), e); + } + } + + public int findIdByClass(Class indicatorClass) { + return classKeyMapping.get(indicatorClass); + } + + public Class<Indicator> findClassById(int id) { + return idKeyMapping.get(id); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java similarity index 84% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java index f4e769ade..65d68b440 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java @@ -16,35 +16,36 @@ * */ -package org.apache.skywalking.oap.server.core.analysis; +package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.slf4j.*; /** * @author peng-yongsheng */ -public abstract class AbstractAggregator<INPUT extends Indicator> { +public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends Worker<INPUT> { - private static final Logger logger = LoggerFactory.getLogger(AbstractAggregator.class); + private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class); private final DataCarrier<INPUT> dataCarrier; private final MergeDataCache<INPUT> mergeDataCache; private int messageNum; - public AbstractAggregator() { + public AbstractAggregatorWorker(ModuleManager moduleManager) { this.mergeDataCache = new MergeDataCache<>(); this.dataCarrier = new DataCarrier<>(1, 10000); this.dataCarrier.consume(new AggregatorConsumer(this), 1); } - public void in(INPUT message) { - message.setEndOfBatchContext(new EndOfBatchContext(false)); - dataCarrier.produce(message); + @Override public final void in(INPUT input) { + input.setEndOfBatchContext(new EndOfBatchContext(false)); + dataCarrier.produce(input); } private void onWork(INPUT message) { @@ -91,9 +92,9 @@ private void aggregate(INPUT message) { private class AggregatorConsumer implements IConsumer<INPUT> { - private final AbstractAggregator<INPUT> aggregator; + private final AbstractAggregatorWorker<INPUT> aggregator; - private AggregatorConsumer(AbstractAggregator<INPUT> aggregator) { + private AggregatorConsumer(AbstractAggregatorWorker<INPUT> aggregator) { this.aggregator = aggregator; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java new file mode 100644 index 000000000..bcead6792 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +/** + * @author peng-yongsheng + */ +public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> { + + public AbstractPersistentWorker(ModuleManager moduleManager) { + } + + @Override public final void in(INPUT input) { + + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java new file mode 100644 index 000000000..e3fb89d0c --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper; +import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; +import org.apache.skywalking.oap.server.core.remote.selector.Selector; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends Worker<INPUT> { + + private static final Logger logger = LoggerFactory.getLogger(AbstractRemoteWorker.class); + + private final ModuleManager moduleManager; + private RemoteSenderService remoteSender; + private WorkerMapper workerMapper; + + public AbstractRemoteWorker(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + } + + @Override public final void in(INPUT input) { + if (remoteSender == null) { + remoteSender = moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class); + } + if (workerMapper == null) { + workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class); + } + + try { + int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass()); + remoteSender.send(nextWorkerId, input, selector()); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + } + + public abstract Class nextWorkerClass(); + + public abstract Selector selector(); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java new file mode 100644 index 000000000..53010eedf --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/Worker.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; + +/** + * @author peng-yongsheng + */ +public abstract class Worker<INPUT extends Indicator> { + + public abstract void in(INPUT input); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java new file mode 100644 index 000000000..a24a0a7a7 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerDefineLoadException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker.define; + +/** + * @author peng-yongsheng + */ +public class WorkerDefineLoadException extends Exception { + + public WorkerDefineLoadException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java new file mode 100644 index 000000000..973243056 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/define/WorkerMapper.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker.define; + +import java.io.*; +import java.lang.reflect.Constructor; +import java.net.URL; +import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.worker.Worker; +import org.apache.skywalking.oap.server.library.module.*; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class WorkerMapper implements Service { + + private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class); + + private int id = 0; + private final ModuleManager moduleManager; + private final Map<Class<Worker>, Integer> classKeyMapping; + private final Map<Integer, Class<Worker>> idKeyMapping; + private final Map<Class<Worker>, Worker> classKeyInstanceMapping; + private final Map<Integer, Worker> idKeyInstanceMapping; + + public WorkerMapper(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + this.classKeyMapping = new HashMap<>(); + this.idKeyMapping = new HashMap<>(); + this.classKeyInstanceMapping = new HashMap<>(); + this.idKeyInstanceMapping = new HashMap<>(); + } + + @SuppressWarnings(value = "unchecked") + public void load() throws WorkerDefineLoadException { + try { + List<String> workerClasses = new LinkedList<>(); + + Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/worker.def"); + while (urlEnumeration.hasMoreElements()) { + URL definitionFileURL = urlEnumeration.nextElement(); + logger.info("Load worker definition file url: {}", definitionFileURL.getPath()); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream())); + Properties properties = new Properties(); + properties.load(bufferedReader); + + Enumeration defineItem = properties.propertyNames(); + while (defineItem.hasMoreElements()) { + String fullNameClass = (String)defineItem.nextElement(); + workerClasses.add(fullNameClass); + } + } + + for (String workerClassName : workerClasses) { + Class<Worker> workerClass = (Class<Worker>)Class.forName(workerClassName); + id++; + classKeyMapping.put(workerClass, id); + idKeyMapping.put(id, workerClass); + + Constructor<Worker> constructor = workerClass.getDeclaredConstructor(ModuleManager.class); + Worker worker = constructor.newInstance(moduleManager); + classKeyInstanceMapping.put(workerClass, worker); + idKeyInstanceMapping.put(id, worker); + } + } catch (Exception e) { + throw new WorkerDefineLoadException(e.getMessage(), e); + } + } + + public int findIdByClass(Class workerClass) { + return classKeyMapping.get(workerClass); + } + + public Class<Worker> findClassById(int id) { + return idKeyMapping.get(id); + } + + public Worker findInstanceByClass(Class workerClass) { + return classKeyInstanceMapping.get(workerClass); + } + + public Worker findInstanceById(int id) { + return idKeyInstanceMapping.get(id); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java index 53c4ea0cc..1e85b3252 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java @@ -19,48 +19,29 @@ package org.apache.skywalking.oap.server.core.cluster; import java.util.Objects; +import lombok.*; /** * @author peng-yongsheng */ -public class RemoteInstance { +public class RemoteInstance implements Comparable<RemoteInstance> { - private String host; - private int port; - private boolean self = false; + @Getter private final String host; + @Getter private final int port; + @Getter @Setter private boolean isSelf = false; - public RemoteInstance() { - - } - - public RemoteInstance(String host, int port, boolean self) { + public RemoteInstance(String host, int port, boolean isSelf) { this.host = host; this.port = port; - this.self = self; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; + this.isSelf = isSelf; } - public boolean isSelf() { - return self; + @Override public int compareTo(RemoteInstance o) { + return toString().compareTo(toString()); } - public void setSelf(boolean self) { - this.self = self; + @Override public String toString() { + return host + String.valueOf(port); } @Override public boolean equals(Object o) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java index 4d04a31dd..a6cf211ec 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/SourceReceiverImpl.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.receiver; import org.apache.skywalking.oap.server.core.analysis.DispatcherManager; +import org.apache.skywalking.oap.server.library.module.ModuleManager; /** * @author peng-yongsheng @@ -27,8 +28,8 @@ private final DispatcherManager dispatcherManager; - public SourceReceiverImpl() { - this.dispatcherManager = new DispatcherManager(); + public SourceReceiverImpl(ModuleManager moduleManager) { + this.dispatcherManager = new DispatcherManager(moduleManager); } @Override public void receive(Source source) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java new file mode 100644 index 000000000..b2b3a7ecb --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Deserializable.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote; + +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; + +/** + * @author peng-yongsheng + */ +public interface Deserializable { + void deserialize(RemoteData remoteData); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java new file mode 100644 index 000000000..b2ce1c45e --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.remote.client.*; +import org.apache.skywalking.oap.server.core.remote.selector.*; +import org.apache.skywalking.oap.server.library.module.*; + +/** + * @author peng-yongsheng + */ +public class RemoteSenderService implements Service { + + private final ModuleManager moduleManager; + private final HashCodeSelector hashCodeSelector; + private final ForeverFirstSelector foreverFirstSelector; + private final RollingSelector rollingSelector; + + public RemoteSenderService(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + this.hashCodeSelector = new HashCodeSelector(); + this.foreverFirstSelector = new ForeverFirstSelector(); + this.rollingSelector = new RollingSelector(); + } + + public void send(int nextWorkId, Indicator indicator, Selector selector) { + RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).getService(RemoteClientManager.class); + + RemoteClient remoteClient; + switch (selector) { + case HashCode: + remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), indicator); + remoteClient.push(nextWorkId, indicator); + case Rolling: + remoteClient = rollingSelector.select(clientManager.getRemoteClient(), indicator); + remoteClient.push(nextWorkId, indicator); + case ForeverFirst: + remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), indicator); + remoteClient.push(nextWorkId, indicator); + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java new file mode 100644 index 000000000..892e9516d --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote; + +import io.grpc.stub.StreamObserver; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper; +import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler { + + private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class); + + private final IndicatorMapper indicatorMapper; + private final WorkerMapper workerMapper; + + public RemoteServiceHandler(ModuleManager moduleManager) { + this.indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class); + this.workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class); + } + + @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) { + return new StreamObserver<RemoteMessage>() { + @Override public void onNext(RemoteMessage message) { + int indicatorId = message.getIndicatorId(); + int nextWorkerId = message.getNextWorkerId(); + RemoteData remoteData = message.getRemoteData(); + + Class<Indicator> indicatorClass = indicatorMapper.findClassById(indicatorId); + try { + indicatorClass.newInstance().deserialize(remoteData); + } catch (InstantiationException | IllegalAccessException e) { + logger.warn(e.getMessage()); + } + } + + @Override public void onError(Throwable throwable) { + logger.error(throwable.getMessage(), throwable); + } + + @Override public void onCompleted() { + responseObserver.onNext(Empty.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java similarity index 80% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java index fe4f1f5b5..1a7dbede3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/RemoteData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Serializable.java @@ -16,11 +16,13 @@ * */ -package org.apache.skywalking.oap.server.core.analysis.data; +package org.apache.skywalking.oap.server.core.remote; + +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; /** * @author peng-yongsheng */ -public interface RemoteData { - String selectKey(); +public interface Serializable { + RemoteData.Builder serialize(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java new file mode 100644 index 000000000..75a5952ad --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.client; + +import io.grpc.stub.StreamObserver; +import java.util.List; +import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; +import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; +import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper; +import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.*; +import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> { + + private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class); + + private final GRPCClient client; + private final DataCarrier<RemoteMessage> carrier; + private final IndicatorMapper indicatorMapper; + + public GRPCRemoteClient(IndicatorMapper indicatorMapper, RemoteInstance remoteInstance, int channelSize, + int bufferSize) { + this.indicatorMapper = indicatorMapper; + this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort()); + this.carrier = new DataCarrier<>(channelSize, bufferSize); + this.carrier.setBufferStrategy(BufferStrategy.BLOCKING); + this.carrier.consume(new RemoteMessageConsumer(), 1); + } + + @Override public void push(int nextWorkerId, Indicator indicator) { + int indicatorId = indicatorMapper.findIdByClass(indicator.getClass()); + RemoteMessage.Builder builder = RemoteMessage.newBuilder(); + builder.setNextWorkerId(nextWorkerId); + builder.setIndicatorId(indicatorId); + builder.setRemoteData(indicator.serialize()); + + this.carrier.produce(builder.build()); + } + + class RemoteMessageConsumer implements IConsumer<RemoteMessage> { + @Override public void init() { + } + + @Override public void consume(List<RemoteMessage> remoteMessages) { + StreamObserver<RemoteMessage> streamObserver = createStreamObserver(); + for (RemoteMessage remoteMessage : remoteMessages) { + streamObserver.onNext(remoteMessage); + } + streamObserver.onCompleted(); + } + + @Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) { + logger.error(t.getMessage(), t); + } + + @Override public void onExit() { + } + } + + private StreamObserver<RemoteMessage> createStreamObserver() { + RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel()); + + StreamStatus status = new StreamStatus(false); + return stub.call(new StreamObserver<Empty>() { + @Override public void onNext(Empty empty) { + } + + @Override public void onError(Throwable throwable) { + logger.error(throwable.getMessage(), throwable); + } + + @Override public void onCompleted() { + status.finished(); + } + }); + } + + class StreamStatus { + + private final Logger logger = LoggerFactory.getLogger(StreamStatus.class); + + private volatile boolean status; + + StreamStatus(boolean status) { + this.status = status; + } + + public boolean isFinish() { + return status; + } + + void finished() { + this.status = true; + } + + /** + * @param maxTimeout max wait time, milliseconds. + */ + public void wait4Finish(long maxTimeout) { + long time = 0; + while (!status) { + if (time > maxTimeout) { + break; + } + try2Sleep(5); + time += 5; + } + } + + /** + * Try to sleep, and ignore the {@link InterruptedException} + * + * @param millis the length of time to sleep in milliseconds + */ + private void try2Sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + } + + @Override public int compareTo(GRPCRemoteClient o) { + return this.client.toString().compareTo(o.client.toString()); + } + + public String getHost() { + return client.getHost(); + } + + public int getPort() { + return client.getPort(); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java new file mode 100644 index 000000000..98d23f101 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.client; + +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; + +/** + * @author peng-yongsheng + */ +public interface RemoteClient { + + String getHost(); + + int getPort(); + + void push(int nextWorkerId, Indicator indicator); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java new file mode 100644 index 000000000..4d2049bab --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.client; + +import java.util.*; +import java.util.concurrent.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper; +import org.apache.skywalking.oap.server.core.cluster.*; +import org.apache.skywalking.oap.server.library.module.*; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class RemoteClientManager implements Service { + + private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class); + + private final ModuleManager moduleManager; + private IndicatorMapper indicatorMapper; + private ClusterNodesQuery clusterNodesQuery; + private final List<RemoteClient> clientsA; + private final List<RemoteClient> clientsB; + private List<RemoteClient> usingClients; + + public RemoteClientManager(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + this.clientsA = new LinkedList<>(); + this.clientsB = new LinkedList<>(); + this.usingClients = clientsA; + } + + public void start() { + this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class); + this.indicatorMapper = moduleManager.find(ClusterModule.NAME).getService(IndicatorMapper.class); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 2, TimeUnit.SECONDS); + } + + private void refresh() { + List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes(); + Collections.sort(instanceList); + + if (!compare(instanceList)) { + buildNewClients(instanceList); + } + } + + public List<RemoteClient> getRemoteClient() { + return usingClients; + } + + private List<RemoteClient> getFreeClients() { + if (usingClients.equals(clientsA)) { + return clientsB; + } else { + return clientsA; + } + } + + private void switchCurrentClients() { + if (usingClients.equals(clientsA)) { + usingClients = clientsB; + } else { + usingClients = clientsA; + } + } + + private void buildNewClients(List<RemoteInstance> remoteInstances) { + getFreeClients().clear(); + + Map<String, RemoteClient> currentClientsMap = new HashMap<>(); + this.usingClients.forEach(remoteClient -> { + currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient); + }); + + remoteInstances.forEach(remoteInstance -> { + String address = address(remoteInstance.getHost(), remoteInstance.getPort()); + RemoteClient client; + if (currentClientsMap.containsKey(address)) { + client = currentClientsMap.get(address); + } else { + if (remoteInstance.isSelf()) { + client = new SelfRemoteClient(moduleManager, remoteInstance.getHost(), remoteInstance.getPort()); + } else { + client = new GRPCRemoteClient(indicatorMapper, remoteInstance, 1, 3000); + } + } + getFreeClients().add(client); + }); + + switchCurrentClients(); + } + + private boolean compare(List<RemoteInstance> remoteInstances) { + if (usingClients.size() == remoteInstances.size()) { + for (int i = 0; i < usingClients.size(); i++) { + if (!address(usingClients.get(i).getHost(), usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort()))) { + return false; + } + } + return true; + } else { + return false; + } + } + + private String address(String host, int port) { + return host + String.valueOf(port); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java new file mode 100644 index 000000000..109ff7798 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.client; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +/** + * @author peng-yongsheng + */ +public class SelfRemoteClient implements RemoteClient { + + private final ModuleManager moduleManager; + private final String host; + private final int port; + + public SelfRemoteClient(ModuleManager moduleManager, String host, int port) { + this.moduleManager = moduleManager; + this.host = host; + this.port = port; + } + + @Override public String getHost() { + return host; + } + + @Override public int getPort() { + return port; + } + + @Override public void push(int nextWorkerId, Indicator indicator) { + WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class); + workerMapper.findInstanceById(nextWorkerId).in(indicator); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java new file mode 100644 index 000000000..e28f2031f --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.selector; + +import java.util.List; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClient; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class ForeverFirstSelector implements RemoteClientSelector { + + private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class); + + @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) { + if (logger.isDebugEnabled()) { + logger.debug("clients size: {}", clients.size()); + } + return clients.get(0); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java new file mode 100644 index 000000000..3d256b5ea --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.selector; + +import java.util.List; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClient; + +/** + * @author peng-yongsheng + */ +public class HashCodeSelector implements RemoteClientSelector { + + @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) { + int size = clients.size(); + int selectIndex = Math.abs(indicator.hashCode()) % size; + return clients.get(selectIndex); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java new file mode 100644 index 000000000..438cbad0b --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.selector; + +import java.util.List; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClient; + +/** + * @author peng-yongsheng + */ +public interface RemoteClientSelector { + RemoteClient select(List<RemoteClient> clients, Indicator indicator); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java new file mode 100644 index 000000000..c74e8c364 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.remote.selector; + +import java.util.List; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClient; + +/** + * @author peng-yongsheng + */ +public class RollingSelector implements RemoteClientSelector { + + private int index = 0; + + @Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) { + int size = clients.size(); + index++; + int selectIndex = Math.abs(index) % size; + + if (index == Integer.MAX_VALUE) { + index = 0; + } + return clients.get(selectIndex); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java similarity index 93% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java index b7ac6ed00..bd5164896 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/Selector.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/Selector.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.core.remote; +package org.apache.skywalking.oap.server.core.remote.selector; /** * @author peng-yongsheng diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto b/oap-server/server-core/src/main/proto/RemoteService.proto new file mode 100644 index 000000000..410b0c31e --- /dev/null +++ b/oap-server/server-core/src/main/proto/RemoteService.proto @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.skywalking.oap.server.core.remote.grpc.proto"; + +service RemoteService { + rpc call (stream RemoteMessage) returns (Empty) { + } +} + +message RemoteMessage { + int32 nextWorkerId = 1; + int32 indicatorId = 2; + RemoteData remoteData = 3; +} + +message RemoteData { + repeated string dataStrings = 1; + repeated int64 dataLongs = 2; + repeated double dataDoubles = 3; + repeated int32 dataIntegers = 4; +} + +message Empty { +} \ No newline at end of file diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def new file mode 100644 index 000000000..ce6b6ddc3 --- /dev/null +++ b/oap-server/server-core/src/main/resources/META-INF/defines/indicator.def @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgIndicator \ No newline at end of file diff --git a/oap-server/server-core/src/main/resources/META-INF/defines/worker.def b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def new file mode 100644 index 000000000..5afc8db52 --- /dev/null +++ b/oap-server/server-core/src/main/resources/META-INF/defines/worker.def @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgAggregateWorker +org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgRemoteWorker +org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgPersistentWorker \ No newline at end of file diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java new file mode 100644 index 000000000..4b58eb567 --- /dev/null +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapperTestCase.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.indicator.define; + +import org.junit.*; + +/** + * @author peng-yongsheng + */ +public class IndicatorMapperTestCase { + + @Test + public void test() throws IndicatorDefineLoadException { + IndicatorMapper mapper = new IndicatorMapper(); + mapper.load(); + + Assert.assertEquals(1, mapper.findIdByClass(TestAvgIndicator.class)); + Assert.assertEquals(TestAvgIndicator.class, mapper.findClassById(1)); + } +} diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java new file mode 100644 index 000000000..17d1189ee --- /dev/null +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.indicator.define; + +import lombok.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; + +/** + * @author peng-yongsheng + */ +public class TestAvgIndicator extends AvgIndicator { + + @Setter @Getter private int id; + + @Override public RemoteData.Builder serialize() { + return null; + } + + @Override public void deserialize(RemoteData remoteData) { + } +} diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def new file mode 100644 index 000000000..97491fffb --- /dev/null +++ b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator \ No newline at end of file diff --git a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def new file mode 100644 index 000000000..33ebbb1f3 --- /dev/null +++ b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# \ No newline at end of file diff --git a/oap-server/server-core/src/test/resources/log4j2.xml b/oap-server/server-core/src/test/resources/log4j2.xml new file mode 100644 index 000000000..6eb5b3fb9 --- /dev/null +++ b/oap-server/server-core/src/test/resources/log4j2.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<Configuration status="DEBUG"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="DEBUG"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java index 2416be639..188fae35c 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.library.client.grpc; import io.grpc.*; +import lombok.Getter; import org.apache.skywalking.oap.server.library.client.Client; /** @@ -26,9 +27,9 @@ */ public class GRPCClient implements Client { - private final String host; + @Getter private final String host; - private final int port; + @Getter private final int port; private ManagedChannel channel; diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java index 7596fb60b..9873303c0 100644 --- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java +++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleProvider.java @@ -18,8 +18,7 @@ package org.apache.skywalking.oap.server.library.module; -import java.util.HashMap; -import java.util.Map; +import java.util.*; /** * The <code>ModuleProvider</code> is an implementation of a {@link ModuleDefine}. diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java index e9764ce56..889382106 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/mesh-receiver-provider/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/provider/MeshGRPCHandler.java @@ -35,8 +35,6 @@ if (logger.isDebugEnabled()) { logger.debug("Received mesh metric: {}", metric); } - - } @Override public void onError(Throwable throwable) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services