Great Nathan ! This is the best gift for me in x-mas !
Regards. On 18 December 2014 at 17:52, Nathan Kunkee <nkun...@genome.wustl.edu> wrote: > > On 12/18/2014 10:07 AM, Wayna Runa wrote: > >> Hi everybody ! >> >> I am using Hello.java sample of qpid-client-0.30 to publish and consume >> messages from a RabbitMQ queue, but i get this error: >> >> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.-- >> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". >> SLF4J: Defaulting to no-operation (NOP) logger implementation >> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further >> details. >> javax.jms.JMSException: Error registering consumer: >> org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - >> active=false [error code 540: not implemented] >> at >> org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719) >> at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169) >> at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121) >> at >> org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport( >> AMQConnectionDelegate_8_0.java:341) >> at >> org.apache.qpid.client.AMQConnection.executeRetrySupport( >> AMQConnection.java:652) >> at >> org.apache.qpid.client.failover.FailoverRetrySupport. >> execute(FailoverRetrySupport.java:96) >> at >> org.apache.qpid.client.AMQSession.createConsumerImpl( >> AMQSession.java:2119) >> at >> org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067) >> at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53) >> at info.intix.rabbitmq.samples.Hello.main(Hello.java:28) >> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: >> NOT_IMPLEMENTED - active=false [error code 540: not implemented] >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> Method) >> at >> sun.reflect.NativeConstructorAccessorImpl.newInstance( >> NativeConstructorAccessorImpl.java:57) >> at >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance( >> DelegatingConstructorAccessorImpl.java:45) >> at java.lang.reflect.Constructor.newInstance(Constructor.java:526) >> at >> org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126) >> at >> org.apache.qpid.client.protocol.AMQProtocolHandler. >> writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693) >> at >> org.apache.qpid.client.protocol.AMQProtocolHandler. >> syncWrite(AMQProtocolHandler.java:730) >> at >> org.apache.qpid.client.protocol.AMQProtocolHandler. >> syncWrite(AMQProtocolHandler.java:724) >> at >> org.apache.qpid.client.AMQSession_0_8.sendConsume( >> AMQSession_0_8.java:539) >> at >> org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71) >> at >> org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716) >> at >> org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143) >> at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97) >> at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146) >> ... 8 more >> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: >> NOT_IMPLEMENTED - active=false [error code 540: not implemented] >> at >> org.apache.qpid.client.handler.ConnectionCloseMethodHandler. >> methodReceived(ConnectionCloseMethodHandler.java:92) >> at >> org.apache.qpid.client.handler.ClientMethodDispatcherImpl. >> dispatchConnectionClose(ClientMethodDispatcherImpl.java:197) >> at >> org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute( >> ConnectionCloseBodyImpl.java:127) >> at >> org.apache.qpid.client.state.AMQStateManager.methodReceived( >> AMQStateManager.java:116) >> at >> org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived( >> AMQProtocolHandler.java:533) >> at >> org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived( >> AMQProtocolSession.java:467) >> at >> org.apache.qpid.framing.AMQMethodBodyImpl.handle( >> AMQMethodBodyImpl.java:99) >> at >> org.apache.qpid.client.protocol.AMQProtocolHandler. >> received(AMQProtocolHandler.java:490) >> at >> org.apache.qpid.client.protocol.AMQProtocolHandler. >> received(AMQProtocolHandler.java:118) >> at >> org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161) >> at java.lang.Thread.run(Thread.java:744) >> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.-- >> >> Seems there is an issue about this here: >> https://issues.apache.org/jira/browse/QPID-5906 >> >> My JNDI config is: >> >> connectionfactory.myRabbitMQConnectionFactory1 = >> amqp://admin:xxx@clientid >> /DES_DEV?brokerlist='tcp://10.105.135.53:5672' >> # queueName: des_queue1 >> destination.myJndiDestQueue1 = >> BURL:direct://amq.direct//?routingkey='des_rk_queue1' >> >> But if I add the next line in Hello.java >> (System.setProperty("IMMEDIATE_PREFETCH", "true")) I can publish and >> consume the message but with collateral effects: >> >> 1) In the RabbitMQ side will be created a TemporalQueue every time that i >> run Hello.java. >> 2) The message "read" in the original queue is not removed. >> >> My question is: Is there any workaround to create just a MessageConsumer >> without IMMEDIATE_PREFETCH=true ? >> > > Hi, > > No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ as > described in the ticket (+1 for getting it addressed!). Or, convince > RabbitMQ to implement or provide a plugin for 'active=false' to work as > expected.... To get things working for me, I also had to use the following > patch: > > Index: src/main/java/org/apache/qpid/client/AMQSession.java > =================================================================== > --- src/main/java/org/apache/qpid/client/AMQSession.java (revision > 1640219) > +++ src/main/java/org/apache/qpid/client/AMQSession.java (working > copy) > @@ -3241,6 +3241,11 @@ > { > synchronized (_suspensionLock) > { > + if(_immediatePrefetch) > + { > + _logger.warn("IMMEDIATE_PREFETCH is in force; no channel > suspends processed"); > + return; > + } > try > { > if (_logger.isDebugEnabled()) > > > I haven't submitted the patch as it is a blunt force attempt to avoid the > issue rather than fix it. > > 1) Your binding URL for consuming messages does not name a queue, so > RabbitMQ is helpfully making one. I found for Qpid that I ended up with a > set of JNDI resources for consuming (direct://exchg/route/q) and a set for > sending (direct://exchg/route/). Note that consuming lists a queue name, > explicitly from that queue, while sending does not, letting the routing key > determine the consumers. Consuming worked best with a queue pre-declared > and pre-bound on the RabbitMQ side. > > 2) There is an exception being thrown during your onMessage method, so the > framework is not sending an ACK for the message. That is the mechanism of > createSession(false, Session.AUTO_ACK), that the framework ACK's a message > once the onMessage method returns without error. > > Taking into account those two things I think you'll find you can send and > consume messages just fine. > > Hope that helps, > Nathan > > >> If I can not create a MessageConsumer for RabbitMQ queue, I should try >> change RabbitMQ for other as Qpid :( >> >> Any idea will be welcome. >> >> Regards. >> >> -- wr >> >> > > ____ > This email message is a private communication. The information > transmitted, including attachments, is intended only for the person or > entity to which it is addressed and may contain confidential, privileged, > and/or proprietary material. Any review, duplication, retransmission, > distribution, or other use of, or taking of any action in reliance upon, > this information by persons or entities other than the intended recipient > is unauthorized by the sender and is prohibited. If you have received this > message in error, please contact the sender immediately by return email and > delete the original message from all computer systems. Thank you. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org > For additional commands, e-mail: users-h...@qpid.apache.org > > -- -- Wayna Runa +34 668872813 | Skype waynaruna2 waynar...@gmail.com | Barcelona, Spain