exceptionfactory commented on code in PR #10880:
URL: https://github.com/apache/nifi/pull/10880#discussion_r2789876404
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -498,6 +507,27 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
});
}
+    private void reportCurrentLag(final KafkaConsumerService consumerService, final ProcessSession session, final Set<TopicPartitionSummary> topicPartitionSummaries) {
+        Map<TopicPartitionSummary, OptionalLong> topicPartitionLag =
+                topicPartitionSummaries.stream()
+                        .map(ps -> new TopicPartitionSummary(ps.getTopic(), ps.getPartition()))
+                        .collect(Collectors.toMap(
+                                Function.identity(),
+                                tps -> consumerService.currentLag(tps)
+                        ));
+
+        topicPartitionLag.forEach((tps, lag) -> {
+            if (lag.isPresent()) {
+                final String gaugeName = makeLagMetricName(tps);
+                session.recordGauge(gaugeName, lag.getAsLong(), CommitTiming.NOW);
+            }
+        });
Review Comment:
This functional approach creates an unnecessary intermediate Map. I recommend
rewriting this with a traditional for-each loop that records each gauge
directly, avoiding the intermediate Map creation.
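A minimal sketch of the for-each version of `reportCurrentLag`. So that it compiles without NiFi on the classpath, it assumes hypothetical `LagSource` and `GaugeRecorder` interfaces in place of `KafkaConsumerService` and `ProcessSession`, along with a simplified `TopicPartitionSummary` that holds only topic and partition. The gauge name keeps the format currently used in the PR.

```java
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;

// Simplified stand-in for the PR's TopicPartitionSummary (topic and partition only)
final class TopicPartitionSummary {
    private final String topic;
    private final int partition;

    TopicPartitionSummary(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String getTopic() { return topic; }

    int getPartition() { return partition; }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicPartitionSummary)) return false;
        final TopicPartitionSummary other = (TopicPartitionSummary) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }
}

// Hypothetical narrow interface standing in for KafkaConsumerService.currentLag
interface LagSource {
    OptionalLong currentLag(TopicPartitionSummary summary);
}

// Hypothetical narrow interface standing in for ProcessSession.recordGauge
interface GaugeRecorder {
    void recordGauge(String name, long value);
}

final class LagReporter {
    private LagReporter() {
    }

    // Query each partition's lag and record it immediately; no intermediate Map is built
    static void reportCurrentLag(final LagSource lagSource, final GaugeRecorder recorder,
                                 final Set<TopicPartitionSummary> summaries) {
        for (final TopicPartitionSummary summary : summaries) {
            final OptionalLong lag = lagSource.currentLag(summary);
            if (lag.isPresent()) {
                recorder.recordGauge(makeLagMetricName(summary), lag.getAsLong());
            }
        }
    }

    // Same naming as the PR at this point in the review
    static String makeLagMetricName(final TopicPartitionSummary summary) {
        return "consume.kafka." + summary.getTopic() + "." + summary.getPartition() + ".currentLag";
    }
}
```

The loop also drops the `.map(...)` step. Assuming `TopicPartitionSummary` equality depends only on topic and partition, that step just rebuilt an equal copy of each element.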
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -498,6 +507,27 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
});
}
+    private void reportCurrentLag(final KafkaConsumerService consumerService, final ProcessSession session, final Set<TopicPartitionSummary> topicPartitionSummaries) {
+        Map<TopicPartitionSummary, OptionalLong> topicPartitionLag =
+                topicPartitionSummaries.stream()
+                        .map(ps -> new TopicPartitionSummary(ps.getTopic(), ps.getPartition()))
+                        .collect(Collectors.toMap(
+                                Function.identity(),
+                                tps -> consumerService.currentLag(tps)
+                        ));
+
+        topicPartitionLag.forEach((tps, lag) -> {
+            if (lag.isPresent()) {
+                final String gaugeName = makeLagMetricName(tps);
+                session.recordGauge(gaugeName, lag.getAsLong(), CommitTiming.NOW);
+            }
+        });
+    }
+
+    String makeLagMetricName(final TopicPartitionSummary tps) {
+        return "consume.kafka." + tps.getTopic() + "." + tps.getPartition() + ".currentLag";
Review Comment:
Instead of String concatenation, I recommend using a static format string and
removing the `consume.kafka` prefix from the format, since the context of the
gauge already includes the Processor Type.
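A sketch of the static format string approach. The format `%s.%d.currentLag` is an assumption (the PR's current name minus the `consume.kafka.` prefix). The `LagMetricNames` holder class and the `(topic, partition)` signature are also hypothetical; the PR's method takes a `TopicPartitionSummary`.

```java
import java.util.Locale;

final class LagMetricNames {
    // Hypothetical format: topic, partition, then the metric suffix; no processor-specific prefix
    static final String LAG_METRIC_NAME_FORMAT = "%s.%d.currentLag";

    private LagMetricNames() {
    }

    // Locale.ROOT keeps the partition number in ASCII digits regardless of the JVM default locale
    static String makeLagMetricName(final String topic, final int partition) {
        return String.format(Locale.ROOT, LAG_METRIC_NAME_FORMAT, topic, partition);
    }
}
```

Passing `Locale.ROOT` is a defensive choice. `String.format(String, Object...)` uses the default locale, and `%d` output can be localized (some locales may render non-ASCII digits), which would make gauge names vary between environments.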
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -686,4 +716,31 @@ private PollingContext createPollingContext(final ProcessContext context) {
return pollingContext;
}
+
+    static class TopicPartitionScanningIterator implements Iterator<ByteRecord> {
+
+        private final Iterator<ByteRecord> delegate;
+        private final Set<TopicPartitionSummary> topicPartitionSummaries = new HashSet<>();
+
+        TopicPartitionScanningIterator(Iterator<ByteRecord> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public ByteRecord next() {
+            ByteRecord record = delegate.next();
+            topicPartitionSummaries.add(new TopicPartitionSummary(record.getTopic(), record.getPartition()));
Review Comment:
This approach creates a new `TopicPartitionSummary` for every Record, which can
produce a large number of objects in a short period of time. Instead, keeping
track of the last `TopicPartitionSummary` and comparing the current topic and
partition values against it should avoid unnecessary object creation.
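A sketch of the suggested change to `TopicPartitionScanningIterator`. It uses simplified stand-ins for `ByteRecord` and `TopicPartitionSummary` (topic and partition only) and adds a hypothetical `getTopicPartitionSummaries()` accessor. A new summary is allocated only when the topic or partition differs from the previous record's. When records arrive grouped by partition, that means one allocation per partition rather than one per record.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;

// Simplified stand-in for ByteRecord (only the fields the iterator reads)
final class ByteRecord {
    private final String topic;
    private final int partition;

    ByteRecord(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String getTopic() { return topic; }

    int getPartition() { return partition; }
}

// Simplified stand-in for TopicPartitionSummary with value-based equality
final class TopicPartitionSummary {
    private final String topic;
    private final int partition;

    TopicPartitionSummary(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String getTopic() { return topic; }

    int getPartition() { return partition; }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicPartitionSummary)) return false;
        final TopicPartitionSummary other = (TopicPartitionSummary) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }
}

final class TopicPartitionScanningIterator implements Iterator<ByteRecord> {
    private final Iterator<ByteRecord> delegate;
    private final Set<TopicPartitionSummary> topicPartitionSummaries = new HashSet<>();
    // Summary for the most recently seen topic and partition; null before the first record
    private TopicPartitionSummary lastSummary;

    TopicPartitionScanningIterator(final Iterator<ByteRecord> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public ByteRecord next() {
        final ByteRecord record = delegate.next();
        // Allocate only when the topic or partition changes from the previous record
        if (lastSummary == null
                || lastSummary.getPartition() != record.getPartition()
                || !lastSummary.getTopic().equals(record.getTopic())) {
            lastSummary = new TopicPartitionSummary(record.getTopic(), record.getPartition());
            topicPartitionSummaries.add(lastSummary);
        }
        return record;
    }

    Set<TopicPartitionSummary> getTopicPartitionSummaries() {
        return topicPartitionSummaries;
    }
}
```

If records from different partitions interleave, the iterator allocates more often, but the resulting set is still correct. Adding an equal summary to the `HashSet` leaves it unchanged.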
--
This is an automated message from the Apache Git Service.