Hi David,

did you try using failover transport

http://activemq.apache.org/failover-transport-reference.html

It should take care of detecting network problems and reconnecting.
Then you should just use receive() or message listener

Cheers
--
Dejan Bosanac - http://twitter.com/dejanb

Open Source Integration - http://fusesource.com/
ActiveMQ in Action - http://www.manning.com/snyder/
Blog - http://www.nighttale.net



On Tue, Aug 3, 2010 at 10:56 AM, David Delbecq <david.delb...@oma.be> wrote:
> Hello,
>
> we are having fiability troubles with activeMQ (5.2). To get around those,
> we use in consumer receive(timeout) with a timeout of 60 seconds to detect
> early problems with broker.  We assumed if we called receive() on a closed
> connection (broker side closed) we sould somehow get an exception.  However,
> when activeMQ server is shutdown for any reason, the receiver never detect
> this and the receive call never return any message. We suspect the timeout
> occurs, but we would expect some kind of exception telling us we need to
> reconnect.
>
>
> Here is consumer code:
> while (!stopNow) {
>                Message m = receiver.receive(60000);//wait 60 then next loop
>                if (m == null) {
>                    continue; // timeout?
>                }
>                if (m instanceof MapMessage) { // The event queue should
> contain only map messages !
>                    processEventMessage((MapMessage) m);
>
>
> Here is the threaddump of consumer. There are messages in that queue waiting
> since yesterday, but the consumer never received any of those! This is very
> problematic as, for now, we must restart every consumer after restarting the
> broker. We would expect the consumer to be able to detect this and
> reconnect. What's the procedure to follow for this? We could disconnect  /
> reconnect every minutes to be sure, but that would mean some kind of
> additional load on the broker.
>
> INFO   | jvm 1    | 2010/08/03 08:35:35 | "ActiveMQ Connection Worker:
> tcp://localhost/127.0.0.1:61616" daemon prio=10 tid=0x00007f1cc1d7f000
> nid=0x5d3a waiting on condition [0x0000000041f7b000]
> INFO   | jvm 1    | 2010/08/03 08:35:35 |    java.lang.Thread.State: WAITING
> (parking)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at sun.misc.Unsafe.park(Native
> Method)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     - parking to wait for
> <0x00007f1cc8412618> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.lang.Thread.run(Thread.java:619)
> INFO   | jvm 1    | 2010/08/03 08:35:35 | "WrapperJarAppMain" prio=10
> tid=0x00007f1cc22b8000 nid=0x5f4d in Object.wait() [0x0000000040af6000]
> INFO   | jvm 1    | 2010/08/03 08:35:35 |    java.lang.Thread.State:
> TIMED_WAITING (on object monitor)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.lang.Object.wait(Native Method)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     - waiting on
> <0x00007f1cc8413280> (a java.lang.Object)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> org.apache.activemq.MessageDispatchChannel.dequeue(MessageDispatchChannel.java:77)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     - locked <0x00007f1cc8413280>
> (a java.lang.Object)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:412)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:531)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> be.meteo.shark.client.emailer.JmsEmailer.processMessages(JmsEmailer.java:178)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> be.meteo.shark.client.emailer.JmsEmailer.main(JmsEmailer.java:409)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.lang.reflect.Method.invoke(Method.java:597)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> org.tanukisoftware.wrapper.WrapperJarApp.run(WrapperJarApp.java:352)
> INFO   | jvm 1    | 2010/08/03 08:35:35 |     at
> java.lang.Thread.run(Thread.java:619)
>
> --
> David Delbecq
> ICT
> Institut Royal Météorologique
> Ext:557
>
>
>

Reply via email to