Hi,
Was this issue ever figured out?
We have the same problem: a consumer stops receiving further messages
because it has not committed after receiving around 1000 messages.  We
cannot commit until we have received a complete set of messages, so the
consumer waits indefinitely.

Any suggestions welcome.
ActiveMQ 5.2 on Windows.
-----------------------------------------------


IBeaumont wrote:
> 
> I'm sending a mixture of persistent and non persistent messages.
> 
> I have seen a negative queue count before so will look at the trunk.
> 
> Can't see anything unusual in JConsole, but I have just downloaded the
> source and in ActiveMQMessageConsumer.receiveNoWait there are these
> lines...
>         if (info.getPrefetchSize() == 0) {
>             md = dequeue(-1); // We let the broker let us know when we
>             // timeout.
>         } else {
>             md = dequeue(0);
>         }
> 
> So that explains why my consumers seem to hang when things go wrong:
> they wait forever, since I have a prefetchSize of zero.
> 
> I've now changed my code so that instead of using receiveNoWait, I'll use
> "receive" with a short timeout.  Hopefully then things will time out and
> messages will continue to be processed.  I'll then need to find out if
> any messages get skipped.  Although that doesn't help find the cause.
> 
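A minimal sketch of the timeout convention described in the quoted snippet, written as a toy model on top of java.util.concurrent rather than ActiveMQ's own classes. The class name DequeueModel and its methods are hypothetical; the mapping of -1 to "wait forever" and 0 to "return immediately" is taken from the description in this thread, not verified against the ActiveMQ source.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of a consumer-side dispatch channel. Not ActiveMQ code. */
class DequeueModel {
    private final LinkedBlockingQueue<String> channel = new LinkedBlockingQueue<>();

    void enqueue(String message) {
        channel.add(message);
    }

    /**
     * timeoutMillis < 0  : block until a message arrives (the prefetch == 0 branch above)
     * timeoutMillis == 0 : return immediately, null if nothing is buffered
     * timeoutMillis > 0  : wait at most that long, then return null
     */
    String dequeue(long timeoutMillis) {
        try {
            if (timeoutMillis < 0) {
                return channel.take();
            }
            if (timeoutMillis == 0) {
                return channel.poll();
            }
            return channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        DequeueModel model = new DequeueModel();
        System.out.println(model.dequeue(0));   // empty channel: null, no waiting
        System.out.println(model.dequeue(50));  // waits up to 50 ms, then null
        model.enqueue("m1");
        System.out.println(model.dequeue(-1));  // a message is buffered: m1
        // model.dequeue(-1) on an empty channel would block here indefinitely.
    }
}
```

In this model a bounded wait always returns control to the caller, which is the effect IBeaumont is after by switching from receiveNoWait to JMS MessageConsumer.receive(long timeout).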
> 
> 
> 
> 
> Gary Tully wrote:
>> 
>> Your use case sounds typical and reasonable but just to understand,
>> are you sending persistent and non persistent messages to the same
>> queue?
>> 
>> Can you check in JConsole, in the ActiveMQ BrokerView - Queues and
>> Subscriptions:
>> Do the queue metrics (size, dispatched) look as expected?
>> A negative queue size is indicative of an issue that was resolved on
>> trunk recently. If you see that you could try a snapshot.
>> 
>> Re: prefetch==0. OK, yes, you need that if you want to depend on
>> redelivered(). That is a known issue: prefetched messages that are
>> not consumed are deemed redelivered when they are dispatched to the
>> next consumer.
>> ActiveMQ redelivered means delivered to an AMQ consumer, not delivered
>> to the end user. See:
>> https://issues.apache.org/activemq/browse/AMQ-1952
>> 
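To make the redelivered point concrete, here is a small toy simulation of the behaviour Gary describes. It is an illustration only: RedeliveryModel, Msg, dispatch and requeue are hypothetical names, and the rule that a message counts as redelivered once it has been dispatched more than once is an assumption drawn from his description, not from the broker source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy broker queue that marks a message redelivered after a second dispatch. */
class RedeliveryModel {
    static final class Msg {
        final String id;
        int dispatchCount; // times handed to any consumer's prefetch buffer

        Msg(String id) { this.id = id; }

        boolean redelivered() { return dispatchCount > 1; }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();

    void send(String id) { queue.addLast(new Msg(id)); }

    /** Hands up to 'prefetch' messages to a consumer's local buffer. */
    List<Msg> dispatch(int prefetch) {
        List<Msg> buffer = new ArrayList<>();
        while (buffer.size() < prefetch && !queue.isEmpty()) {
            Msg m = queue.removeFirst();
            m.dispatchCount++;
            buffer.add(m);
        }
        return buffer;
    }

    /** Consumer died: unacknowledged messages return to the head of the queue, in order. */
    void requeue(List<Msg> unacked) {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            queue.addFirst(unacked.get(i));
        }
    }

    public static void main(String[] args) {
        RedeliveryModel broker = new RedeliveryModel();
        broker.send("a");
        broker.send("b");
        broker.send("c");
        List<Msg> first = broker.dispatch(3);  // consumer 1 prefetches all three
        // Consumer 1 processes only "a", then dies; "b" and "c" never reached the application.
        broker.requeue(first.subList(1, 3));
        for (Msg m : broker.dispatch(3)) {     // consumer 2 picks them up
            System.out.println(m.id + " redelivered=" + m.redelivered());
        }
    }
}
```

In this model "b" and "c" come back flagged as redelivered even though no application code ever saw them, which is why a redelivered check cannot tell "already attempted" apart from "only prefetched".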
>> 2009/1/13 IBeaumont <ibeaum...@categoric.com>:
>>>
>>> Hi Gary,
>>> Thanks for the feedback.
>>> As you can see from my stack traces earlier in JConsole, the consumers
>>> are stuck pulling messages from the queues; they are in:
>>> org.apache.activemq.MessageDispatchChannel.dequeue
>>>
>>>
>>> I've ended up with the settings I have because so far they seem to give
>>> me the desired results (apart from this last problem).  Maybe it would
>>> be easier to explain the behaviour I need, and someone might be able to
>>> advise on the best settings...
>>>
>>> So most messages sent are persistent, and I need to process every
>>> message - I can't afford to drop any.  I may have slow "consumers", and
>>> I might be putting messages on to the queues at a far greater rate than
>>> they can get processed (I want any available hard disk space to be used
>>> for message storage).  There are a number of consumers and producers,
>>> and a number of different queues.
>>>
>>>
>>> The pre-fetch size was set to zero to overcome an issue I was having
>>> with the redelivery count.  Our application uses the JMS redelivered
>>> flag to see if it has already tried processing the message.  Using a
>>> pre-fetch size bigger than 0 was causing me some issues with this.  Out
>>> of interest, do you know what happens to messages that are pre-fetched
>>> when the consumer then dies?  Do they get flagged as redelivered when a
>>> new consumer picks them up?
>>>
>>>
>>>
>>>
>>> Gary Tully wrote:
>>>>
>>>> A good place to start is with JConsole, to have a look at the queues
>>>> and consumers to see how many messages are currently in the queues.
>>>>
>>>>>The broker is configured with producerFlowControl="false", using TCP, a
>>>>>pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
>>>>>
>>>> The sendFailIfNoSpace is disabled if producerFlowControl==false, so
>>>> your producers will hang if memory for the queues is exhausted.
>>>> sendFailIfNoSpace is considered a flowControl mechanism that does not
>>>> block so it needs producerFlowControl to be enabled (the default) and
>>>> no producer window to be set (also the default).
>>>>
>>>> Just a thought, but is it possible that the consumers are blocking
>>>> when trying to "stick messages" back on a queue?
>>>>
>>>> Out of interest, why are you using prefetch==0?
>>>>
>>>> 2009/1/13 IBeaumont <ibeaum...@categoric.com>:
>>>>>
>>>>> I've got a fairly complex app that takes msgs, processes them and
>>>>> sticks them on the same or different queues.
>>>>>
>>>>> The queues are pre-loaded with persistent messages (50000) before the
>>>>> application starts.  Once it starts, processing works fine for a while
>>>>> and then the consumers stop receiving any messages.  I have 4 consumers
>>>>> for each queue, and there are 3 different queues.
>>>>>
>>>>> Looking at JConsole, all consumers are waiting here:
>>>>>
>>>>> State: WAITING on java.lang.obj...@1801d4a
>>>>> Total blocked: 20  Total waited: 158
>>>>>
>>>>> Stack trace:
>>>>> java.lang.Object.wait(Native Method)
>>>>> java.lang.Object.wait(Object.java:485)
>>>>> org.apache.activemq.MessageDispatchChannel.dequeue(MessageDispatchChannel.java:75)
>>>>> org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:412)
>>>>> org.apache.activemq.ActiveMQMessageConsumer.receiveNoWait(ActiveMQMessageConsumer.java:560)
>>>>> com.xalert.server.queuing.SessionManager.getAlert(SessionManager.java:236)
>>>>>
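The same check can be done from inside the consumer JVM without JConsole. Below is a rough sketch using only the standard Thread API; WaitingThreadFinder and findWaitingIn are hypothetical names, and the class and method to search for (org.apache.activemq.MessageDispatchChannel and dequeue) are taken from the stack trace above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

class WaitingThreadFinder {
    /**
     * Returns the names of live threads that are WAITING or TIMED_WAITING and
     * have a stack frame for the given class and method.
     */
    static List<String> findWaitingIn(String className, String methodName) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            Thread.State state = thread.getState();
            if (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
                continue;
            }
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getClassName().equals(className)
                        && frame.getMethodName().equals(methodName)) {
                    names.add(thread.getName());
                    break;
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Demo with a thread parked in LockSupport.park, standing in for a stuck consumer.
        Thread parked = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
        }, "parked-demo");
        parked.setDaemon(true);
        parked.start();
        LockSupport.parkNanos(200_000_000L); // give the demo thread time to park
        System.out.println(findWaitingIn("java.util.concurrent.locks.LockSupport", "park"));
        parked.interrupt();
    }
}
```

In the application itself the call would be findWaitingIn("org.apache.activemq.MessageDispatchChannel", "dequeue"), logged periodically, so a stall shows up in the logs with the names of the affected consumer threads. The state and the stack trace are sampled at slightly different moments, so treat the result as a hint rather than an exact snapshot.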
>>>>>
>>>>> If I look at the broker (and I'm not really sure what to look at
>>>>> here), the thread for my queue that should be dispatching messages
>>>>> looks like this:
>>>>> Name: QueueThread:queue://csPIQ
>>>>> State: WAITING on
>>>>> java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@b0a518
>>>>> Total blocked: 2,365  Total waited: 6,717
>>>>>
>>>>> Stack trace:
>>>>> sun.misc.Unsafe.park(Native Method)
>>>>> java.util.concurrent.locks.LockSupport.park(Unknown Source)
>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
>>>>> java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
>>>>> java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>> java.lang.Thread.run(Unknown Source)
>>>>>
>>>>> The broker is configured with producerFlowControl="false", using TCP,
>>>>> a pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
>>>>>
>>>>> Any ideas on what my problem is, or how/where I look in ActiveMQ to
>>>>> find the cause, or whether the problem is with the consumer?
>>>>>
>>>>> TIA
>>>>> Ian
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://www.nabble.com/Q-Consumers-stop-receiving-messages-tp21438163p21438163.html
>>>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> http://blog.garytully.com
>>>>
>>>> Open Source SOA
>>>> http://FUSESource.com
>>>>
>>>>
>>>
>>>
>>>
>> 
>> 
>> 
>> 
>> 
> 
> 

