Hi Gary, prefetch=1 alone would not work for my case;
We build CXF web services with JMS transport on top of ActiveMQ. And syncronous cyclic "(client)->A->B->A" calls sometimes lead to deadlock: "B->A" message gets into prefetch buffer of the very same consumer that currently handles "(client)->A" message. We have load tests that highlight this issue. And polling consumer was a workaround until we figured out the reconnect implications. I'll try to rework my example into unit test and then file a jira as you suggested. Thanks, Pavel On Mon, Feb 15, 2010 at 10:13 PM, Gary Tully <gary.tu...@gmail.com> wrote: > Have not looked into the test case yet, but when prefetch=0, a consumer is > polling for messages. The broker will not actively dispatch messages to > that > consumer. On each call to receive, the consumer sends a pull command to > the > broker and then waits for a message dispatch. If failover occurs between > the > send of the pull and before the dispatch, the receive will remain blocked > as > the pull command will be lost and not replayed. > > One solution is to have the state tracker track message pull commands so > that they can be replayed. Afaik that is not there at the moment. Can you > raise an enhancement request for that? > > For your use case though, is prefetch=1 an option? > > On 15 February 2010 17:35, Pavel <pag...@gmail.com> wrote: > > > Here is another example, this time activemq + Spring; no CXF at all. > > > > Any ideas? > > > > Thanks, > > Pavel > > > > > > On Sat, Feb 13, 2010 at 3:04 PM, Pavel <pag...@gmail.com> wrote: > > > >> Hi, > >> > >> I'm hitting an issue with activeMQ reconnect. > >> > >> We are using CXF and SOAP over JMS with ActiveMQ. Also, at some point we > >> started using consumer.prefetchSize=0. This is to prevent deadlocks when > >> doing cyclic synchronous webservice calls, like > A.foo()->B.bar()->A.baz(). > >> > >> So, attached is a small example, echo-like webservice called over JMS > >> every 2 seconds. > >> ActiveMQ 5.3 or Fuse 5.3.0.5. > >> > >> activemq.broker.url=failover:(tcp://localhost:61616) > >> > jms.destinationQueueName=SAMPLE___SERVICE___QUEUE?consumer.prefetchSize=0 > >> > >> > >> All works well, until AMQ restart. Once restart happens, client seems to > >> reconnect (I can see consumer in a web console), but starts timing out > [1]. > >> AMQ logs contain several messages like this: > >> DEBUG | SAMPLE___SERVICE___QUEUE toPageIn: 1, Inflight: 0, > >> pagedInMessages.size 5 > >> > >> at some point this follows by a bunch of > >> DEBUG | Message expired Message > >> ID:EPBYMINW0436-4688-1266064267343-0:0:22:1:1 dropped=false acked=false > >> locked=false > >> > >> > >> After application restart things are back to normal - until the next > >> reconnect. > >> > >> If I simply start consumers with AMQ down, then start AMQ - connect > works > >> fine and messages start flowing. So it looks like the issue is with > >> reconnect only. > >> > >> Am I hitting some known issue? Are there solutions/workarounds for that? > >> > >> > >> [1] Client stacktrace. > >> Caused by: java.lang.RuntimeException: Timeout receiving message with > >> correlationId e10ec6062aec42d899e3c36d1171c6530000000000000032 > >> at > >> > org.apache.cxf.transport.jms.JMSConduit.sendExchange(JMSConduit.java:206) > >> at > >> > org.apache.cxf.transport.jms.JMSOutputStream.doClose(JMSOutputStream.java:56) > >> at > >> org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:185) > >> at > >> > org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47) > >> at > >> org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:188) > >> at > >> org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66) > >> at > >> > org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62) > >> at > >> > org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:236) > >> at > org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:472) > >> at > org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:302) > >> at > org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:254) > >> at > >> org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73) > >> at > >> org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:123) > >> > >> > >> Thanks, > >> Pavel > > > > > > > > > > > -- > http://blog.garytully.com > > Open Source Integration > http://fusesource.com >