On 12/18/2014 10:07 AM, Wayna Runa wrote:
Hi everybody!

I am using the Hello.java sample from qpid-client-0.30 to publish and consume
messages from a RabbitMQ queue, but I get this error:

--.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
     at org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719)
     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169)
     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121)
     at org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:341)
     at org.apache.qpid.client.AMQConnection.executeRetrySupport(AMQConnection.java:652)
     at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:96)
     at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:2119)
     at org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067)
     at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53)
     at info.intix.rabbitmq.samples.Hello.main(Hello.java:28)
Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
     at org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126)
     at org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693)
     at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:730)
     at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:724)
     at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:539)
     at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71)
     at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716)
     at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143)
     at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97)
     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146)
     ... 8 more
Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
     at org.apache.qpid.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:92)
     at org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:197)
     at org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:127)
     at org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:116)
     at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:533)
     at org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:467)
     at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
     at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:490)
     at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:118)
     at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
     at java.lang.Thread.run(Thread.java:744)
--.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--

There seems to be an existing issue about this:
https://issues.apache.org/jira/browse/QPID-5906

My JNDI config is:

connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
# queueName: des_queue1
destination.myJndiDestQueue1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'

But if I add the following line to Hello.java
(System.setProperty("IMMEDIATE_PREFETCH", "true")), I can publish and
consume the message, but with side effects:

1) On the RabbitMQ side, a temporary queue is created every time I
run Hello.java.
2) The message "read" from the original queue is not removed.

My question is: is there any workaround to create a MessageConsumer
without IMMEDIATE_PREFETCH=true?

Hi,

No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ, as described in the ticket (+1 for getting it addressed!). The only alternative is to convince RabbitMQ to implement, or provide a plugin for, 'active=false' so that it works as expected. To get things working for me, I also had to apply the following patch:

Index: src/main/java/org/apache/qpid/client/AMQSession.java
===================================================================
--- src/main/java/org/apache/qpid/client/AMQSession.java        (revision 1640219)
+++ src/main/java/org/apache/qpid/client/AMQSession.java        (working copy)
@@ -3241,6 +3241,11 @@
     {
         synchronized (_suspensionLock)
         {
+            if(_immediatePrefetch)
+            {
+                _logger.warn("IMMEDIATE_PREFETCH is in force; no channel suspends processed");
+                return;
+            }
             try
             {
                 if (_logger.isDebugEnabled())


I haven't submitted the patch as it is a blunt force attempt to avoid the issue rather than fix it.
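
For reference, here is a minimal sketch of where the IMMEDIATE_PREFETCH setting from the original post would go. The property name and value come from the thread; the class and method names are only illustrative, and the placement (before any connection or session is created) is my assumption about when the client reads the flag.

```java
public class PrefetchFlag {
    // Sets the system property quoted in the thread. Assumption: the Qpid
    // client reads it when sessions are created, so call this first.
    public static void enableImmediatePrefetch() {
        System.setProperty("IMMEDIATE_PREFETCH", "true");
    }

    public static void main(String[] args) {
        enableImmediatePrefetch();
        // The JNDI lookup, createConnection() and createSession() calls
        // from Hello.java would follow here.
        System.out.println("IMMEDIATE_PREFETCH=" + System.getProperty("IMMEDIATE_PREFETCH"));
    }
}
```

Like any JVM system property, the same value can be passed on the command line as -DIMMEDIATE_PREFETCH=true instead of being set in code.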

1) Your binding URL for consuming messages does not name a queue, so RabbitMQ is helpfully creating one. With Qpid, I ended up with one set of JNDI resources for consuming (direct://exchg/route/q) and another for sending (direct://exchg/route/). Note that the consuming form names a queue, so the consumer reads explicitly from that queue, while the sending form does not, leaving the routing key to determine which consumers receive the message. Consuming worked best with the queue pre-declared and pre-bound on the RabbitMQ side.

2) An exception is being thrown in your onMessage method, so the framework is not sending an ACK for the message. That is how createSession(false, Session.AUTO_ACKNOWLEDGE) works: the framework ACKs a message once the onMessage method returns without error.
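
To make point 1) concrete, here is a small sketch that builds the two kinds of binding URL as strings. The exchange, queue, and routing key are the ones from the original post. The layout (class://exchange/destination/queue?routingkey='key') is my understanding of the Qpid binding URL format and should be checked against the client documentation; the JNDI names myConsumeQueue1 and mySendQueue1 are made up for the example.

```java
public class BindingUrlSketch {
    // Assumed Qpid binding URL layout:
    //   BURL:direct://<exchange>/<destination>/<queue>?routingkey='<key>'
    // The destination segment is left empty, as in the original config.
    public static String directUrl(String exchange, String queue, String routingKey) {
        String q = (queue == null) ? "" : queue;
        return "BURL:direct://" + exchange + "//" + q + "?routingkey='" + routingKey + "'";
    }

    public static void main(String[] args) {
        // Consuming: names the queue, so no temporary queue should be needed.
        System.out.println("destination.myConsumeQueue1 = "
                + directUrl("amq.direct", "des_queue1", "des_rk_queue1"));
        // Sending: no queue, the routing key alone decides where messages go.
        System.out.println("destination.mySendQueue1 = "
                + directUrl("amq.direct", null, "des_rk_queue1"));
    }
}
```

The printed lines are in the form expected by the JNDI properties file, with the consuming entry differing from the original config only by the queue name after the double slash.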

Taking those two things into account, I think you'll find you can send and consume messages just fine.
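
As an illustration of point 2), here is a toy model of the auto-acknowledge rule. It is not JMS or Qpid code: the class, the deliver method, and the use of Consumer<String> as a stand-in for onMessage are all invented for the example. It only shows the rule that a message counts as acknowledged when the handler returns normally and stays unacknowledged when the handler throws.

```java
import java.util.function.Consumer;

public class AutoAckModel {
    // Returns true if the message would be acknowledged, i.e. the handler
    // (standing in for onMessage) returned without throwing.
    public static boolean deliver(String body, Consumer<String> handler) {
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Logging the failure makes a silently un-ACKed message visible.
            System.err.println("handler failed, message not acknowledged: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello", b -> { }));
        System.out.println(deliver("hello", b -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```

In a real listener, wrapping the body of onMessage in a similar try/catch and logging the exception is a quick way to find out why a message is left on the queue.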

Hope that helps,
Nathan


If I cannot create a MessageConsumer for a RabbitMQ queue, I should try
replacing RabbitMQ with another broker such as Qpid    :(

Any ideas are welcome.

Regards.

-- wr



