This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch log-reporter
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git

commit 05225685585f9bdffc6504f9d7481f119a6f61da
Author: Wu Sheng <wu.sh...@foxmail.com>
AuthorDate: Wed Jun 15 20:56:11 2022 +0800

    Optimize gRPC Log reporter to set service name for the first element
---
 CHANGES.md                                         |  1 +
 .../agent/core/remote/LogReportServiceClient.java  | 68 ++++++++++++----------
 .../log4j/v1/x/log/GRPCLogAppenderInterceptor.java |  9 ++-
 .../core/kafka/KafkaLogReporterServiceClient.java  | 12 ++--
 4 files changed, 50 insertions(+), 40 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index b7dc0c836..0b6a902e5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@ Release Notes.
 * Add type name checking in ArgumentTypeNameMatch and ReturnTypeNameMatch
 * Highlight ArgumentTypeNameMatch and ReturnTypeNameMatch type naming rule in docs/en/setup/service-agent/java-agent/Java-Plugin-Development-Guide.md
 * Fix FileWriter scheduled task NPE
+* Optimize gRPC Log reporter to set service name for the first element in the streaming.(No change for Kafka reporter)
 
 #### Documentation
 
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
index 8db7fbd3c..7b3a3a2dc 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.agent.core.remote;
 import io.grpc.Channel;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
-
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -42,10 +41,10 @@ import org.apache.skywalking.apm.network.logging.v3.LogData;
 import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;
 
 @DefaultImplementor
