This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch thread-model
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/thread-model by this push:
     new f12f07f  Make heartbeat back to work. And default threadpool size more sense.
f12f07f is described below

commit f12f07f6412bf50ca953b830a7a73a036ad87ee2
Author: Wu Sheng <wu.sh...@foxmail.com>
AuthorDate: Tue Feb 19 23:14:16 2019 +0800

    Make heartbeat back to work. And default threadpool size more sense.
---
 .../datacarrier/consumer/BulkConsumePool.java      |  6 +---
 .../register/worker/RegisterDistinctWorker.java    |  6 +++-
 .../register/worker/RegisterPersistentWorker.java  |  6 +++-
 .../server/library/server/jetty/JettyServer.java   |  3 --
 .../receiver/mesh/TelemetryDataDispatcher.java     | 34 +++++++++-------------
 5 files changed, 25 insertions(+), 30 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 6befe2c..1b95c1b 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -115,11 +115,7 @@ public class BulkConsumePool implements ConsumerPool {
         }
 
         public static int recommendMaxSize() {
-            int processorNum = Runtime.getRuntime().availableProcessors();
-            if (processorNum > 1) {
-                processorNum -= 1;
-            }
-            return processorNum;
+            return Runtime.getRuntime().availableProcessors() * 2;
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index ae4bc67..158f01f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -45,7 +45,11 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
         this.sources = new HashMap<>();
         this.dataCarrier = new DataCarrier<>(1, 1000);
         String name = "REGISTER_L1";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 1ccd21d..2bd11e5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -55,7 +55,11 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
         this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
 
         String name = "REGISTER_L2";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
index dcd66d0..af8c91e 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
@@ -20,11 +20,8 @@ package org.apache.skywalking.oap.server.library.server.jetty;
 
 import java.net.InetSocketAddress;
 import java.util.Objects;
-import org.apache.skywalking.oap.server.library.server.Server;
 import org.apache.skywalking.oap.server.library.server.*;
-import org.eclipse.jetty.server.*;
 import org.eclipse.jetty.servlet.*;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.*;
 
 /**
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index a74afac..e0452f8 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -20,27 +20,15 @@ package org.apache.skywalking.aop.server.receiver.mesh;
 
 import java.util.Objects;
 import org.apache.logging.log4j.util.Strings;
-import org.apache.skywalking.apm.network.servicemesh.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.*;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.source.All;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.Endpoint;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.Service;
-import org.apache.skywalking.oap.server.core.source.ServiceInstance;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
-import org.apache.skywalking.oap.server.core.source.ServiceRelation;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -89,7 +77,7 @@ public class TelemetryDataDispatcher {
         ServiceMeshMetric metric = decorator.getMetric();
         long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getStartTime());
 
-        //heartbeat(decorator, minuteTimeBucket);
+        heartbeat(decorator, minuteTimeBucket);
         if (org.apache.skywalking.apm.network.common.DetectPoint.server.equals(metric.getDetectPoint())) {
             toAll(decorator, minuteTimeBucket);
             toService(decorator, minuteTimeBucket);
@@ -108,7 +96,10 @@ public class TelemetryDataDispatcher {
         int instanceId = metric.getSourceServiceInstanceId();
         ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
-            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) {
+                // trigger heartbeat every 10s.
+                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            }
         } else {
             logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }
@@ -118,7 +109,10 @@ public class TelemetryDataDispatcher {
         instanceId = metric.getDestServiceInstanceId();
         serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
-            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) {
+                // trigger heartbeat every 10s.
+                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            }
         } else {
             logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }

Reply via email to