This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch thread-model in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/thread-model by this push: new f12f07f Make heartbeat back to work. And default threadpool size more sense. f12f07f is described below commit f12f07f6412bf50ca953b830a7a73a036ad87ee2 Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Tue Feb 19 23:14:16 2019 +0800 Make heartbeat back to work. And default threadpool size more sense. --- .../datacarrier/consumer/BulkConsumePool.java | 6 +--- .../register/worker/RegisterDistinctWorker.java | 6 +++- .../register/worker/RegisterPersistentWorker.java | 6 +++- .../server/library/server/jetty/JettyServer.java | 3 -- .../receiver/mesh/TelemetryDataDispatcher.java | 34 +++++++++------------- 5 files changed, 25 insertions(+), 30 deletions(-) diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java index 6befe2c..1b95c1b 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java @@ -115,11 +115,7 @@ public class BulkConsumePool implements ConsumerPool { } public static int recommendMaxSize() { - int processorNum = Runtime.getRuntime().availableProcessors(); - if (processorNum > 1) { - processorNum -= 1; - } - return processorNum; + return Runtime.getRuntime().availableProcessors() * 2; } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java index ae4bc67..158f01f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java @@ -45,7 +45,11 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> { this.sources = new HashMap<>(); this.dataCarrier = new DataCarrier<>(1, 1000); String name = "REGISTER_L1"; - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200); + int size = BulkConsumePool.Creator.recommendMaxSize() / 8; + if (size == 0) { + size = 1; + } + BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200); try { ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); } catch (Exception e) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java index 1ccd21d..2bd11e5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java @@ -55,7 +55,11 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> { this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000); String name = "REGISTER_L2"; - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200); + int size = BulkConsumePool.Creator.recommendMaxSize() / 8; + if (size == 0) { + size = 1; + } + BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200); try { ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); } catch (Exception e) { diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java index dcd66d0..af8c91e 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java @@ -20,11 +20,8 @@ package org.apache.skywalking.oap.server.library.server.jetty; import java.net.InetSocketAddress; import java.util.Objects; -import org.apache.skywalking.oap.server.library.server.Server; import org.apache.skywalking.oap.server.library.server.*; -import org.eclipse.jetty.server.*; import org.eclipse.jetty.servlet.*; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.*; /** diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java index a74afac..e0452f8 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java @@ -20,27 +20,15 @@ package org.apache.skywalking.aop.server.receiver.mesh; import java.util.Objects; import org.apache.logging.log4j.util.Strings; -import org.apache.skywalking.apm.network.servicemesh.Protocol; -import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; +import org.apache.skywalking.apm.network.servicemesh.*; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; -import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; -import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; -import org.apache.skywalking.oap.server.core.source.All; -import org.apache.skywalking.oap.server.core.source.DetectPoint; -import org.apache.skywalking.oap.server.core.source.Endpoint; -import org.apache.skywalking.oap.server.core.source.RequestType; -import org.apache.skywalking.oap.server.core.source.Service; -import org.apache.skywalking.oap.server.core.source.ServiceInstance; -import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation; -import org.apache.skywalking.oap.server.core.source.ServiceRelation; -import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.core.register.service.*; +import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source @@ -89,7 +77,7 @@ public class TelemetryDataDispatcher { ServiceMeshMetric metric = decorator.getMetric(); long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getStartTime()); - //heartbeat(decorator, minuteTimeBucket); + heartbeat(decorator, minuteTimeBucket); if (org.apache.skywalking.apm.network.common.DetectPoint.server.equals(metric.getDetectPoint())) { toAll(decorator, minuteTimeBucket); toService(decorator, minuteTimeBucket); @@ -108,7 +96,10 @@ public class TelemetryDataDispatcher { int instanceId = metric.getSourceServiceInstanceId(); ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); if (Objects.nonNull(serviceInstanceInventory)) { - SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) { + // trigger heartbeat every 10s. + SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + } } else { logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); } @@ -118,7 +109,10 @@ public class TelemetryDataDispatcher { instanceId = metric.getDestServiceInstanceId(); serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); if (Objects.nonNull(serviceInstanceInventory)) { - SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) { + // trigger heartbeat every 10s. + SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + } } else { logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); }