Mike, are we talking about complete Avro messages or bare records? That is,
is the schema contained inside the Avro data, or do they use a schema registry?
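
One quick way to tell is to look at the first bytes of a raw message pulled
with ConsumeKafka. Below is a rough sketch, not anything NiFi ships: the
function name and return labels are made up for illustration. It relies on two
conventions: an Avro object container file starts with the bytes "Obj" followed
by 0x01, and the Confluent schema registry framing starts with a zero byte
followed by a 4-byte big-endian schema id. It is only a heuristic, since a bare
record can also happen to start with a zero byte.

```python
import struct

# First four bytes of an Avro object container file ("Obj" + version 1).
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def classify_avro_payload(payload: bytes) -> str:
    """Guess how an Avro payload is framed (hypothetical helper, heuristic only)."""
    if payload[:4] == AVRO_CONTAINER_MAGIC:
        # The schema travels with the data in the container header.
        return "container"
    if len(payload) >= 5 and payload[0] == 0:
        # Confluent-style framing: magic byte 0, then a big-endian schema id.
        (schema_id,) = struct.unpack(">I", payload[1:5])
        return f"registry-framed (schema id {schema_id})"
    # No recognizable header: probably a bare record, so the reader
    # has to be given the schema some other way.
    return "bare"


if __name__ == "__main__":
    print(classify_avro_payload(b"Obj\x01..."))
    print(classify_avro_payload(b"\x00\x00\x00\x00\x2a\x06foo"))
    print(classify_avro_payload(b"\x02\x06foo"))
```

If the result is "bare", the record reader needs the schema supplied explicitly,
which might explain a reader rejecting the data.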

From my own testing, ConsumeKafkaRecord tries to bundle incoming messages
into larger flow files.
Do you use any Kafka headers in the processor?

Also, what happens if you try to replicate the behavior of ConsumeKafkaRecord
like this? I don't know if you need the ConvertRecord, but it might be needed to
pick out a schema name to use as the merge strategy.
ConsumeKafka -> (ConvertRecord) -> MergeContent
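
To make the idea concrete, here is a minimal sketch of the bundling I have in
mind: group messages by schema name and close a bundle once it reaches a size
limit. This is not how NiFi implements it. `bundle_by_schema` and
`max_per_bundle` are hypothetical names, and I am assuming each message already
carries its schema name (for example as a flow file attribute that MergeContent
could correlate on).

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Bundle = Tuple[str, List[bytes]]


def bundle_by_schema(
    messages: Iterable[Tuple[str, bytes]], max_per_bundle: int = 3
) -> List[Bundle]:
    """Group (schema_name, payload) pairs into per-schema bundles (illustrative only)."""
    open_bins: Dict[str, List[bytes]] = defaultdict(list)
    bundles: List[Bundle] = []
    for schema_name, payload in messages:
        current = open_bins[schema_name]
        current.append(payload)
        if len(current) >= max_per_bundle:
            # The bin is full: emit it and start a fresh one for this schema.
            bundles.append((schema_name, current))
            del open_bins[schema_name]
    # Flush whatever is left in partially filled bins.
    for schema_name, current in open_bins.items():
        bundles.append((schema_name, current))
    return bundles


if __name__ == "__main__":
    sample = [("a", b"1"), ("b", b"2"), ("a", b"3"), ("a", b"4"), ("a", b"5")]
    for name, payloads in bundle_by_schema(sample):
        print(name, payloads)
```

If the manual flow produces sensible bundles and ConsumeKafkaRecord still
produces nothing, that points at the reader or the schema lookup rather than
at Kafka itself.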

/Viking
________________________________
From: Mike Thomsen <mikerthom...@gmail.com>
Sent: Tuesday, November 13, 2018 3:02 PM
To: users@nifi.apache.org
Subject: Re: ConsumeKafkaRecord won't pull new events from Kafka

The closest thing I see to a sign that something might be awry is this
in nifi-app.log:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.createKafkaConsumer(ConsumerPool.java:143)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.obtainConsumer(ConsumerPool.java:107)
    at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.onTrigger(ConsumeKafka.java:359)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)



On Tue, Nov 13, 2018 at 10:00 AM Pierre Villard
<pierre.villard...@gmail.com> wrote:
Hey Mike,

Anything in the logs?

Pierre

On Tue, Nov 13, 2018 at 15:56, Mike Thomsen
<mikerthom...@gmail.com> wrote:
I have an odd situation where I have ConsumeKafkaRecord and ConsumeKafka 
pulling from the same topic under different consumer groups, but only the 
latter will pull new events. I ran into a situation where the reader didn't 
like the Avro data being pulled from the queue and so I created new topics and 
configured both processors to use the new ones. However, only the non-record 
version of the processor will read.

Anyone got suggestions on how to debug this? I'm reasonably familiar with 
Kafka, but can't figure out why ConsumeKafka and the console consumer can read 
the topic, but ConsumeKafkaRecord is acting like there's nothing there at all.

Thanks,

Mike
