olivier brobecker created NIFI-14247:
----------------------------------------
Summary: tombstone messages in consumeKafka processor
Key: NIFI-14247
URL: https://issues.apache.org/jira/browse/NIFI-14247
Project: Apache NiFi
Issue Type: Bug
Components: Extensions
Affects Versions: 2.0.0
Environment: kubernetes using apache/nifi docker image
Reporter: olivier brobecker
when using the following parameters for processor consumeKafka 2.0.0 from
bundle nifi-kafka-nar with processing strategy set to FLOW_FILE or DEMARCATOR,
the processor failed to consume a kafka document if this one has no messages (
a tombstone message ).
When reading document, a flow file should have been generated with no content
and the attribute kafka.tombstone set to true.
Instead, the processor yield and the following error can be seen in the logs :
{code:java}
2025-02-07 06:29:46,619 ERROR [Timer-Driven Process Thread-10]
o.a.nifi.kafka.processors.ConsumeKafka
ConsumeKafka[id=df115ac8-0194-1000-ffff-fffffa8cb386] Failed to consume Kafka
Records
java.lang.NullPointerException: Value required
at java.base/java.util.Objects.requireNonNull(Objects.java:259)
at
org.apache.nifi.kafka.service.api.record.ByteRecord.<init>(ByteRecord.java:55)
at
org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:195)
at
org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:167)
at
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at
org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:251)
at
org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:237)
at jdk.proxy12/jdk.proxy12.$Proxy213.next(Unknown Source)
at
org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter.toFlowFiles(FlowFileStreamKafkaMessageConverter.java:59)
at
org.apache.nifi.kafka.processors.ConsumeKafka.processInputFlowFile(ConsumeKafka.java:489)
at
org.apache.nifi.kafka.processors.ConsumeKafka.processConsumerRecords(ConsumeKafka.java:437)
at
org.apache.nifi.kafka.processors.ConsumeKafka.onTrigger(ConsumeKafka.java:376)
at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
at
org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583) {code}
kafka3ConnectionService is configured to connect to an internal kubernetes
kafka cluster
--
This message was sent by Atlassian Jira
(v8.20.10#820010)