Hi there! I would like to share my configuration and my updated Hello.java with you. I hope this helps someone.

---------[hello.properties]----------
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory

# RabbitMQ v3.3.5
connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'

destination.myJndiDestQueuePublisher1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
destination.myJndiDestQueueConsumer1 = BURL:direct://amq.direct/des_rk_queue1/des_queue1
-------------------------------------------

-----[HelloRabbitMQ.java]-----
import java.io.InputStream;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HelloRabbitMQ {

    public HelloRabbitMQ() {
    }

    public static void main(String[] args) {
        HelloRabbitMQ hello = new HelloRabbitMQ();
        hello.runTest();
    }

    private void runTest() {
        try (InputStream resourceAsStream = this.getClass().getResourceAsStream("hello.properties")) {
            //System.setProperty("qpid.amqp.version", "0-91");
            //System.setProperty("qpid.dest_syntax", "BURL");
            // Required to consume from RabbitMQ (see QPID-5906).
            System.setProperty("IMMEDIATE_PREFETCH", "true");
            // The exchange and queue are pre-declared on the RabbitMQ side.
            System.setProperty("qpid.declare_exchanges", "false");
            System.setProperty("qpid.declare_queues", "false");

            Properties properties = new Properties();
            properties.load(resourceAsStream);
            Context context = new InitialContext(properties);

            ConnectionFactory connectionFactory =
                    (ConnectionFactory) context.lookup("myRabbitMQConnectionFactory1");
            //Connection connection = connectionFactory.createConnection("admin","JRq7b1Ct");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Separate JNDI destinations: one for sending, one for consuming.
            Destination destinationPublisher = (Destination) context.lookup("myJndiDestQueuePublisher1");
            Destination destinationConsumer = (Destination) context.lookup("myJndiDestQueueConsumer1");

            MessageProducer messageProducer = session.createProducer(destinationPublisher);
            MessageConsumer messageConsumer = session.createConsumer(destinationConsumer);

            TextMessage message = session.createTextMessage("Hello RabbitMQ 3.3.5 !!");
            // setDeliveryMode expects a DeliveryMode constant, not a Session acknowledge mode.
            // NON_PERSISTENT has the same numeric value (1) that Session.AUTO_ACKNOWLEDGE passed before.
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            messageProducer.send(message);
            messageProducer.close();

            Thread.sleep(10000); // 10 secs

            message = (TextMessage) messageConsumer.receive();
            System.out.println(message.getText());

            session.close();
            connection.close();
            context.close();
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }
}
----------------------------------------

Special thanks to Nathan Kunkee for his support.

Kind regards.

- Wayna Runa

On 19 December 2014 at 08:55, Wayna Runa <waynar...@gmail.com> wrote:
>
> Great, Nathan!
>
> This is the best Christmas gift for me!
>
> Regards.
>
>
> On 18 December 2014 at 17:52, Nathan Kunkee <nkun...@genome.wustl.edu> wrote:
>>
>> On 12/18/2014 10:07 AM, Wayna Runa wrote:
>>
>>> Hi everybody!
>>>
>>> I am using the Hello.java sample of qpid-client-0.30 to publish and consume
>>> messages from a RabbitMQ queue, but I get this error:
>>>
>>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>>> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>>> javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>     at org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719)
>>>     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169)
>>>     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121)
>>>     at org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:341)
>>>     at org.apache.qpid.client.AMQConnection.executeRetrySupport(AMQConnection.java:652)
>>>     at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:96)
>>>     at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:2119)
>>>     at org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067)
>>>     at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53)
>>>     at info.intix.rabbitmq.samples.Hello.main(Hello.java:28)
>>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>     at org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:730)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:724)
>>>     at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:539)
>>>     at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71)
>>>     at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716)
>>>     at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143)
>>>     at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97)
>>>     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146)
>>>     ... 8 more
>>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>     at org.apache.qpid.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:92)
>>>     at org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:197)
>>>     at org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:127)
>>>     at org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:116)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:533)
>>>     at org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:467)
>>>     at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:490)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:118)
>>>     at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
>>>     at java.lang.Thread.run(Thread.java:744)
>>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>>>
>>> There seems to be an issue about this here:
>>> https://issues.apache.org/jira/browse/QPID-5906
>>>
>>> My JNDI config is:
>>>
>>> connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
>>> # queueName: des_queue1
>>> destination.myJndiDestQueue1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
>>>
>>> But if I add the following line to Hello.java
>>> (System.setProperty("IMMEDIATE_PREFETCH", "true")) I can publish and
>>> consume the message, but with side effects:
>>>
>>> 1) On the RabbitMQ side, a temporary queue is created every time I
>>> run Hello.java.
>>> 2) The message "read" from the original queue is not removed.
>>>
>>> My question is: Is there any workaround to create just a MessageConsumer
>>> without IMMEDIATE_PREFETCH=true?
>>>
>>
>> Hi,
>>
>> No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ as
>> described in the ticket (+1 for getting it addressed!). Otherwise, you
>> would have to convince RabbitMQ to implement or provide a plugin for
>> 'active=false' to work as expected....
>> To get things working for me, I also had to use the following patch:
>>
>> Index: src/main/java/org/apache/qpid/client/AMQSession.java
>> ===================================================================
>> --- src/main/java/org/apache/qpid/client/AMQSession.java    (revision 1640219)
>> +++ src/main/java/org/apache/qpid/client/AMQSession.java    (working copy)
>> @@ -3241,6 +3241,11 @@
>>      {
>>          synchronized (_suspensionLock)
>>          {
>> +            if(_immediatePrefetch)
>> +            {
>> +                _logger.warn("IMMEDIATE_PREFETCH is in force; no channel suspends processed");
>> +                return;
>> +            }
>>              try
>>              {
>>                  if (_logger.isDebugEnabled())
>>
>>
>> I haven't submitted the patch as it is a blunt force attempt to avoid the
>> issue rather than fix it.
>>
>> 1) Your binding URL for consuming messages does not name a queue, so
>> RabbitMQ is helpfully making one. I found for Qpid that I ended up with a
>> set of JNDI resources for consuming (direct://exchg/route/q) and a set for
>> sending (direct://exchg/route/). Note that the consuming URL names a queue,
>> so messages are read explicitly from that queue, while the sending URL does
>> not, letting the routing key determine the consumers. Consuming worked best
>> with a queue pre-declared and pre-bound on the RabbitMQ side.
>>
>> 2) There is an exception being thrown during your onMessage method, so
>> the framework is not sending an ACK for the message. That is how
>> createSession(false, Session.AUTO_ACKNOWLEDGE) works: the framework ACKs a
>> message once the onMessage method returns without error.
>>
>> Taking those two things into account, I think you'll find you can send and
>> consume messages just fine.
>>
>> Hope that helps,
>> Nathan
>>
>>
>>> If I cannot create a MessageConsumer for a RabbitMQ queue, I should try
>>> changing RabbitMQ for another broker such as Qpid :(
>>>
>>> Any ideas are welcome.
>>>
>>> Regards.
>>>
>>> -- wr
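
P.S. Nathan's point 1) in the quoted reply (one JNDI entry for sending without a queue name, one for consuming that names the queue) is what the two destination lines in my hello.properties do. As a rough sketch only, the small class below builds both BURL strings from an exchange, a routing key and a queue name. The class and method names (BindingUrlSketch, publisherUrl, consumerUrl, destinations) are made up for illustration and are not part of the Qpid client; it only concatenates strings in the same shape as the entries shown in hello.properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of Qpid: builds the two JNDI destination entries. */
public class BindingUrlSketch {

    /** Sending: exchange plus routing key only, no queue name. */
    public static String publisherUrl(String exchange, String routingKey) {
        return "BURL:direct://" + exchange + "//?routingkey='" + routingKey + "'";
    }

    /** Consuming: exchange/routingKey/queue, so the queue is named explicitly. */
    public static String consumerUrl(String exchange, String routingKey, String queue) {
        return "BURL:direct://" + exchange + "/" + routingKey + "/" + queue;
    }

    /** Returns the two destination entries, keyed by the JNDI names used in hello.properties. */
    public static Map<String, String> destinations(String exchange, String routingKey, String queue) {
        Map<String, String> entries = new LinkedHashMap<>();
        entries.put("destination.myJndiDestQueuePublisher1", publisherUrl(exchange, routingKey));
        entries.put("destination.myJndiDestQueueConsumer1", consumerUrl(exchange, routingKey, queue));
        return entries;
    }

    public static void main(String[] args) {
        // Values taken from the hello.properties in this thread.
        destinations("amq.direct", "des_rk_queue1", "des_queue1")
                .forEach((name, url) -> System.out.println(name + " = " + url));
    }
}
```

Running main prints the two destination lines in the same form as in hello.properties. The queue and its binding are still assumed to be declared beforehand on the RabbitMQ side, as Nathan suggested.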