This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 7881c57  Introduce DataCarrier to avoid the service being blocked (#5660)
7881c57 is described below

commit 7881c57023db1d861179ffa65ba6854ab8a37462
Author: Daming <zt...@foxmail.com>
AuthorDate: Wed Oct 14 08:12:22 2020 +0800

    Introduce DataCarrier to avoid the service being blocked (#5660)
---
 .../core/kafka/KafkaTraceSegmentServiceClient.java | 59 ++++++++++++++++++----
 1 file changed, 50 insertions(+), 9 deletions(-)

diff --git 
a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
 
b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
index 6672048..36572a8 100644
--- 
a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
+++ 
b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.skywalking.apm.agent.core.kafka;
 
+import java.util.List;
+import java.util.Objects;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Bytes;
@@ -30,18 +32,26 @@ import 
org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
 import org.apache.skywalking.apm.agent.core.logging.api.ILog;
 import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
 import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
 
+import static 
org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
+import static 
org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
+
 /**
- *  A tracing segment data reporter.
+ * A tracing segment data reporter.
  */
 @OverrideImplementor(TraceSegmentServiceClient.class)
-public class KafkaTraceSegmentServiceClient implements BootService, 
TracingContextListener {
+public class KafkaTraceSegmentServiceClient implements BootService, 
IConsumer<TraceSegment>, TracingContextListener {
     private static final ILog LOGGER = 
LogManager.getLogger(KafkaTraceSegmentServiceClient.class);
 
     private String topic;
     private KafkaProducer<String, Bytes> producer;
 
+    private volatile DataCarrier<TraceSegment> carrier;
+
     @Override
     public void prepare() {
         topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT;
@@ -49,6 +59,10 @@ public class KafkaTraceSegmentServiceClient implements 
BootService, TracingConte
 
     @Override
     public void boot() {
+        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
+        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        carrier.consume(this, 1);
+
         producer = 
ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
     }
 
@@ -60,6 +74,39 @@ public class KafkaTraceSegmentServiceClient implements 
BootService, TracingConte
     @Override
     public void shutdown() {
         TracingContext.ListenerManager.remove(this);
+        carrier.shutdownConsumers();
+    }
+
+    @Override
+    public void init() {
+
+    }
+
+    @Override
+    public void consume(final List<TraceSegment> data) {
+        data.forEach(traceSegment -> {
+            SegmentObject upstreamSegment = traceSegment.transform();
+            ProducerRecord<String, Bytes> record = new ProducerRecord<>(
+                topic,
+                upstreamSegment.getTraceSegmentId(),
+                Bytes.wrap(upstreamSegment.toByteArray())
+            );
+            producer.send(record, (m, e) -> {
+                if (Objects.nonNull(e)) {
+                    LOGGER.error("Failed to report TraceSegment.", e);
+                }
+            });
+        });
+    }
+
+    @Override
+    public void onError(final List<TraceSegment> data, final Throwable t) {
+        LOGGER.error(t, "Try to send {} trace segments to collector, with 
unexpected exception.", data.size());
+    }
+
+    @Override
+    public void onExit() {
+
     }
 
     @Override
@@ -72,13 +119,7 @@ public class KafkaTraceSegmentServiceClient implements 
BootService, TracingConte
             LOGGER.debug("Trace[TraceId={}] is ignored.", 
traceSegment.getTraceSegmentId());
             return;
         }
-        SegmentObject upstreamSegment = traceSegment.transform();
-        ProducerRecord<String, Bytes> record = new ProducerRecord<>(
-            topic,
-            upstreamSegment.getTraceSegmentId(),
-            Bytes.wrap(upstreamSegment.toByteArray())
-        );
-        producer.send(record);
+        carrier.produce(traceSegment);
     }
 
 }
\ No newline at end of file

Reply via email to