Hi, was this issue ever figured out? We have the same problem: a consumer stops receiving further messages because it has not committed after receiving around 1000 messages. We cannot commit until we get a set of messages, and hence the consumer waits indefinitely.
Any suggestions welcome. AMQ 5.2 on Windows.

-----------------------------------------------

IBeaumont wrote:
>
> I'm sending a mixture of persistent and non-persistent messages.
>
> I have seen a negative queue count before, so I will look at the trunk.
>
> Can't see anything unusual in JConsole, but I have just downloaded the
> source, and in ActiveMQMessageConsumer.receiveNoWait there are these
> lines...
>             if (info.getPrefetchSize() == 0) {
>                 md = dequeue(-1); // We let the broker let us know when we
>                 // timeout.
>             } else {
>                 md = dequeue(0);
>             }
>
> So that explains why my consumes seem to hang when things go wrong:
> they wait forever because I have a prefetchSize of zero.
>
> I've now changed my code so that instead of using receiveNoWait, I'll use
> "receive" with a short timeout. Hopefully then things will time out and
> messages will continue to be processed. I'll then need to find out if
> any messages get skipped. Although that doesn't help find the cause.
>
>
> Gary Tully wrote:
>>
>> Your use case sounds typical and reasonable, but just to understand,
>> are you sending persistent and non-persistent messages to the same
>> queue?
>>
>> Can you check in JConsole, in the ActiveMQ BrokerView - Queues and
>> Subscriptions:
>> Do the queue metrics (size, dispatched) look as expected?
>> A negative queue size is indicative of an issue that was resolved on
>> trunk recently. If you see that, you could try a snapshot.
>>
>> Re: prefetch==0. OK, yes, you need that if you want to depend on
>> redelivered(). That is a known issue: prefetched messages that are
>> not consumed are deemed redelivered when they are dispatched to the
>> next consumer.
>> ActiveMQ redelivered means delivered to an AMQ consumer, not delivered
>> to the end user. See:
>> https://issues.apache.org/activemq/browse/AMQ-1952
>>
>> 2009/1/13 IBeaumont <ibeaum...@categoric.com>:
>>>
>>> Hi Gary,
>>> Thanks for the feedback.
>>> As you can see from my stack traces earlier in JConsole, the consumers
>>> are stuck on pulling messages from the queues; they are in:
>>> org.apache.activemq.MessageDispatchChannel.dequeue
>>>
>>>
>>> The reason I've ended up with the settings I have is that so far they
>>> seem to give me the desired results (apart from this last problem).
>>> Maybe it would be easier to explain the behaviour I need and someone
>>> might be able to advise on the best settings...
>>>
>>> So most messages sent are persistent, and I need to process every
>>> message - I can't afford to drop any. I may have slow "consumers", and
>>> I might be putting messages on to the queues at a far greater rate than
>>> they can get processed (I want any available hard disk space to be used
>>> for message storage). There are a number of consumers and producers,
>>> and a number of different queues.
>>>
>>>
>>> The pre-fetch size was set to zero to overcome an issue I was having
>>> with redelivery count. Our application uses the JMS redelivered to see
>>> if it has already tried processing the message. Using a pre-fetch size
>>> bigger than 0 was causing me some issues with this. Out of interest, do
>>> you know what happens to messages that are pre-fetched when the
>>> consumer then dies? Do they get flagged as redelivered when a new
>>> consumer picks them up?
>>>
>>>
>>> Gary Tully wrote:
>>>>
>>>> A good place to start is with JConsole, to have a look at the queues
>>>> and consumers to see how many messages are currently in the queues.
>>>>
>>>>> The broker is configured with producerFlowControl="false", using TCP, a
>>>>> pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
>>>>>
>>>> The sendFailIfNoSpace is disabled if producerFlowControl==false, so
>>>> your producers will hang if memory for the queues is exhausted.
>>>> sendFailIfNoSpace is considered a flowControl mechanism that does not
>>>> block, so it needs producerFlowControl to be enabled (the default) and
>>>> no producer window to be set (also the default).
>>>>
>>>> Just a thought, but is it possible that the consumers are blocking
>>>> when trying to "stick messages" back on a queue?
>>>>
>>>> Out of interest, why are you using prefetch==0?
>>>>
>>>> 2009/1/13 IBeaumont <ibeaum...@categoric.com>:
>>>>>
>>>>> I've got a fairly complex app that takes msgs, processes them and
>>>>> sticks them on the same or different queues.
>>>>>
>>>>> The queues are pre-loaded with persistent messages before the
>>>>> application starts (50000). Once it starts processing, things work
>>>>> fine for a while and then the consumers stop receiving any messages.
>>>>> I have 4 consumers for each queue, and there are 3 different queues.
>>>>>
>>>>> Looking at JConsole, all consumes are waiting here:
>>>>>
>>>>> State: WAITING on java.lang.obj...@1801d4a
>>>>> Total blocked: 20 Total waited: 158
>>>>>
>>>>> Stack trace:
>>>>> java.lang.Object.wait(Native Method)
>>>>> java.lang.Object.wait(Object.java:485)
>>>>> org.apache.activemq.MessageDispatchChannel.dequeue(MessageDispatchChannel.java:75)
>>>>> org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:412)
>>>>> org.apache.activemq.ActiveMQMessageConsumer.receiveNoWait(ActiveMQMessageConsumer.java:560)
>>>>> com.xalert.server.queuing.SessionManager.getAlert(SessionManager.java:236)
>>>>>
>>>>>
>>>>> If I look at the broker (and I'm not really sure what to look at
>>>>> here), the thread for my queue that should be dispatching messages
>>>>> looks like this:
>>>>> Name: QueueThread:queue://csPIQ
>>>>> State: WAITING on
>>>>> java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@b0a518
>>>>> Total blocked: 2,365 Total waited: 6,717
>>>>>
>>>>> Stack trace:
>>>>> sun.misc.Unsafe.park(Native Method)
>>>>> java.util.concurrent.locks.LockSupport.park(Unknown Source)
>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
>>>>> java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
>>>>> java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>> java.lang.Thread.run(Unknown Source)
>>>>>
>>>>> The broker is configured with producerFlowControl="false", using TCP,
>>>>> a pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
>>>>>
>>>>> Any ideas on what my problem is or how/where I look in ActiveMQ to
>>>>> find the cause, or if the problem is with the consumer.
>>>>>
>>>>> TIA
>>>>> Ian
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://www.nabble.com/Q-Consumers-stop-receiving-messages-tp21438163p21438163.html
>>>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> http://blog.garytully.com
>>>>
>>>> Open Source SOA
>>>> http://FUSESource.com
>>>>
>>>>
>>>
>>> --
>>> View this message in context:
>>> http://www.nabble.com/Q-Consumers-stop-receiving-messages-tp21438163p21440240.html
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>>
>> --
>> http://blog.garytully.com
>>
>> Open Source SOA
>> http://FUSESource.com
>>
>>
>
>

--
View this message in context:
http://www.nabble.com/Q-Consumers-stop-receiving-messages-tp21438163p23346978.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
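
For anyone following the thread, here is a minimal sketch of why a wait of -1 can hang while a short timeout hands control back to the caller. It is not ActiveMQ code: DispatchChannelSketch and its enqueue/dequeue methods are hypothetical stand-ins built on a plain LinkedBlockingQueue, and the meaning of the timeout values (-1 waits indefinitely, 0 returns at once) is an assumption taken from the receiveNoWait snippet quoted in the thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a consumer-side dispatch channel; not ActiveMQ code. */
class DispatchChannelSketch<T> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    /** Simulates the broker dispatching a message to this consumer. */
    void enqueue(T message) {
        queue.add(message);
    }

    /**
     * Assumed timeout semantics, mirroring the quoted snippet:
     *   timeoutMillis < 0  : block until a message arrives (may wait forever)
     *   timeoutMillis == 0 : return immediately, null if nothing is queued
     *   timeoutMillis > 0  : wait at most that long, null on timeout
     */
    T dequeue(long timeoutMillis) {
        try {
            if (timeoutMillis < 0) {
                return queue.take();
            }
            if (timeoutMillis == 0) {
                return queue.poll();
            }
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        DispatchChannelSketch<String> channel = new DispatchChannelSketch<>();

        // Nothing has been dispatched yet: the timed wait returns null
        // after roughly 100 ms instead of parking the thread forever.
        System.out.println("timed dequeue on empty channel: " + channel.dequeue(100));

        channel.enqueue("alert-1");
        // A message is available, so even the indefinite wait returns at once.
        System.out.println("after enqueue: " + channel.dequeue(-1));

        // Calling channel.dequeue(-1) again here would block indefinitely,
        // which matches the WAITING state in the consumer stack trace.
    }
}
```

With a timed wait the caller gets null back and can decide whether to retry, log, or commit what it has, which is the workaround described in the thread. It does not explain why the broker stopped dispatching in the first place.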