Hi Gary,

prefetch=1 alone would not work in my case.

We build CXF web services with JMS transport on top of ActiveMQ.

Synchronous cyclic "(client)->A->B->A" calls sometimes lead to a deadlock:
the "B->A" message lands in the prefetch buffer of the very same consumer
that is currently handling the "(client)->A" message.
We have load tests that highlight this issue, and the polling consumer was
our workaround until we figured out the reconnect implications.
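
To make the deadlock concrete, here is a minimal single-threaded sketch in
plain Java. It is only a model of the situation described above, not ActiveMQ
or CXF code: the class, the method names and the two-consumer setup are all
assumptions made up for illustration, and it does not cover the reconnect
problem.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative model only: none of these names come from ActiveMQ or CXF.
final class PrefetchDeadlockSketch {

    /** A consumer of service A's queue with a local prefetch buffer. */
    static final class Consumer {
        final Deque<String> prefetchBuffer = new ArrayDeque<>();
        boolean busy; // true while blocked inside a handler, waiting for a reply
    }

    /** Two consumers for A; the first is busy handling "(client)->A". */
    static Consumer[] consumersOfA() {
        Consumer first = new Consumer();
        first.busy = true;
        return new Consumer[] { first, new Consumer() };
    }

    /**
     * Push model (prefetch > 0): the broker picks the target consumer itself
     * and places the message in that consumer's buffer, busy or not.
     * Returns true if the nested "B->A" message can be handled.
     */
    static boolean nestedCallCompletesWithPush(int targetConsumer) {
        Consumer chosen = consumersOfA()[targetConsumer];
        chosen.prefetchBuffer.add("B->A");
        // A buffered message is only handled once its owner is free again.
        return !chosen.busy;
    }

    /**
     * Pull model (prefetch = 0): the message stays on the broker until a
     * consumer asks for it, and only an idle consumer asks.
     */
    static boolean nestedCallCompletesWithPull() {
        Deque<String> brokerQueue = new ArrayDeque<>();
        brokerQueue.add("B->A");
        for (Consumer c : consumersOfA()) {
            if (!c.busy && brokerQueue.poll() != null) {
                return true; // an idle consumer pulled the message
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("push, busy consumer chosen: " + nestedCallCompletesWithPush(0));
        System.out.println("push, idle consumer chosen: " + nestedCallCompletesWithPush(1));
        System.out.println("pull: " + nestedCallCompletesWithPull());
    }
}
```

In this model the nested call only gets stuck when the broker happens to pick
the busy consumer, which matches the "sometimes" above.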

I'll try to rework my example into a unit test and then file a JIRA issue as
you suggested.

Thanks,
Pavel

On Mon, Feb 15, 2010 at 10:13 PM, Gary Tully <gary.tu...@gmail.com> wrote:

> Have not looked into the test case yet, but when prefetch=0, a consumer is
> polling for messages. The broker will not actively dispatch messages to that
> consumer. On each call to receive, the consumer sends a pull command to the
> broker and then waits for a message dispatch. If failover occurs between the
> send of the pull and before the dispatch, the receive will remain blocked as
> the pull command will be lost and not replayed.
>
> One solution is to have the state tracker track message pull commands so
> that they can be replayed. Afaik that is not there at the moment. Can you
> raise an enhancement request for that?
>
> For your use case though, is prefetch=1 an option?
>
> On 15 February 2010 17:35, Pavel <pag...@gmail.com> wrote:
>
> > Here is another example, this time activemq + Spring; no CXF at all.
> >
> > Any ideas?
> >
> > Thanks,
> > Pavel
> >
> >
> > On Sat, Feb 13, 2010 at 3:04 PM, Pavel <pag...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> I'm hitting an issue with ActiveMQ reconnect.
> >>
> >> We are using CXF and SOAP over JMS with ActiveMQ. Also, at some point we
> >> started using consumer.prefetchSize=0. This is to prevent deadlocks when
> >> doing cyclic synchronous webservice calls, like
> >> A.foo()->B.bar()->A.baz().
> >>
> >> So, attached is a small example, echo-like webservice called over JMS
> >> every 2 seconds.
> >> ActiveMQ 5.3 or Fuse 5.3.0.5.
> >>
> >> activemq.broker.url=failover:(tcp://localhost:61616)
> >> jms.destinationQueueName=SAMPLE___SERVICE___QUEUE?consumer.prefetchSize=0
> >>
> >>
> >> All works well until AMQ restarts. Once a restart happens, the client
> >> seems to reconnect (I can see the consumer in the web console), but
> >> starts timing out [1].
> >> AMQ logs contain several messages like this:
> >> DEBUG | SAMPLE___SERVICE___QUEUE toPageIn: 1, Inflight: 0,
> >> pagedInMessages.size 5
> >>
> >> At some point this is followed by a bunch of
> >> DEBUG | Message expired Message
> >> ID:EPBYMINW0436-4688-1266064267343-0:0:22:1:1 dropped=false acked=false
> >> locked=false
> >>
> >>
> >> After application restart things are back to normal - until the next
> >> reconnect.
> >>
> >> If I simply start consumers with AMQ down, then start AMQ - connect works
> >> fine and messages start flowing. So it looks like the issue is with
> >> reconnect only.
> >>
> >> Am I hitting some known issue? Are there solutions/workarounds for that?
> >>
> >>
> >> [1] Client stacktrace.
> >> Caused by: java.lang.RuntimeException: Timeout receiving message with correlationId e10ec6062aec42d899e3c36d1171c6530000000000000032
> >>         at org.apache.cxf.transport.jms.JMSConduit.sendExchange(JMSConduit.java:206)
> >>         at org.apache.cxf.transport.jms.JMSOutputStream.doClose(JMSOutputStream.java:56)
> >>         at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:185)
> >>         at org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47)
> >>         at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:188)
> >>         at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66)
> >>         at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62)
> >>         at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:236)
> >>         at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:472)
> >>         at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:302)
> >>         at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:254)
> >>         at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73)
> >>         at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:123)
> >>
> >>
> >> Thanks,
> >> Pavel
> >
> >
> >
> >
>
>
> --
> http://blog.garytully.com
>
> Open Source Integration
> http://fusesource.com
>
