JIRA ticket created:

https://issues.apache.org/jira/browse/NIFI-5360


From: Kelsey RIDER <kelsey.ri...@ineat-conseil.fr>
Sent: Friday, June 29, 2018 09:12
To: users@nifi.apache.org
Subject: RE: simple JMS pub-sub not working

Hi Mark,

I see what you’re saying, but conceptually, it doesn’t seem like a good idea to 
me.

From what I can gather, the purpose of Controller Services is to have a shared 
component that deals with external special cases (like DB connections).
From my point of view (which is admittedly that of a newb) it seems like:
1. The JMSConnectionFactoryProvider should handle the special case of dealing 
with a specific external system. It should expose a method that allows the 
Processor to request a FlowFile fetched from a queue. This would be analogous 
to a DB – if I want to fetch rows from, say, a PostgreSQL DB, the Controller 
Service handles all the Postgres-specific stuff (including the Driver library), 
and the Processor just gets a FlowFile. (Edit – looked at the code and realized 
it doesn’t work like this. But then why don’t we have classpath issues when 
fetching from a DB?)
2. Adding the MQ libraries to the Processor would be redundant (a DRY violation), 
because they’re already specified on the JMSConnectionFactoryProvider.

I can certainly open a JIRA ticket – what do I need to sign up for an account? 
And if I get some time I wouldn’t mind trying to implement something like what 
I just described.

Kelsey

From: Mark Payne <marka...@hotmail.com>
Sent: Thursday, June 28, 2018 17:20
To: users@nifi.apache.org
Subject: Re: simple JMS pub-sub not working

Thanks Kelsey.

So it appears that when using RabbitMQ, the RabbitMQ classes are being accessed in a call
from the ConsumeJMS processor, not from within the ConnectionFactory. I think we may end up
needing to include a new property in ConsumeJMS / PublishJMS that optionally allows you to
specify the libraries to use for those processors as well. I.e., in addition to needing the
libraries in the Connection Factory Controller Service, for some vendors we will also need to
provide the libraries to the Processor.
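
To illustrate the kind of visibility problem described above, here is a minimal, self-contained Java sketch. It does not use NiFi or RabbitMQ at all: the class name ClassLoaderVisibilityDemo and the helper isVisible are made up for this example, and the "isolated" loader merely stands in for a processor class loader that was never given the vendor JARs. The assumption (suggested by the stack trace quoted further down in this thread, not confirmed here) is that the client library resolves its message class by name through whatever class loader is in effect for the calling code.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderVisibilityDemo {

    /** Returns true if the named class can be resolved through the given loader. */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            // 'false' means: resolve the class but do not run its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = ClassLoaderVisibilityDemo.class.getName();

        // Stand-in for the loader that was configured with the libraries.
        ClassLoader owning = ClassLoaderVisibilityDemo.class.getClassLoader();

        // Stand-in for a loader that was NOT given the libraries:
        // no URLs of its own, and a null parent, so it only sees JDK bootstrap classes.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println("owning loader sees class:   " + isVisible(name, owning));
            System.out.println("isolated loader sees class: " + isVisible(name, isolated));
        }
    }
}
```

The same class name resolves through one loader and fails through the other, which is the general shape of a ClassNotFoundException raised on one component even though another component has the libraries configured.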

Fortunately, it should be an easy enough change to make, if that's what is 
necessary. We would basically
just need to create a Property Descriptor in the processor that is similar to 
the one in the Controller Service:

    public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder()
            .name(CF_LIB)
            .displayName("MQ Client Libraries path (i.e., /usr/jms/lib)")
            .description("Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added "
                    + "to the classpath. Such resources typically represent target MQ client libraries for the "
                    + "ConnectionFactory implementation.")
            .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
            .required(true)
            .dynamicallyModifiesClasspath(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

And then return that in the processor's list of properties. The Processor itself doesn't
actually need to do anything with that property, because the framework will detect the
".dynamicallyModifiesClasspath(true)" part and handle it for us.
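
As a rough picture of what a classpath-modifying property implies, the sketch below builds a class loader from a directory of JARs using only the JDK. This is not NiFi's implementation, which lives inside the framework and is not shown in this thread; ExtraClasspathSketch, collectUrls, loaderFor, and the temporary directory are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class ExtraClasspathSketch {

    /** Collects the directory itself plus every *.jar directly inside it as classpath URLs. */
    public static List<URL> collectUrls(Path libDir) throws IOException {
        List<URL> urls = new ArrayList<>();
        // The directory entry lets loose resources (e.g. config files) be found as well.
        urls.add(libDir.toUri().toURL());
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
        }
        return urls;
    }

    /** Creates a loader that consults the parent first, then the collected URLs. */
    public static URLClassLoader loaderFor(Path libDir, ClassLoader parent) throws IOException {
        return new URLClassLoader(collectUrls(libDir).toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical stand-in for a real client library directory such as /usr/jms/lib.
        Path libDir = Files.createTempDirectory("mq-client-libs");
        Files.createFile(libDir.resolve("vendor-client.jar")); // empty placeholder, never opened here

        try (URLClassLoader loader = loaderFor(libDir, ExtraClasspathSketch.class.getClassLoader())) {
            for (URL url : loader.getURLs()) {
                System.out.println(url);
            }
        }
    }
}
```

The point of the sketch is only that a component configured this way gets its own view of the extra JARs; a second component needs the same configuration before it can see them.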

If you're the type who is inclined to create a JIRA and PR, I'd be happy to 
review it! Or if you are able to even just test it out and validate
that this solves the issue, then that would be helpful as well.

Thanks
-Mark


On Jun 28, 2018, at 11:02 AM, Kelsey RIDER <kelsey.ri...@ineat-conseil.fr> wrote:

Hello Mark,

Here you go:

2018-06-28 15:47:27,161 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.jms.processors.ConsumeJMS ConsumeJMS[id=01641047-1441-10a4-ce22-23d392689dd9] ConsumeJMS[id=01641047-1441-10a4-ce22-23d392689dd9] failed to process session due to org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage; Processor Administratively Yielded for 1 sec: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage
        at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
        at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497)
        at org.apache.nifi.jms.processors.JMSConsumer.consume(JMSConsumer.java:85)
        at org.apache.nifi.jms.processors.ConsumeJMS.rendezvousWithJms(ConsumeJMS.java:181)
        at org.apache.nifi.jms.processors.ConsumeJMS.rendezvousWithJms(ConsumeJMS.java:59)
        at org.apache.nifi.jms.processors.AbstractJMSProcessor.onTrigger(AbstractJMSProcessor.java:157)
        at org.apache.nifi.jms.processors.ConsumeJMS.onTrigger(ConsumeJMS.java:59)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1147)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:175)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage
        at com.rabbitmq.jms.client.RMQMessage.instantiateRmqMessage(RMQMessage.java:1070)
        at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1030)
        at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:871)
        at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:865)
        at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:331)
        at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:246)
        at org.apache.nifi.jms.processors.JMSConsumer$1.doInJms(JMSConsumer.java:95)
        at org.apache.nifi.jms.processors.JMSConsumer$1.doInJms(JMSConsumer.java:85)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:494)
        ... 16 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.jms.client.message.RMQTextMessage
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at com.rabbitmq.jms.client.RMQMessage.instantiateRmqMessage(RMQMessage.java:1064)
        ... 24 common frames omitted

From: Mark Payne <marka...@hotmail.com>
Sent: Thursday, June 28, 2018 16:40
To: users@nifi.apache.org
Subject: Re: simple JMS pub-sub not working

Hi Kelsey,

Can you provide the entire stack trace? What you've given here is just a 
"Caused by" portion of it.

Thanks
-Mark



On Jun 28, 2018, at 9:50 AM, Kelsey RIDER <kelsey.ri...@ineat-conseil.fr> wrote:

Hello again,

I’m trying to set up a very simple round-trip JMS test.
I have a RabbitMQ server set up and installed locally.
I set up a simple NiFi flow that generates a FlowFile and uses PublishJMS to 
send it to the queue.
I then have a ConsumeJMS processor that tries to consume what I just published.

The problem is that the consumer always dies with the following exception:
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.jms.client.message.RMQTextMessage
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at com.rabbitmq.jms.client.RMQMessage.instantiateRmqMessage(RMQMessage.java:1064)

Both the publisher and the consumer use the same JMSConnectionFactoryProvider, 
which is configured with the appropriate libraries and is successfully 
communicating with RabbitMQ (I can see the published messages from the RabbitMQ 
admin console).

Why doesn’t ConsumeJMS use the libraries configured on the JMS controller 
service? How could I get this to work? I tried dropping the required JARs in 
lib/ but this led to the JMS controller service not starting (don’t recall the 
exception). Dropping them in lib/bootstrap/ didn’t have any effect.

Thanks,

Kelsey