kezhenxu94 commented on a change in pull request #7032:
URL: https://github.com/apache/skywalking/pull/7032#discussion_r643096667



##########
File path: oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
##########
@@ -38,42 +38,39 @@
  */
 @Slf4j
 public class MeterServiceHandler extends AbstractKafkaHandler {
-    private IMeterProcessService processService;
+    private final IMeterProcessService processService;
     private final HistogramMetrics histogram;
     private final CounterMetrics errorCounter;
 
     public MeterServiceHandler(ModuleManager manager, KafkaFetcherConfig config) {
         super(manager, config);
         this.processService = manager.find(AnalyzerModule.NAME).provider().getService(IMeterProcessService.class);
         MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
-                .provider()
-                .getService(MetricsCreator.class);
+                                               .provider()
+                                               .getService(MetricsCreator.class);
         histogram = metricsCreator.createHistogramMetric(
-                "meter_in_latency", "The process latency of meter",
-                new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka-fetcher")
+            "meter_in_latency",
+            "The process latency of meter",
+            new MetricsTag.Keys("protocol"),
+            new MetricsTag.Values("kafka-fetcher")
         );
-        errorCounter = metricsCreator.createCounter("meter_analysis_error_count", "The error number of meter analysis",
-                new MetricsTag.Keys("protocol"),
-                new MetricsTag.Values("kafka-fetcher")
+        errorCounter = metricsCreator.createCounter(
+            "meter_analysis_error_count",
+            "The error number of meter analysis",
+            new MetricsTag.Keys("protocol"),
+            new MetricsTag.Values("kafka-fetcher")
         );
     }
 
     @Override
     public void handle(final ConsumerRecord<String, Bytes> record) {
-        try {
+        try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
             MeterDataCollection meterDataCollection = MeterDataCollection.parseFrom(record.value().get());
             MeterProcessor processor = processService.createProcessor();
-            meterDataCollection.getMeterDataList().forEach(meterData -> {
-                try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
-                    processor.read(meterData);
-                } catch (Exception e) {
-                    errorCounter.inc();
-                    log.error(e.getMessage(), e);
-                }
-            });
+            meterDataCollection.getMeterDataList().forEach(processor::read);
             processor.process();
-
         } catch (Exception e) {
+            errorCounter.inc();

Review comment:
       IMO this change is not preferable. When any one item in
`meterDataCollection` fails in `processor::read`, the exception propagates out
of `forEach` uncaught, so all remaining items are skipped. Also, `errorCounter`
is no longer correct after your changes: it is incremented at most once per
record instead of once per failed item.
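
To illustrate the concern, here is a minimal, self-contained sketch. It is not SkyWalking code: `read`, the string items, and the plain counters are hypothetical stand-ins for `processor.read`, the meter data, and `errorCounter`. It compares a single `try` around the whole `forEach` with a `try` around each item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PerItemErrorHandlingDemo {
    // Hypothetical stand-in for processor.read(meterData): rejects the item "bad".
    static void read(String item, List<String> accepted) {
        if ("bad".equals(item)) {
            throw new IllegalArgumentException("cannot read item: " + item);
        }
        accepted.add(item);
    }

    // One try block around the whole batch: the first failure aborts forEach,
    // so later items are never read and at most one error is counted.
    // Returns {acceptedCount, errorCount}.
    static int[] readBatchWithOuterCatch(List<String> items) {
        List<String> accepted = new ArrayList<>();
        AtomicInteger errors = new AtomicInteger();
        try {
            items.forEach(item -> read(item, accepted));
        } catch (Exception e) {
            errors.incrementAndGet();
        }
        return new int[] {accepted.size(), errors.get()};
    }

    // One try block per item: a failure is counted and the loop moves on.
    // Returns {acceptedCount, errorCount}.
    static int[] readBatchWithPerItemCatch(List<String> items) {
        List<String> accepted = new ArrayList<>();
        AtomicInteger errors = new AtomicInteger();
        items.forEach(item -> {
            try {
                read(item, accepted);
            } catch (Exception e) {
                errors.incrementAndGet();
            }
        });
        return new int[] {accepted.size(), errors.get()};
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "bad", "b", "bad", "c");
        System.out.println(Arrays.toString(readBatchWithOuterCatch(items)));   // [1, 1]
        System.out.println(Arrays.toString(readBatchWithPerItemCatch(items))); // [3, 2]
    }
}
```

With the outer catch, only the item before the first failure is read and a single error is recorded. With the per-item catch, all three valid items are read and both failures are counted.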





