This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch log-reporter in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
commit 05225685585f9bdffc6504f9d7481f119a6f61da Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Wed Jun 15 20:56:11 2022 +0800 Optimize gRPC Log reporter to set service name for the first element --- CHANGES.md | 1 + .../agent/core/remote/LogReportServiceClient.java | 68 ++++++++++++---------- .../log4j/v1/x/log/GRPCLogAppenderInterceptor.java | 9 ++- .../core/kafka/KafkaLogReporterServiceClient.java | 12 ++-- 4 files changed, 50 insertions(+), 40 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index b7dc0c836..0b6a902e5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -24,6 +24,7 @@ Release Notes. * Add type name checking in ArgumentTypeNameMatch and ReturnTypeNameMatch * Highlight ArgumentTypeNameMatch and ReturnTypeNameMatch type naming rule in docs/en/setup/service-agent/java-agent/Java-Plugin-Development-Guide.md * Fix FileWriter scheduled task NPE +* Optimize gRPC Log reporter to set service name for the first element in the streaming.(No change for Kafka reporter) #### Documentation diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java index 8db7fbd3c..7b3a3a2dc 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java @@ -21,7 +21,6 @@ package org.apache.skywalking.apm.agent.core.remote; import io.grpc.Channel; import io.grpc.stub.StreamObserver; import java.util.List; - import java.util.Objects; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -42,10 +41,10 @@ import org.apache.skywalking.apm.network.logging.v3.LogData; import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc; @DefaultImplementor -public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData> { +public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData.Builder> { private static final ILog LOGGER = LogManager.getLogger(LogReportServiceClient.class); - private volatile DataCarrier<LogData> carrier; + private volatile DataCarrier<LogData.Builder> carrier; private volatile GRPCChannelStatus status; private volatile LogReportServiceGrpc.LogReportServiceStub logReportServiceStub; @@ -70,7 +69,7 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener, } - public void produce(LogData logData) { + public void produce(LogData.Builder logData) { if (Objects.nonNull(logData) && !carrier.produce(logData)) { if (LOGGER.isDebugEnable()) { LOGGER.debug("One log has been abandoned, cause by buffer is full."); @@ -84,7 +83,7 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener, } @Override - public void consume(final List<LogData> dataList) { + public void consume(final List<LogData.Builder> dataList) { if (CollectionUtil.isEmpty(dataList)) { return; } @@ -95,31 +94,38 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener, StreamObserver<LogData> logDataStreamObserver = logReportServiceStub .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) .collect( - new StreamObserver<Commands>() { - @Override - public void onNext(final Commands commands) { - - } - - @Override - public void onError(final Throwable throwable) { - status.finished(); - LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.", - dataList.size() - ); - ServiceManager.INSTANCE - .findService(GRPCChannelManager.class) - .reportError(throwable); - } - - @Override - public void onCompleted() { - status.finished(); - } - }); - - for (final LogData logData : dataList) { - logDataStreamObserver.onNext(logData); + new StreamObserver<Commands>() { + @Override + public void onNext(final Commands commands) { + + } + + @Override + public void onError(final Throwable throwable) { + status.finished(); + LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.", + dataList.size() + ); + ServiceManager.INSTANCE + .findService(GRPCChannelManager.class) + .reportError(throwable); + } + + @Override + public void onCompleted() { + status.finished(); + } + }); + + boolean isFirst = true; + for (final LogData.Builder logData : dataList) { + if (isFirst) { + // Only set service name of the first element in one stream + // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto + // Log collecting protocol defines LogData#service is required in the first element only. + logData.setService(Config.Agent.SERVICE_NAME); + } + logDataStreamObserver.onNext(logData.build()); } logDataStreamObserver.onCompleted(); status.wait4Finish(); @@ -127,7 +133,7 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener, } @Override - public void onError(final List<LogData> data, final Throwable t) { + public void onError(final List<LogData.Builder> data, final Throwable t) { LOGGER.error(t, "Try to consume {} log data to sender, with unexpected exception.", data.size()); } diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java index 642cbe8e7..ca3c3af50 100644 --- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java +++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java @@ -74,12 +74,11 @@ public class GRPCLogAppenderInterceptor implements InstanceMethodsAroundIntercep * * @param appender the real {@link AppenderSkeleton appender} * @param event {@link LoggingEvent} - * @return {@link LogData} with filtered trace context in order to reduce the cost on the network + * @return {@link LogData.Builder} with filtered trace context in order to reduce the cost on the network */ - private LogData transform(final AppenderSkeleton appender, LoggingEvent event) { + private LogData.Builder transform(final AppenderSkeleton appender, LoggingEvent event) { LogData.Builder builder = LogData.newBuilder() .setTimestamp(event.getTimeStamp()) - .setService(Config.Agent.SERVICE_NAME) .setServiceInstance(Config.Agent.INSTANCE_NAME) .setTraceContext(TraceContext.newBuilder() .setTraceId(ContextManager.getGlobalTraceId()) @@ -102,12 +101,12 @@ public class GRPCLogAppenderInterceptor implements InstanceMethodsAroundIntercep builder.setEndpoint(primaryEndpointName); } - return -1 == ContextManager.getSpanId() ? builder.build() + return -1 == ContextManager.getSpanId() ? builder : builder.setTraceContext(TraceContext.newBuilder() .setTraceId(ContextManager.getGlobalTraceId()) .setSpanId(ContextManager.getSpanId()) .setTraceSegmentId(ContextManager.getSegmentId()) - .build()).build(); + .build()); } private String transformLogText(final AppenderSkeleton appender, final LoggingEvent event) { diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java index 5c65145f3..43b251e4a 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.utils.Bytes; import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor; import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient; import org.apache.skywalking.apm.agent.core.util.CollectionUtil; import org.apache.skywalking.apm.network.logging.v3.LogData; @@ -41,18 +42,21 @@ public class KafkaLogReporterServiceClient extends LogReportServiceClient implem } @Override - public void produce(final LogData logData) { + public void produce(final LogData.Builder logData) { super.produce(logData); } @Override - public void consume(final List<LogData> dataList) { + public void consume(final List<LogData.Builder> dataList) { if (producer == null || CollectionUtil.isEmpty(dataList)) { return; } - for (LogData data : dataList) { - producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.toByteArray()))); + for (LogData.Builder data : dataList) { + // Kafka Log reporter sends one log per time. + // Every time, service name should be set to keep data integrity. + data.setService(Config.Agent.SERVICE_NAME); + producer.send(new ProducerRecord<>(topic, data.getService(), Bytes.wrap(data.build().toByteArray()))); } }