Great, Nathan!

This is the best Christmas gift for me!

Regards.



On 18 December 2014 at 17:52, Nathan Kunkee <nkun...@genome.wustl.edu>
wrote:
>
> On 12/18/2014 10:07 AM, Wayna Runa wrote:
>
>> Hi everybody!
>>
>> I am using the Hello.java sample from qpid-client-0.30 to publish and consume
>> messages from a RabbitMQ queue, but I get this error:
>>
>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>> SLF4J: Defaulting to no-operation (NOP) logger implementation
>> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>> javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>      at org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719)
>>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169)
>>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121)
>>      at org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:341)
>>      at org.apache.qpid.client.AMQConnection.executeRetrySupport(AMQConnection.java:652)
>>      at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:96)
>>      at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:2119)
>>      at org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067)
>>      at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53)
>>      at info.intix.rabbitmq.samples.Hello.main(Hello.java:28)
>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>      at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>      at org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126)
>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693)
>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:730)
>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:724)
>>      at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:539)
>>      at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71)
>>      at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716)
>>      at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143)
>>      at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97)
>>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146)
>>      ... 8 more
>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>      at org.apache.qpid.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:92)
>>      at org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:197)
>>      at org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:127)
>>      at org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:116)
>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:533)
>>      at org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:467)
>>      at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:490)
>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:118)
>>      at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
>>      at java.lang.Thread.run(Thread.java:744)
>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>>
>> There seems to be an existing issue about this:
>> https://issues.apache.org/jira/browse/QPID-5906
>>
>> My JNDI config is:
>>
>> connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
>> # queueName: des_queue1
>> destination.myJndiDestQueue1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
>>
>> But if I add the following line to Hello.java
>> (System.setProperty("IMMEDIATE_PREFETCH", "true")), I can publish and
>> consume the message, but with side effects:
>>
>> 1) On the RabbitMQ side, a temporary queue is created every time I
>> run Hello.java.
>> 2) The message "read" from the original queue is not removed.
>>
>> My question is: is there any workaround to create just a MessageConsumer
>> without IMMEDIATE_PREFETCH=true?
>>
>
> Hi,
>
> No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ, as
> described in the ticket (+1 for getting it addressed!). The alternative is
> to convince RabbitMQ to implement, or provide a plugin for, 'active=false'
> so that it works as expected. To get things working for me, I also had to
> use the following patch:
>
> Index: src/main/java/org/apache/qpid/client/AMQSession.java
> ===================================================================
> --- src/main/java/org/apache/qpid/client/AMQSession.java        (revision 1640219)
> +++ src/main/java/org/apache/qpid/client/AMQSession.java        (working copy)
> @@ -3241,6 +3241,11 @@
>      {
>          synchronized (_suspensionLock)
>          {
> +            if(_immediatePrefetch)
> +            {
> +                _logger.warn("IMMEDIATE_PREFETCH is in force; no channel suspends processed");
> +                return;
> +            }
>              try
>              {
>                  if (_logger.isDebugEnabled())
>
>
> I haven't submitted the patch, as it is a blunt-force attempt to avoid the
> issue rather than fix it.
>
> 1) Your binding URL for consuming messages does not name a queue, so
> RabbitMQ is helpfully making one. With Qpid, I ended up with one set of
> JNDI resources for consuming (direct://exchg/route/q) and another set for
> sending (direct://exchg/route/). Note that the consuming URL names a queue,
> so messages are read explicitly from that queue, while the sending URL does
> not, letting the routing key determine the consumers. Consuming worked best
> with a queue pre-declared and pre-bound on the RabbitMQ side.
>
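A minimal sketch of the two URL shapes described in point 1, assuming the exchange, routing key, and queue names from the JNDI config quoted earlier in this thread (amq.direct, des_rk_queue1, des_queue1). The class and method names are hypothetical, and how Qpid interprets the middle segment should be checked against its binding URL documentation before relying on this.

```java
// Sketch only: builds the two binding URL shapes described in point 1.
// BindingUrls, forConsuming and forSending are hypothetical helper names,
// not part of the Qpid client API.
final class BindingUrls {
    private BindingUrls() {}

    // Consuming shape: direct://<exchange>/<route>/<queue> (queue named explicitly)
    static String forConsuming(String exchange, String route, String queue) {
        return "direct://" + exchange + "/" + route + "/" + queue;
    }

    // Sending shape: direct://<exchange>/<route>/ (no queue named)
    static String forSending(String exchange, String route) {
        return "direct://" + exchange + "/" + route + "/";
    }

    public static void main(String[] args) {
        // Names assumed from the JNDI config quoted earlier in the thread.
        System.out.println("consume: BURL:"
                + forConsuming("amq.direct", "des_rk_queue1", "des_queue1"));
        System.out.println("send:    BURL:"
                + forSending("amq.direct", "des_rk_queue1"));
    }
}
```

The printed values could then be used as two separate destination.* entries in the JNDI properties file, one for the consumer and one for the producer.
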
> 2) There is an exception being thrown during your onMessage method, so the
> framework is not sending an ACK for the message. That is the mechanism of
> createSession(false, Session.AUTO_ACKNOWLEDGE): the framework ACKs a
> message once the onMessage method returns without error.
>
> Taking into account those two things I think you'll find you can send and
> consume messages just fine.
>
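The acknowledgement rule in point 2 can be illustrated with a toy model. This is not Qpid or JMS code: AutoAckModel and deliver are hypothetical names, and the model only mimics the idea that an auto-acknowledging session acks a message after the listener callback returns normally, and skips the ack when the callback throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only (NOT the Qpid/JMS implementation): a message is recorded
// as acknowledged only if the listener callback returns without throwing.
final class AutoAckModel {
    final List<String> acked = new ArrayList<>();
    final List<String> unacked = new ArrayList<>();

    void deliver(String message, Consumer<String> onMessage) {
        try {
            onMessage.accept(message);
            acked.add(message);      // normal return: message is acknowledged
        } catch (RuntimeException e) {
            unacked.add(message);    // listener threw: no ack is recorded
        }
    }

    public static void main(String[] args) {
        AutoAckModel session = new AutoAckModel();
        session.deliver("ok", m -> System.out.println("handled " + m));
        session.deliver("bad", m -> {
            throw new IllegalStateException("listener failed");
        });
        System.out.println("acked=" + session.acked + " unacked=" + session.unacked);
    }
}
```

In a real listener, logging any exception raised inside onMessage is one way to find out why a message is left unacknowledged on the broker.
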
> Hope that helps,
> Nathan
>
>
>> If I cannot create a MessageConsumer for a RabbitMQ queue, I should try
>> replacing RabbitMQ with another broker, such as Qpid    :(
>>
>> Any ideas are welcome.
>>
>> Regards.
>>
>> -- wr
>>
>>
>
>

--
Wayna Runa

+34 668872813 | Skype waynaruna2
waynar...@gmail.com | Barcelona, Spain
