[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659403#comment-16659403 ]
ASF GitHub Bot commented on NIFI-4914: -------------------------------------- Github user pvillard31 commented on the issue: https://github.com/apache/nifi/pull/2882 Hi @david-streamlio - sorry it took so long, I've been busy with a lot of things... I've not been able to complete all my tests successfully but I wanted to give a status with my tests so far. All the code changes I did are available here: 8ca2db6712b9fc843a3d2ee7917ecc6db2dd54f2 Most of the changes are just to be consistent with the code base but some have been required to make things work. In addition to that, I have the following comments: if we leave the ack timeout to 0sec, we get the following error ```` 2018-10-22 08:23:18,998 WARN [Timer-Driven Process Thread-8] o.a.n.controller.tasks.ConnectableTask Administratively Yielding ConsumePulsar[id=1a70a923-0166-1000-e698-63a0160cc922] due to uncaught Exception: java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms at org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122) at org.apache.pulsar.client.impl.ConsumerBuilderImpl.ackTimeout(ConsumerBuilderImpl.java:147) at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumerBulder(AbstractPulsarConsumerProcessor.java:361) at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumer(AbstractPulsarConsumerProcessor.java:338) at org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar.onTrigger(ConsumePulsar.java:46) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ```` Is it something that changed with the latest version of Pulsar? In the consume processors, I'd add the topic name as attribute of flowfile and possibly other attributes that could be useful for later routing/filtering. When stopping the processors or stopping NiFi, some processors does not stop cleanly and it requires a process kill for NiFi to shutdown. During my last test, it was on ConsumePulsar but it could be on other processors as well. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > ------------------------------------------------------------------------------------------ > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Affects Versions: 1.6.0 > Reporter: David Kjerrumgaard > Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)