[ 
https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659403#comment-16659403
 ] 

ASF GitHub Bot commented on NIFI-4914:
--------------------------------------

Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2882
  
    Hi @david-streamlio - sorry it took so long, I've been busy with a lot of 
things...
    
    I have not been able to complete all my tests successfully, but I wanted to 
give a status update on my tests so far. All the code changes I made are available 
here: 8ca2db6712b9fc843a3d2ee7917ecc6db2dd54f2. Most of the changes are just for 
consistency with the code base, but some were required to make things 
work.
    
    In addition to that, I have the following comments. If we leave the ack 
timeout at 0 sec, we get the following error:
    
    ````
    2018-10-22 08:23:18,998 WARN [Timer-Driven Process Thread-8] o.a.n.controller.tasks.ConnectableTask Administratively Yielding ConsumePulsar[id=1a70a923-0166-1000-e698-63a0160cc922] due to uncaught Exception: java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms
    java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms
    at org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.pulsar.client.impl.ConsumerBuilderImpl.ackTimeout(ConsumerBuilderImpl.java:147)
    at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumerBulder(AbstractPulsarConsumerProcessor.java:361)
    at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumer(AbstractPulsarConsumerProcessor.java:338)
    at org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar.onTrigger(ConsumePulsar.java:46)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    ````
    
    Is it something that changed with the latest version of Pulsar?
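
One possible way to avoid the exception, sketched here under assumptions: treat a configured value of 0 as "ack timeout disabled" and skip the builder call entirely, and reject values below the minimum before they reach the client. `AckTimeoutGuard` and its method are hypothetical names, not part of the PR; the 1000 ms minimum is taken from the error message, and whether exactly 1000 ms is accepted by the client is an assumption.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not from the PR): decides whether an ack timeout should be applied. */
final class AckTimeoutGuard {

    // Minimum taken from the error message in the stack trace; assumed to be inclusive.
    static final long MIN_ACK_TIMEOUT_MILLIS = 1000L;

    private AckTimeoutGuard() {
    }

    /**
     * Returns the timeout in milliseconds when it should be passed to the client,
     * or an empty value when the configured value is 0 (treated here as "disabled").
     * Throws IllegalArgumentException for negative values or values below the minimum.
     */
    static OptionalLong resolve(long value, TimeUnit unit) {
        if (value < 0) {
            throw new IllegalArgumentException("Ack timeout must not be negative");
        }
        if (value == 0) {
            // Assumption: 0 means "no ack timeout", so the caller skips the builder call.
            return OptionalLong.empty();
        }
        long millis = unit.toMillis(value);
        if (millis < MIN_ACK_TIMEOUT_MILLIS) {
            throw new IllegalArgumentException(
                    "Ack timeout must be 0 (disabled) or at least " + MIN_ACK_TIMEOUT_MILLIS + " ms");
        }
        return OptionalLong.of(millis);
    }
}
```

In the processor, the result could then guard the call that currently fails, for example `AckTimeoutGuard.resolve(value, unit).ifPresent(ms -> builder.ackTimeout(ms, TimeUnit.MILLISECONDS))`, where `builder` stands for the consumer builder seen in the stack trace.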
    
    In the consume processors, I'd add the topic name as an attribute of the 
flowfile, and possibly other attributes that could be useful for later 
routing/filtering.
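
A minimal sketch of what the attribute suggestion could look like, assuming the processor knows the topic and subscription it consumed from. The class name and the attribute keys `pulsar.topic` and `pulsar.subscription` are hypothetical; the subscription attribute is only one example of "other attributes" and is not something the comment asks for specifically.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not from the PR): builds flowfile attributes for a consumed message. */
final class PulsarFlowFileAttributes {

    // Attribute keys are illustrative assumptions, not names defined by NiFi or the PR.
    static final String TOPIC = "pulsar.topic";
    static final String SUBSCRIPTION = "pulsar.subscription";

    private PulsarFlowFileAttributes() {
    }

    /** Returns the attributes to attach; null or empty values are skipped. */
    static Map<String, String> build(String topic, String subscription) {
        Map<String, String> attributes = new LinkedHashMap<>();
        putIfPresent(attributes, TOPIC, topic);
        putIfPresent(attributes, SUBSCRIPTION, subscription);
        return attributes;
    }

    private static void putIfPresent(Map<String, String> attributes, String key, String value) {
        if (value != null && !value.isEmpty()) {
            attributes.put(key, value);
        }
    }
}
```

Inside `onTrigger`, the resulting map could be applied with `session.putAllAttributes(flowFile, attributes)`, after which a downstream RouteOnAttribute processor can route on the topic name.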
    
    When stopping the processors or stopping NiFi, some processors do not 
stop cleanly, and a process kill is required for NiFi to shut down. During my 
last test, it happened with ConsumePulsar, but it could happen with other 
processors as well.
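
The cause of the unclean stop is not identified here. If the processors hold background threads that are still blocked when the processor is stopped (an assumption), one generic pattern is a bounded shutdown: wait a grace period, then interrupt whatever is still running. `BoundedShutdown` is a hypothetical name and the sketch uses only the JDK, not the PR's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not from the PR): stops a worker pool without blocking forever. */
final class BoundedShutdown {

    private BoundedShutdown() {
    }

    /**
     * Returns true if the pool terminated on its own within the grace period,
     * false if running tasks had to be interrupted or the wait itself was interrupted.
     */
    static boolean stop(ExecutorService pool, long graceMillis) {
        pool.shutdown(); // accept no new tasks, let running ones finish
        try {
            if (pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            pool.shutdownNow(); // interrupt tasks that are still running
            pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

A method like this would typically be called from the processor's stop hook, so that a blocked task cannot keep NiFi from shutting down.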


> Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, 
> PublishPulsarRecord
> ------------------------------------------------------------------------------------------
>
>                 Key: NIFI-4914
>                 URL: https://issues.apache.org/jira/browse/NIFI-4914
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>            Reporter: David Kjerrumgaard
>            Priority: Minor
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Create record-based processors for Apache Pulsar 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
