exceptionfactory commented on code in PR #10880:
URL: https://github.com/apache/nifi/pull/10880#discussion_r2789876404


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -498,6 +507,27 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             });
     }
 
+    private void reportCurrentLag(final KafkaConsumerService consumerService, 
final ProcessSession session, final Set<TopicPartitionSummary> 
topicPartitionSummaries) {
+        Map<TopicPartitionSummary, OptionalLong> topicPartitionLag =
+                topicPartitionSummaries.stream()
+                        .map(ps -> new TopicPartitionSummary(ps.getTopic(), 
ps.getPartition()))
+                        .collect(Collectors.toMap(
+                                Function.identity(),
+                                tps -> consumerService.currentLag(tps)
+                        ));
+
+        topicPartitionLag.forEach((tps, lag) -> {
+            if (lag.isPresent()) {
+                final String gaugeName = makeLagMetricName(tps);
+                session.recordGauge(gaugeName, lag.getAsLong(), 
CommitTiming.NOW);
+            }
+        });

Review Comment:
   This functional approach results in creating an unnecessary intermediate 
Map. I recommend rewriting this using traditional for-each loops and avoiding 
the intermediate Map creation.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -498,6 +507,27 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             });
     }
 
+    private void reportCurrentLag(final KafkaConsumerService consumerService, 
final ProcessSession session, final Set<TopicPartitionSummary> 
topicPartitionSummaries) {
+        Map<TopicPartitionSummary, OptionalLong> topicPartitionLag =
+                topicPartitionSummaries.stream()
+                        .map(ps -> new TopicPartitionSummary(ps.getTopic(), 
ps.getPartition()))
+                        .collect(Collectors.toMap(
+                                Function.identity(),
+                                tps -> consumerService.currentLag(tps)
+                        ));
+
+        topicPartitionLag.forEach((tps, lag) -> {
+            if (lag.isPresent()) {
+                final String gaugeName = makeLagMetricName(tps);
+                session.recordGauge(gaugeName, lag.getAsLong(), 
CommitTiming.NOW);
+            }
+        });
+    }
+
+    String makeLagMetricName(final TopicPartitionSummary tps) {
+        return "consume.kafka." + tps.getTopic() + "." + tps.getPartition() + 
".currentLag";

Review Comment:
   Instead of String concatenation, I recommend using a static format string 
and adjusting the format without the `consume.kafka` prefix, since the context 
of the gauge already includes the Processor Type.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -686,4 +716,31 @@ private PollingContext createPollingContext(final 
ProcessContext context) {
 
         return pollingContext;
     }
+
+    static class TopicPartitionScanningIterator implements 
Iterator<ByteRecord> {
+
+        private final Iterator<ByteRecord> delegate;
+        private final Set<TopicPartitionSummary> topicPartitionSummaries = new 
HashSet<>();
+
+        TopicPartitionScanningIterator(Iterator<ByteRecord> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public ByteRecord next() {
+            ByteRecord record = delegate.next();
+            topicPartitionSummaries.add(new 
TopicPartitionSummary(record.getTopic(), record.getPartition()));

Review Comment:
   This approach results in creating a new `TopicPartitionSummary` for every 
Record, which can create a lot of objects in a short period of time. Instead, 
keeping track of the last `TopicPartitionSummary` and comparing current topic 
and partition values should be a way to avoid unnecessary object creation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to