-public class LogReportServiceClient implements BootService, 
GRPCChannelListener, IConsumer<LogData> {
+public class LogReportServiceClient implements BootService, 
GRPCChannelListener, IConsumer<LogData.Builder> {
     private static final ILog LOGGER = 
LogManager.getLogger(LogReportServiceClient.class);
 
-    private volatile DataCarrier<LogData> carrier;
+    private volatile DataCarrier<LogData.Builder> carrier;
     private volatile GRPCChannelStatus status;
 
     private volatile LogReportServiceGrpc.LogReportServiceStub 
logReportServiceStub;
@@ -70,7 +69,7 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener,
 
     }
 
-    public void produce(LogData logData) {
+    public void produce(LogData.Builder logData) {
         if (Objects.nonNull(logData) && !carrier.produce(logData)) {
             if (LOGGER.isDebugEnable()) {
                 LOGGER.debug("One log has been abandoned, cause by buffer is 
full.");
@@ -84,7 +83,7 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener,
     }
 
     @Override
-    public void consume(final List<LogData> dataList) {
+    public void consume(final List<LogData.Builder> dataList) {
         if (CollectionUtil.isEmpty(dataList)) {
             return;
         }
@@ -95,31 +94,38 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener,
             StreamObserver<LogData> logDataStreamObserver = 
logReportServiceStub
                 .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS)
                 .collect(
-                new StreamObserver<Commands>() {
-                    @Override
-                    public void onNext(final Commands commands) {
-
-                    }
-
-                    @Override
-                    public void onError(final Throwable throwable) {
-                        status.finished();
-                        LOGGER.error(throwable, "Try to send {} log data to 
collector, with unexpected exception.",
-                                     dataList.size()
-                        );
-                        ServiceManager.INSTANCE
-                            .findService(GRPCChannelManager.class)
-                            .reportError(throwable);
-                    }
-
-                    @Override
-                    public void onCompleted() {
-                        status.finished();
-                    }
-                });
-
-            for (final LogData logData : dataList) {
-                logDataStreamObserver.onNext(logData);
+                    new StreamObserver<Commands>() {
+                        @Override
+                        public void onNext(final Commands commands) {
+
+                        }
+
+                        @Override
+                        public void onError(final Throwable throwable) {
+                            status.finished();
+                            LOGGER.error(throwable, "Try to send {} log data 
to collector, with unexpected exception.",
+                                         dataList.size()
+                            );
+                            ServiceManager.INSTANCE
+                                .findService(GRPCChannelManager.class)
+                                .reportError(throwable);
+                        }
+
+                        @Override
+                        public void onCompleted() {
+                            status.finished();
+                        }
+                    });
+
+            boolean isFirst = true;
+            for (final LogData.Builder logData : dataList) {
+                if (isFirst) {
+                    // Only set service name of the first element in one stream
+                    // 
https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
+                    // Log collecting protocol defines LogData#service is 
required in the first element only.
+                    logData.setService(Config.Agent.SERVICE_NAME);
+                }
+                logDataStreamObserver.onNext(logData.build());
             }
             logDataStreamObserver.onCompleted();
             status.wait4Finish();
@@ -127,7 +133,7 @@ public class LogReportServiceClient implements BootService, GRPCChannelListener,
     }
 
     @Override
-    public void onError(final List<LogData> data, final Throwable t) {
+    public void onError(final List<LogData.Builder> data, final Throwable t) {
         LOGGER.error(t, "Try to consume {} log data to sender, with unexpected 
exception.", data.size());
     }
 
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
index 642cbe8e7..ca3c3af50 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-log4j-1.x-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/log/log4j/v1/x/log/GRPCLogAppenderInterceptor.java
@@ -74,12 +74,11 @@ public class GRPCLogAppenderInterceptor implements InstanceMethodsAroundIntercep
      *
      * @param appender the real {@link AppenderSkeleton appender}
      * @param event {@link LoggingEvent}
-     * @return {@link LogData} with filtered trace context in order to reduce 
the cost on the network
+     * @return {@link LogData.Builder} with filtered trace context in order to 
reduce the cost on the network
      */
-    private LogData transform(final AppenderSkeleton appender, LoggingEvent 
event) {
+    private LogData.Builder transform(final AppenderSkeleton appender, 
LoggingEvent event) {
         LogData.Builder builder = LogData.newBuilder()
                 .setTimestamp(event.getTimeStamp())
-                .setService(Config.Agent.SERVICE_NAME)
                 .setServiceInstance(Config.Agent.INSTANCE_NAME)
                 .setTraceContext(TraceContext.newBuilder()
                         .setTraceId(ContextManager.getGlobalTraceId())
@@ -102,12 +101,12 @@ public class GRPCLogAppenderInterceptor implements InstanceMethodsAroundIntercep
             builder.setEndpoint(primaryEndpointName);
         }
 
-        return -1 == ContextManager.getSpanId() ? builder.build()
+        return -1 == ContextManager.getSpanId() ? builder
                 : builder.setTraceContext(TraceContext.newBuilder()
                         .setTraceId(ContextManager.getGlobalTraceId())
                         .setSpanId(ContextManager.getSpanId())
                         .setTraceSegmentId(ContextManager.getSegmentId())
-                        .build()).build();
+                        .build());
     }
 
     private String transformLogText(final AppenderSkeleton appender, final 
LoggingEvent event) {
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
index 5c65145f3..43b251e4a 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
 import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.conf.Config;
 import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
 import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
 import org.apache.skywalking.apm.network.logging.v3.LogData;
@@ -41,18 +42,21 @@ public class KafkaLogReporterServiceClient extends LogReportServiceClient implem
     }
 
     @Override
-    public void produce(final LogData logData) {
+    public void produce(final LogData.Builder logData) {
         super.produce(logData);
     }
 
     @Override
-    public void consume(final List<LogData> dataList) {
+    public void consume(final List<LogData.Builder> dataList) {
         if (producer == null || CollectionUtil.isEmpty(dataList)) {
             return;
         }
 
-        for (LogData data : dataList) {
-            producer.send(new ProducerRecord<>(topic, data.getService(), 
Bytes.wrap(data.toByteArray())));
+        for (LogData.Builder data : dataList) {
+            // Kafka Log reporter sends one log per time.
+            // Every time, service name should be set to keep data integrity.
+            data.setService(Config.Agent.SERVICE_NAME);
+            producer.send(new ProducerRecord<>(topic, data.getService(), 
Bytes.wrap(data.build().toByteArray())));
         }
     }
 

Reply via email to