This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 66074bc881 NIFI-14656 Fixed Kafka Consumer handling when not
committing offsets and added new counters (#10015)
66074bc881 is described below
commit 66074bc881ce7b46989cce89d4ee13b96d543ebe
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jun 13 15:48:40 2025 -0400
NIFI-14656 Fixed Kafka Consumer handling when not committing offsets and
added new counters (#10015)
Ensure that we return the Kafka Consumer to the pool even if we are not
committing offsets. Updated counters to show how many events are acknowledged
vs rolled back to help clarify.
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/kafka/processors/ConsumeKafka.java | 46 ++++++++++++----------
.../kafka/processors/consumer/OffsetTracker.java | 14 ++++---
.../convert/RecordStreamKafkaMessageConverter.java | 3 +-
3 files changed, 36 insertions(+), 27 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index ad1bd6d2c8..c797f1946b 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -412,21 +412,24 @@ public class ConsumeKafka extends AbstractProcessor
implements VerifiableProcess
}
session.commitAsync(
- () -> commitOffsets(consumerService, offsetTracker,
pollingContext),
+ () -> commitOffsets(consumerService, offsetTracker,
pollingContext, session),
throwable -> {
getLogger().error("Failed to commit session; will roll back
any uncommitted records", throwable);
- rollback(consumerService);
+ rollback(consumerService, offsetTracker, session);
context.yield();
});
}
- private void commitOffsets(final KafkaConsumerService consumerService,
final OffsetTracker offsetTracker, final PollingContext pollingContext) {
- if (!commitOffsets) {
- return;
- }
-
+ private void commitOffsets(final KafkaConsumerService consumerService,
final OffsetTracker offsetTracker, final PollingContext pollingContext, final
ProcessSession session) {
try {
-
consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
+ if (commitOffsets) {
+
consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
+
+ offsetTracker.getRecordCounts().forEach((topic, count) -> {
+ session.adjustCounter("Records Acknowledged for " + topic,
count, true);
+ });
+ }
+
consumerServices.offer(consumerService);
getLogger().debug("Committed offsets for Kafka Consumer Service");
} catch (final Exception e) {
@@ -435,8 +438,12 @@ public class ConsumeKafka extends AbstractProcessor
implements VerifiableProcess
}
}
- private void rollback(final KafkaConsumerService consumerService) {
+ private void rollback(final KafkaConsumerService consumerService, final
OffsetTracker offsetTracker, final ProcessSession session) {
if (!consumerService.isClosed()) {
+ offsetTracker.getRecordCounts().forEach((topic, count) -> {
+ session.adjustCounter("Records Rolled Back for " + topic,
count, true);
+ });
+
try {
consumerService.rollback();
consumerServices.offer(consumerService);
@@ -539,18 +546,17 @@ public class ConsumeKafka extends AbstractProcessor
implements VerifiableProcess
final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final KafkaMessageConverter converter = switch (outputStrategy) {
- case USE_VALUE -> new
RecordStreamKafkaMessageConverter(readerFactory, writerFactory,
- headerEncoding, headerNamePattern, keyEncoding,
commitOffsets, offsetTracker, getLogger(), brokerUri);
-
- case USE_WRAPPER, INJECT_METADATA -> {
- final RecordReaderFactory keyReaderFactory = keyFormat ==
KeyFormat.RECORD
- ?
context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class)
: null;
+ final KafkaMessageConverter converter;
+ if (outputStrategy == OutputStrategy.USE_VALUE) {
+ converter = new RecordStreamKafkaMessageConverter(readerFactory,
writerFactory, headerEncoding, headerNamePattern,
+ keyEncoding, commitOffsets, offsetTracker, getLogger(),
brokerUri);
+ } else {
+ final RecordReaderFactory keyReaderFactory = keyFormat ==
KeyFormat.RECORD
+ ?
context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class)
: null;
- yield new
WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory,
keyReaderFactory,
- headerEncoding, headerNamePattern, keyFormat, keyEncoding,
commitOffsets, offsetTracker, getLogger(), brokerUri, outputStrategy);
- }
- };
+ converter = new
WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory,
keyReaderFactory,
+ headerEncoding, headerNamePattern, keyFormat, keyEncoding,
commitOffsets, offsetTracker, getLogger(), brokerUri, outputStrategy);
+ }
converter.toFlowFiles(session, consumerRecords);
}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
index 7489e2b608..03b0b8c29b 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
@@ -22,21 +22,23 @@ import
org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.consumer.PollingSummary;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.Map;
public class OffsetTracker {
- private final Map<TopicPartitionSummary, OffsetSummary> offsets;
-
- public OffsetTracker() {
- offsets = new LinkedHashMap<>();
- }
+ private final Map<TopicPartitionSummary, OffsetSummary> offsets = new
HashMap<>();
+ private final Map<String, Long> recordCounts = new HashMap<>();
public void update(final ByteRecord consumerRecord) {
final TopicPartitionSummary topicPartitionSummary = new
TopicPartitionSummary(consumerRecord.getTopic(), consumerRecord.getPartition());
final long offset = consumerRecord.getOffset();
final OffsetSummary offsetSummary =
offsets.computeIfAbsent(topicPartitionSummary, (summary) -> new
OffsetSummary(offset));
offsetSummary.setOffset(offset);
+ recordCounts.merge(consumerRecord.getTopic(),
consumerRecord.getBundledCount(), Long::sum);
+ }
+
+ public Map<String, Long> getRecordCounts() {
+ return recordCounts;
}
public PollingSummary getPollingSummary(final PollingContext
pollingContext) {
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
index b957a84b0c..4bf1fe882c 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
@@ -19,6 +19,7 @@ package org.apache.nifi.kafka.processors.consumer.convert;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -39,7 +40,7 @@ public class RecordStreamKafkaMessageConverter extends
AbstractRecordStreamKafka
final RecordSetWriterFactory writerFactory,
final Charset headerEncoding,
final Pattern headerNamePattern,
- final org.apache.nifi.kafka.shared.property.KeyEncoding
keyEncoding,
+ final KeyEncoding keyEncoding,
final boolean commitOffsets,
final OffsetTracker offsetTracker,
final ComponentLog logger,