olivier brobecker created NIFI-14247:
----------------------------------------

             Summary: tombstone messages in consumeKafka processor
                 Key: NIFI-14247
                 URL: https://issues.apache.org/jira/browse/NIFI-14247
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
    Affects Versions: 2.0.0
         Environment: kubernetes using apache/nifi docker image
            Reporter: olivier brobecker


when using the following parameters for processor consumeKafka 2.0.0 from 
bundle nifi-kafka-nar with processing strategy set to FLOW_FILE or DEMARCATOR, 
the processor failed to consume a kafka document if this one has no messages ( 
a tombstone message ).

When reading document, a flow file should have been generated with no content 
and the attribute kafka.tombstone set to true.

 

Instead, the processor yield and the following error can be seen in the logs :
{code:java}
2025-02-07 06:29:46,619 ERROR [Timer-Driven Process Thread-10] 
o.a.nifi.kafka.processors.ConsumeKafka 
ConsumeKafka[id=df115ac8-0194-1000-ffff-fffffa8cb386] Failed to consume Kafka 
Records
java.lang.NullPointerException: Value required
        at java.base/java.util.Objects.requireNonNull(Objects.java:259)
        at 
org.apache.nifi.kafka.service.api.record.ByteRecord.<init>(ByteRecord.java:55)
        at 
org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:195)
        at 
org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:167)
        at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at 
org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:251)
        at 
org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:237)
        at jdk.proxy12/jdk.proxy12.$Proxy213.next(Unknown Source)
        at 
org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter.toFlowFiles(FlowFileStreamKafkaMessageConverter.java:59)
        at 
org.apache.nifi.kafka.processors.ConsumeKafka.processInputFlowFile(ConsumeKafka.java:489)
        at 
org.apache.nifi.kafka.processors.ConsumeKafka.processConsumerRecords(ConsumeKafka.java:437)
        at 
org.apache.nifi.kafka.processors.ConsumeKafka.onTrigger(ConsumeKafka.java:376)
        at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
        at 
org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
        at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at 
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
        at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583) {code}
 

kafka3ConnectionService is configured to connect to an internal kubernetes 
kafka cluster

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to