This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 66074bc881 NIFI-14656 Fixed Kafka Consumer handling when not 
committing offsets and added new counters (#10015)
66074bc881 is described below

commit 66074bc881ce7b46989cce89d4ee13b96d543ebe
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jun 13 15:48:40 2025 -0400

    NIFI-14656 Fixed Kafka Consumer handling when not committing offsets and 
added new counters (#10015)
    
    Ensure that we return the Kafka Consumer to the pool even if we are not 
committing offsets. Updated counters to show how many events are acknowledged 
versus rolled back, to clarify the processor's behavior.
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/kafka/processors/ConsumeKafka.java | 46 ++++++++++++----------
 .../kafka/processors/consumer/OffsetTracker.java   | 14 ++++---
 .../convert/RecordStreamKafkaMessageConverter.java |  3 +-
 3 files changed, 36 insertions(+), 27 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index ad1bd6d2c8..c797f1946b 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -412,21 +412,24 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
         }
 
         session.commitAsync(
-            () -> commitOffsets(consumerService, offsetTracker, 
pollingContext),
+            () -> commitOffsets(consumerService, offsetTracker, 
pollingContext, session),
             throwable -> {
                 getLogger().error("Failed to commit session; will roll back 
any uncommitted records", throwable);
-                rollback(consumerService);
+                rollback(consumerService, offsetTracker, session);
                 context.yield();
             });
     }
 
-    private void commitOffsets(final KafkaConsumerService consumerService, 
final OffsetTracker offsetTracker, final PollingContext pollingContext) {
-        if (!commitOffsets) {
-            return;
-        }
-
+    private void commitOffsets(final KafkaConsumerService consumerService, 
final OffsetTracker offsetTracker, final PollingContext pollingContext, final 
ProcessSession session) {
         try {
-            
consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
+            if (commitOffsets) {
+                
consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
+
+                offsetTracker.getRecordCounts().forEach((topic, count) -> {
+                    session.adjustCounter("Records Acknowledged for " + topic, 
count, true);
+                });
+            }
+
             consumerServices.offer(consumerService);
             getLogger().debug("Committed offsets for Kafka Consumer Service");
         } catch (final Exception e) {
@@ -435,8 +438,12 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
         }
     }
 
-    private void rollback(final KafkaConsumerService consumerService) {
+    private void rollback(final KafkaConsumerService consumerService, final 
OffsetTracker offsetTracker, final ProcessSession session) {
         if (!consumerService.isClosed()) {
+            offsetTracker.getRecordCounts().forEach((topic, count) -> {
+                session.adjustCounter("Records Rolled Back for " + topic, 
count, true);
+            });
+
             try {
                 consumerService.rollback();
                 consumerServices.offer(consumerService);
@@ -539,18 +546,17 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final KafkaMessageConverter converter = switch (outputStrategy) {
-            case USE_VALUE -> new 
RecordStreamKafkaMessageConverter(readerFactory, writerFactory,
-                    headerEncoding, headerNamePattern, keyEncoding, 
commitOffsets, offsetTracker, getLogger(), brokerUri);
-
-        case USE_WRAPPER, INJECT_METADATA -> {
-                final RecordReaderFactory keyReaderFactory = keyFormat == 
KeyFormat.RECORD
-                        ? 
context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class)
 : null;
+        final KafkaMessageConverter converter;
+        if (outputStrategy == OutputStrategy.USE_VALUE) {
+            converter = new RecordStreamKafkaMessageConverter(readerFactory, 
writerFactory, headerEncoding, headerNamePattern,
+                keyEncoding, commitOffsets, offsetTracker, getLogger(), 
brokerUri);
+        } else {
+            final RecordReaderFactory keyReaderFactory = keyFormat == 
KeyFormat.RECORD
+                ? 
context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class)
 : null;
 
-                yield new 
WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory, 
keyReaderFactory,
-                    headerEncoding, headerNamePattern, keyFormat, keyEncoding, 
commitOffsets, offsetTracker, getLogger(), brokerUri, outputStrategy);
-            }
-        };
+            converter = new 
WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory, 
keyReaderFactory,
+                headerEncoding, headerNamePattern, keyFormat, keyEncoding, 
commitOffsets, offsetTracker, getLogger(), brokerUri, outputStrategy);
+        }
 
         converter.toFlowFiles(session, consumerRecords);
     }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
index 7489e2b608..03b0b8c29b 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
@@ -22,21 +22,23 @@ import 
org.apache.nifi.kafka.service.api.consumer.PollingContext;
 import org.apache.nifi.kafka.service.api.consumer.PollingSummary;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;
 
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.Map;
 
 public class OffsetTracker {
-    private final Map<TopicPartitionSummary, OffsetSummary> offsets;
-
-    public OffsetTracker() {
-        offsets = new LinkedHashMap<>();
-    }
+    private final Map<TopicPartitionSummary, OffsetSummary> offsets = new 
HashMap<>();
+    private final Map<String, Long> recordCounts = new HashMap<>();
 
     public void update(final ByteRecord consumerRecord) {
         final TopicPartitionSummary topicPartitionSummary = new 
TopicPartitionSummary(consumerRecord.getTopic(), consumerRecord.getPartition());
         final long offset = consumerRecord.getOffset();
         final OffsetSummary offsetSummary = 
offsets.computeIfAbsent(topicPartitionSummary, (summary) -> new 
OffsetSummary(offset));
         offsetSummary.setOffset(offset);
+        recordCounts.merge(consumerRecord.getTopic(), 
consumerRecord.getBundledCount(), Long::sum);
+    }
+
+    public Map<String, Long> getRecordCounts() {
+        return recordCounts;
     }
 
     public PollingSummary getPollingSummary(final PollingContext 
pollingContext) {
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
index b957a84b0c..4bf1fe882c 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
@@ -19,6 +19,7 @@ package org.apache.nifi.kafka.processors.consumer.convert;
 import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
 import org.apache.nifi.kafka.service.api.header.RecordHeader;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -39,7 +40,7 @@ public class RecordStreamKafkaMessageConverter extends 
AbstractRecordStreamKafka
             final RecordSetWriterFactory writerFactory,
             final Charset headerEncoding,
             final Pattern headerNamePattern,
-            final org.apache.nifi.kafka.shared.property.KeyEncoding 
keyEncoding,
+            final KeyEncoding keyEncoding,
             final boolean commitOffsets,
             final OffsetTracker offsetTracker,
             final ComponentLog logger,

Reply via email to