[ https://issues.apache.org/jira/browse/AMQ-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Torsten Mielke resolved AMQ-3965. --------------------------------- Resolution: Fixed Fix Version/s: 5.7.0 Resolved by this [commit|https://fisheye6.atlassian.com/changelog/activemq?cs=1371722]. > Expired msgs not getting acked to broker causing consumer to fill up its > prefetch and not getting more msgs. > ------------------------------------------------------------------------------------------------------------ > > Key: AMQ-3965 > URL: https://issues.apache.org/jira/browse/AMQ-3965 > Project: ActiveMQ > Issue Type: Bug > Components: JMS client > Affects Versions: 5.6.0 > Reporter: Torsten Mielke > Assignee: Torsten Mielke > Labels: optimizeDispatch > Fix For: 5.7.0 > > Attachments: AMQ-3956.patch, > OptimizeAcknowledgeWithExpiredMsgsTest.java, testcase.tgz > > > It is possible to get a consumer stalled and not receiving any more messages > when using optimizeAcknowledge. > Let me illustrate in an example (JUnit test attached). > Suppose a consumer with optimizeAcknowledge and a prefetch of 100 msgs. > The broker's queue contains 105 msg. The first 45 msgs have a very low expiry > time, the remaining don't expiry. > So the first 100 msgs get dispatched to the consumer (due to prefetch=100). > Out of these the first 45 msgs do not get dispatched to consumer code because > their expiry has elapsed by the time that are handled in the client. > {code:title=ActiveMQMessageConsumer.java} > public void dispatch(MessageDispatch md) { > MessageListener listener = this.messageListener.get(); > try { > [...] > synchronized (unconsumedMessages.getMutex()) { > if (!unconsumedMessages.isClosed()) { > if (this.info.isBrowser() || > !session.connection.isDuplicate(this, md.getMessage())) { > if (listener != null && > unconsumedMessages.isRunning()) { > ActiveMQMessage message = > createActiveMQMessage(md); > beforeMessageIsConsumed(md); > try { > boolean expired = message.isExpired(); > if (!expired) { > listener.onMessage(message); > } > afterMessageIsConsumed(md, expired); > {code} > listener.onMessage() above is not called as the msg has expired. > However it will calls into afterMessagesIsConsumed() > {code:title=ActiveMQMessageConsumer.java} > private void afterMessageIsConsumed(MessageDispatch md, boolean > messageExpired) throws JMSException { > [...] > if (messageExpired) { > synchronized (deliveredMessages) { > deliveredMessages.remove(md); > } > stats.getExpiredMessageCount().increment(); > ackLater(md, MessageAck.DELIVERED_ACK_TYPE); > {code} > and will remove the expired msg from the deliveredMessages list. It then > calls into ackLater(). > However ackLater() only fires an ack back to the broker when the number of > unsent acks has reached 50% of the prefetch value. > {code:title=ActiveMQMessageConsumer.java} > private void ackLater(MessageDispatch md, byte ackType) throws JMSException { > [...] > if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - > additionalWindowSize)) { > session.sendAck(pendingAck); > {code} > In our example it has not reached that mark (only 45 expired msgs, i.e. 45%). > So the first 45 msgs, which expired before being dispatched, did not cause an > ack being sent to the broker. > Now the next 55 messages get processed. These don't have an expiry so they > get dispatched to consumer code. > After dispatching each msg to the registered application code, we call into > afterMessageIsConsumed() but this time executing a different branch as the > msgs are not expired > {code:title=ActiveMQMessageConsumer.java} > private void afterMessageIsConsumed(MessageDispatch md, boolean > messageExpired) throws JMSException { > [...] > else if (isAutoAcknowledgeEach()) { > if (deliveryingAcknowledgements.compareAndSet(false, true)) { > synchronized (deliveredMessages) { > if (!deliveredMessages.isEmpty()) { > if (optimizeAcknowledge) { > ackCounter++; > if (ackCounter >= (info.getPrefetchSize() * > .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= > (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { > MessageAck ack = > makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); > if (ack != null) { > deliveredMessages.clear(); > ackCounter = 0; > session.sendAck(ack); > optimizeAckTimestamp = > System.currentTimeMillis(); > } > } > {code} > with optimizeAcknowledge=true we only send an ack back to the broker if > either optimizeAcknowledgeTimeOut has elapsed or the ackCounter has reached > 65% of the prefetch (100). > The timeout will not have kicked in. The ackCounter will be at 55 after > processing the last of 100 prefetched messages which is less than 65% of 100. > So with the last prefetched msg being processed, it will not generate an ack > back to the broker. > As a result, the client has processed all prefetched message and will not get > any new messages dispatched from the broker. The broker has another 5 msgs on > the queue but since it never received an ack from the client, it won't > dispatch any further messages. > The client is stalled. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira