This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push: new 7881c57 Introduct DataCarrier to avoid service blocked (#5660) 7881c57 is described below commit 7881c57023db1d861179ffa65ba6854ab8a37462 Author: Daming <zt...@foxmail.com> AuthorDate: Wed Oct 14 08:12:22 2020 +0800 Introduct DataCarrier to avoid service blocked (#5660) --- .../core/kafka/KafkaTraceSegmentServiceClient.java | 59 ++++++++++++++++++---- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java index 6672048..36572a8 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java @@ -18,6 +18,8 @@ package org.apache.skywalking.apm.agent.core.kafka; +import java.util.List; +import java.util.Objects; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.utils.Bytes; @@ -30,18 +32,26 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient; +import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; +import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; +import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject; +import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE; +import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE; + /** - * A tracing segment data reporter. + * A tracing segment data reporter. */ @OverrideImplementor(TraceSegmentServiceClient.class) -public class KafkaTraceSegmentServiceClient implements BootService, TracingContextListener { +public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener { private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class); private String topic; private KafkaProducer<String, Bytes> producer; + private volatile DataCarrier<TraceSegment> carrier; + @Override public void prepare() { topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT; @@ -49,6 +59,10 @@ public class KafkaTraceSegmentServiceClient implements BootService, TracingConte @Override public void boot() { + carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE); + carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE); + carrier.consume(this, 1); + producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer(); } @@ -60,6 +74,39 @@ public class KafkaTraceSegmentServiceClient implements BootService, TracingConte @Override public void shutdown() { TracingContext.ListenerManager.remove(this); + carrier.shutdownConsumers(); + } + + @Override + public void init() { + + } + + @Override + public void consume(final List<TraceSegment> data) { + data.forEach(traceSegment -> { + SegmentObject upstreamSegment = traceSegment.transform(); + ProducerRecord<String, Bytes> record = new ProducerRecord<>( + topic, + upstreamSegment.getTraceSegmentId(), + Bytes.wrap(upstreamSegment.toByteArray()) + ); + producer.send(record, (m, e) -> { + if (Objects.nonNull(e)) { + LOGGER.error("Failed to report TraceSegment.", e); + } + }); + }); + } + + @Override + public void onError(final List<TraceSegment> data, final Throwable t) { + LOGGER.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size()); + } + + @Override + public void onExit() { + } @Override @@ -72,13 +119,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, TracingConte LOGGER.debug("Trace[TraceId={}] is ignored.", traceSegment.getTraceSegmentId()); return; } - SegmentObject upstreamSegment = traceSegment.transform(); - ProducerRecord<String, Bytes> record = new ProducerRecord<>( - topic, - upstreamSegment.getTraceSegmentId(), - Bytes.wrap(upstreamSegment.toByteArray()) - ); - producer.send(record); + carrier.produce(traceSegment); } } \ No newline at end of file