[ https://issues.apache.org/jira/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish resolved AMQ-2908.
-------------------------------

    Resolution: Fixed
    Fix Version/s: 5.6.0

The addition of the inactive destination cleanup and the auto sweep for expired messages should resolve this issue.

> Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/jira/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>            Assignee: Gary Tully
>             Fix For: 5.6.0
>
>
> A slow consumer gets stuck when consuming from a queue that has expiring messages in it.
> Looked into the broker while it was stuck and saw that PrefetchSubscription.dispatched is full of expired messages.
> WORKAROUND
> Added a check to doActualDispatch: if the subscription is full, it removes all expired messages from dispatch.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>
> +    public void removeExpiredMessagesFromDispatch() {
> +        synchronized(dispatchLock) {
> +            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> +                final MessageReference node = iter.next();
> +                if (node.isExpired()) {
> +                    if (broker.isExpired(node)) {
> +                        node.getRegionDestination().messageExpired(context, this, node);
> +                    }
> +                    dispatched.remove(node);
> +                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                }
> +            }
> +        }
> +    }
> +
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       *
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    (working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                        if (s.isFull() && s instanceof PrefetchSubscription) {
> +                            ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                        }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira
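
The workaround quoted above (purge expired entries from a full dispatched list before deciding whether the subscription can accept another message) can be illustrated outside the broker. The following is a minimal sketch, not ActiveMQ code: DispatchedMessage and DispatchWindow are hypothetical stand-ins for MessageReference and PrefetchSubscription, and expiry is modelled with an explicit timestamp argument. The sketch stores entries in a plain ArrayList and therefore removes through the iterator; the patch calls dispatched.remove(node) directly, and whether that is safe depends on the list type the broker actually uses.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a dispatched message; not the ActiveMQ MessageReference API. */
final class DispatchedMessage {
    final String id;
    final long expiresAtMillis; // assumption for this sketch: 0 means "never expires"

    DispatchedMessage(String id, long expiresAtMillis) {
        this.id = id;
        this.expiresAtMillis = expiresAtMillis;
    }

    boolean isExpired(long nowMillis) {
        return expiresAtMillis > 0 && nowMillis >= expiresAtMillis;
    }
}

/** Hypothetical model of a prefetch-limited subscription's dispatched list. */
final class DispatchWindow {
    private final Object dispatchLock = new Object();
    private final List<DispatchedMessage> dispatched = new ArrayList<>();
    private final int prefetchLimit;

    DispatchWindow(int prefetchLimit) {
        this.prefetchLimit = prefetchLimit;
    }

    boolean isFull() {
        synchronized (dispatchLock) {
            return dispatched.size() >= prefetchLimit;
        }
    }

    int size() {
        synchronized (dispatchLock) {
            return dispatched.size();
        }
    }

    /** Removes every expired entry and returns how many were removed. */
    int removeExpired(long nowMillis) {
        synchronized (dispatchLock) {
            int removed = 0;
            for (Iterator<DispatchedMessage> it = dispatched.iterator(); it.hasNext(); ) {
                if (it.next().isExpired(nowMillis)) {
                    it.remove(); // iterator removal is the safe form for an ArrayList
                    removed++;
                }
            }
            return removed;
        }
    }

    /** Mirrors the patched dispatch check: purge only when full, then test capacity again. */
    boolean offer(DispatchedMessage message, long nowMillis) {
        synchronized (dispatchLock) {
            if (isFull()) {
                removeExpired(nowMillis);
            }
            if (isFull()) {
                return false; // still full of live messages; the consumer really is slow
            }
            dispatched.add(message);
            return true;
        }
    }
}
```

With a limit of 2, a window holding one expiring and one non-expiring message rejects a third message before the expiry time and accepts it afterwards, which is the behaviour the workaround aims for. The sketch leaves out the statistics and dead-letter handling that the real patch performs through messageExpired and the in-flight counter.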