On 12/18/2014 10:07 AM, Wayna Runa wrote:
Hi everybody!
I am using the Hello.java sample from qpid-client-0.30 to publish and consume
messages from a RabbitMQ queue, but I get this error:
--.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
    at org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719)
    at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169)
    at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121)
    at org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:341)
    at org.apache.qpid.client.AMQConnection.executeRetrySupport(AMQConnection.java:652)
    at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:96)
    at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:2119)
    at org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067)
    at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53)
    at info.intix.rabbitmq.samples.Hello.main(Hello.java:28)
Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126)
    at org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693)
    at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:730)
    at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:724)
    at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:539)
    at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71)
    at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716)
    at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143)
    at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97)
    at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146)
    ... 8 more
Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
    at org.apache.qpid.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:92)
    at org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:197)
    at org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:127)
    at org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:116)
    at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:533)
    at org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:467)
    at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
    at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:490)
    at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:118)
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
    at java.lang.Thread.run(Thread.java:744)
--.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
There seems to be an issue about this here:
https://issues.apache.org/jira/browse/QPID-5906
My JNDI config is:
connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
# queueName: des_queue1
destination.myJndiDestQueue1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
But if I add the following line to Hello.java
(System.setProperty("IMMEDIATE_PREFETCH", "true")), I can publish and
consume the message, but with side effects:
1) On the RabbitMQ side, a temporary queue is created every time I
run Hello.java.
2) The message "read" from the original queue is not removed.
My question is: is there any workaround to create just a MessageConsumer
without IMMEDIATE_PREFETCH=true?
Hi,
No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ, as
described in the ticket (+1 for getting it addressed!). The only
alternative is to convince RabbitMQ to implement, or provide a plugin
for, 'active=false' so that it works as expected. To get things working
for me, I also had to use the following patch:
Index: src/main/java/org/apache/qpid/client/AMQSession.java
===================================================================
--- src/main/java/org/apache/qpid/client/AMQSession.java (revision 1640219)
+++ src/main/java/org/apache/qpid/client/AMQSession.java (working copy)
@@ -3241,6 +3241,11 @@
{
synchronized (_suspensionLock)
{
+ if(_immediatePrefetch)
+ {
+ _logger.warn("IMMEDIATE_PREFETCH is in force; no channel suspends processed");
+ return;
+ }
try
{
if (_logger.isDebugEnabled())
I haven't submitted the patch, as it is a blunt-force attempt to avoid
the issue rather than fix it.
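For reference, here is a minimal sketch of turning the property on from code. It assumes the Qpid client reads IMMEDIATE_PREFETCH when the connection and session are created, so the property is set before anything else happens; the class and method names are only illustrative.

```java
public class ImmediatePrefetchSketch {

    // Sets the system property named in the original message.
    // Assumption: this must run before the JNDI lookup and before any
    // connection or session is created, so the client sees the value.
    static void enableImmediatePrefetch() {
        System.setProperty("IMMEDIATE_PREFETCH", "true");
    }

    public static void main(String[] args) {
        enableImmediatePrefetch();
        // ... JNDI lookup, createConnection(), createSession() would follow here ...
        System.out.println("IMMEDIATE_PREFETCH=" + System.getProperty("IMMEDIATE_PREFETCH"));
    }
}
```

Passing -DIMMEDIATE_PREFETCH=true on the java command line sets the same system property without touching Hello.java.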
1) Your binding URL for consuming messages does not name a queue, so
RabbitMQ is helpfully making one. With Qpid I ended up with one set of
JNDI resources for consuming (direct://exchg/route/q) and another set
for sending (direct://exchg/route/). The consuming URL names a queue, so
the consumer reads explicitly from that queue; the sending URL does not,
so the routing key determines which consumers receive the message.
Consuming worked best with a queue pre-declared and pre-bound on the
RabbitMQ side.
2) An exception is being thrown in your onMessage method, so the
framework is not sending an ACK for the message. That is how
createSession(false, Session.AUTO_ACKNOWLEDGE) works: the framework
ACKs a message once the onMessage method returns without error.
Taking those two things into account, I think you'll find you can send
and consume messages just fine.
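To illustrate point 1, the two sets of JNDI entries might be built roughly like this. It is only a sketch: the exchange, routing key, and queue name come from the original config, while the JNDI name myJndiSendDest1 and the helper names are hypothetical. The URL shape follows the shorthand used above (direct://exchg/route/q), with the routingkey option carried over from the original config, so please check it against the Qpid binding URL documentation before relying on it.

```java
import java.util.Properties;

public class BindingUrlSketch {

    // Consuming: the URL names the queue explicitly, so no temporary queue is needed.
    static String consumeUrl(String exchange, String routingKey, String queue) {
        return "BURL:direct://" + exchange + "/" + routingKey + "/" + queue
                + "?routingkey='" + routingKey + "'";
    }

    // Sending: the queue segment is left empty; the routing key decides delivery.
    static String sendUrl(String exchange, String routingKey) {
        return "BURL:direct://" + exchange + "/" + routingKey + "/"
                + "?routingkey='" + routingKey + "'";
    }

    // JNDI destination entries, in the same form as the properties file.
    static Properties jndiDestinations() {
        Properties p = new Properties();
        // Name taken from the original config.
        p.setProperty("destination.myJndiDestQueue1",
                consumeUrl("amq.direct", "des_rk_queue1", "des_queue1"));
        // Hypothetical name for the sending side.
        p.setProperty("destination.myJndiSendDest1",
                sendUrl("amq.direct", "des_rk_queue1"));
        return p;
    }

    public static void main(String[] args) {
        jndiDestinations().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The consumer would then look up myJndiDestQueue1 and the producer myJndiSendDest1, with des_queue1 already declared and bound to amq.direct on the RabbitMQ side.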
Hope that helps,
Nathan
If I cannot create a MessageConsumer for a RabbitMQ queue, I should try
replacing RabbitMQ with another broker, such as Qpid :(
Any ideas are welcome.
Regards.
-- wr