This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push: new c019535 Fix mesh telemetry performance issue and adjust default thread number (#2261) c019535 is described below commit c019535ee92affa79f713008fe046f2353b31033 Author: 吴晟 Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Wed Feb 20 08:29:17 2019 +0800 Fix mesh telemetry performance issue and adjust default thread number (#2261) * Try adjustment. * Remove heartbeat. * Make heartbeat back to work. And default threadpool size more sense. * Make L2 to less than before. * Make instance heartbeat works. * Try L1 aggregation thread = core * 2 * 2. --- .../apm/commons/datacarrier/buffer/Channels.java | 8 ++--- .../datacarrier/consumer/BulkConsumePool.java | 8 ++--- .../consumer/MultipleChannelsConsumer.java | 3 +- .../analysis/worker/IndicatorAggregateWorker.java | 5 +-- .../analysis/worker/IndicatorPersistentWorker.java | 2 +- .../register/worker/RegisterDistinctWorker.java | 6 +++- .../register/worker/RegisterPersistentWorker.java | 6 +++- .../server/library/server/jetty/JettyServer.java | 19 ++--------- .../receiver/mesh/TelemetryDataDispatcher.java | 37 ++++++++++------------ 9 files changed, 38 insertions(+), 56 deletions(-) diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java index 2a83ba0..e0f3026 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java @@ -29,6 +29,7 @@ public class Channels<T> { private final Buffer<T>[] bufferChannels; private IDataPartitioner<T> dataPartitioner; private BufferStrategy strategy; + private final long size; public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) { this.dataPartitioner = partitioner; @@ -37,6 +38,7 @@ public class Channels<T> { for (int i = 0; i < channelSize; i++) { bufferChannels[i] = new Buffer<T>(bufferSize, strategy); } + size = channelSize * bufferSize; } public boolean save(T data) { @@ -81,12 +83,8 @@ public class Channels<T> { return this.bufferChannels.length; } - public int getBufferSize() { - return bufferChannels[0].getBufferSize(); - } - public long size() { - return (long)getChannelSize() * getBufferSize(); + return size; } public Buffer<T> getBuffer(int index) { diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java index 798a601..1b95c1b 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java @@ -66,7 +66,7 @@ public class BulkConsumePool implements ConsumerPool { for (int i = 1; i < allConsumers.size(); i++) { MultipleChannelsConsumer option = allConsumers.get(i); if (option.size() < winner.size()) { - return option; + winner = option; } } return winner; @@ -115,11 +115,7 @@ public class BulkConsumePool implements ConsumerPool { } public static int recommendMaxSize() { - int processorNum = Runtime.getRuntime().availableProcessors(); - if (processorNum > 1) { - processorNum -= 1; - } - return processorNum; + return Runtime.getRuntime().availableProcessors() * 2; } } } diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java index 1679302..1877446 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java @@ -74,9 +74,8 @@ public class MultipleChannelsConsumer extends Thread { Buffer buffer = target.channels.getBuffer(i); consumeList.addAll(buffer.obtain()); } - hasData = consumeList.size() > 0; - if (consumeList.size() > 0) { + if (hasData = consumeList.size() > 0) { try { target.consumer.consume(consumeList); } catch (Throwable t) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java index 148534a..f0c3886 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java @@ -47,7 +47,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> { private final String modelName; private CounterMetric aggregationCounter; - IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker, String modelName) { + IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker, + String modelName) { super(workerId); this.modelName = modelName; this.nextWorker = nextWorker; @@ -55,7 +56,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> { this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000); String name = "INDICATOR_L1_AGGREGATION"; - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize(), 20); + BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20); try { ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); } catch (Exception e) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java index e90d98a..0e49e16 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java @@ -58,7 +58,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg this.nextWorker = nextWorker; String name = "INDICATOR_L2_AGGREGATION"; - int size = BulkConsumePool.Creator.recommendMaxSize() / 4; + int size = BulkConsumePool.Creator.recommendMaxSize() / 8; if (size == 0) { size = 1; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java index ae4bc67..158f01f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java @@ -45,7 +45,11 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> { this.sources = new HashMap<>(); this.dataCarrier = new DataCarrier<>(1, 1000); String name = "REGISTER_L1"; - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200); + int size = BulkConsumePool.Creator.recommendMaxSize() / 8; + if (size == 0) { + size = 1; + } + BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200); try { ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); } catch (Exception e) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java index 1ccd21d..2bd11e5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java @@ -55,7 +55,11 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> { this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000); String name = "REGISTER_L2"; - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200); + int size = BulkConsumePool.Creator.recommendMaxSize() / 8; + if (size == 0) { + size = 1; + } + BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200); try { ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); } catch (Exception e) { diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java index 97f4c85..af8c91e 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java @@ -18,12 +18,10 @@ package org.apache.skywalking.oap.server.library.server.jetty; +import java.net.InetSocketAddress; import java.util.Objects; -import org.apache.skywalking.oap.server.library.server.Server; import org.apache.skywalking.oap.server.library.server.*; -import org.eclipse.jetty.server.*; import org.eclipse.jetty.servlet.*; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.*; /** @@ -63,20 +61,7 @@ public class JettyServer implements Server { @Override public void initialize() { - QueuedThreadPool threadPool = new QueuedThreadPool(); - if (selectorNum > 0) { - threadPool.setMaxThreads(selectorNum * 2 + 2); - } - - server = new org.eclipse.jetty.server.Server(threadPool); - - HttpConfiguration httpConfig = new HttpConfiguration(); - ServerConnector http = new ServerConnector(server, null, null, null, - 1, selectorNum, new HttpConnectionFactory(httpConfig)); - http.setPort(port); - http.setHost(host); - - server.addConnector(http); + server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port)); servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); servletContextHandler.setContextPath(contextPath); diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java index 4f3e54b..b0c1326 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java @@ -20,27 +20,15 @@ package org.apache.skywalking.aop.server.receiver.mesh; import java.util.Objects; import org.apache.logging.log4j.util.Strings; -import org.apache.skywalking.apm.network.servicemesh.Protocol; -import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; +import org.apache.skywalking.apm.network.servicemesh.*; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; -import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; -import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; -import org.apache.skywalking.oap.server.core.source.All; -import org.apache.skywalking.oap.server.core.source.DetectPoint; -import org.apache.skywalking.oap.server.core.source.Endpoint; -import org.apache.skywalking.oap.server.core.source.RequestType; -import org.apache.skywalking.oap.server.core.source.Service; -import org.apache.skywalking.oap.server.core.source.ServiceInstance; -import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation; -import org.apache.skywalking.oap.server.core.source.ServiceRelation; -import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.core.register.service.*; +import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source @@ -103,22 +91,29 @@ public class TelemetryDataDispatcher { private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) { ServiceMeshMetric metric = decorator.getMetric(); + int heartbeatCycle = 10000; // source - SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime()); int instanceId = metric.getSourceServiceInstanceId(); ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); if (Objects.nonNull(serviceInstanceInventory)) { - SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) { + // trigger heartbeat every 10s. + SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime()); + SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + } } else { logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); } // dest - SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime()); instanceId = metric.getDestServiceInstanceId(); serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); if (Objects.nonNull(serviceInstanceInventory)) { - SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) { + // trigger heartbeat every 10s. + SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime()); + SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + } } else { logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); }