[ 
https://issues.apache.org/jira/browse/AMQ-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Torsten Mielke resolved AMQ-3965.
---------------------------------

       Resolution: Fixed
    Fix Version/s: 5.7.0

Resolved by this 
[commit|https://fisheye6.atlassian.com/changelog/activemq?cs=1371722].
                
> Expired msgs not getting acked to broker causing consumer to fill up its 
> prefetch and not getting more msgs.
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3965
>                 URL: https://issues.apache.org/jira/browse/AMQ-3965
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.6.0
>            Reporter: Torsten Mielke
>            Assignee: Torsten Mielke
>              Labels: optimizeDispatch
>             Fix For: 5.7.0
>
>         Attachments: AMQ-3956.patch, 
> OptimizeAcknowledgeWithExpiredMsgsTest.java, testcase.tgz
>
>
> It is possible to get a consumer stalled and not receiving any more messages 
> when using optimizeAcknowledge.
> Let me illustrate in an example (JUnit test attached).
> Suppose a consumer with optimizeAcknowledge and a prefetch of 100 msgs.
> The broker's queue contains 105 msg. The first 45 msgs have a very low expiry 
> time, the remaining don't expiry. 
> So the first 100 msgs get dispatched to the consumer (due to prefetch=100). 
> Out of these the first 45 msgs do not get dispatched to consumer code because 
> their expiry has elapsed by the time that are handled in the client. 
> {code:title=ActiveMQMessageConsumer.java}
> public void dispatch(MessageDispatch md) {
>         MessageListener listener = this.messageListener.get();
>         try {
>             [...]
>             synchronized (unconsumedMessages.getMutex()) {
>                 if (!unconsumedMessages.isClosed()) {
>                     if (this.info.isBrowser() || 
> !session.connection.isDuplicate(this, md.getMessage())) {
>                         if (listener != null && 
> unconsumedMessages.isRunning()) {
>                             ActiveMQMessage message = 
> createActiveMQMessage(md);
>                             beforeMessageIsConsumed(md);
>                             try {
>                                 boolean expired = message.isExpired();
>                                 if (!expired) {
>                                     listener.onMessage(message);
>                                 }
>                                 afterMessageIsConsumed(md, expired);
> {code}
> listener.onMessage() above is not called as the msg has expired. 
> However it will calls into afterMessagesIsConsumed()
> {code:title=ActiveMQMessageConsumer.java}
>     private void afterMessageIsConsumed(MessageDispatch md, boolean 
> messageExpired) throws JMSException {
>       [...]  
>       if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> {code}
> and will remove the expired msg from the deliveredMessages list. It then 
> calls into ackLater(). 
> However ackLater() only fires an ack back to the broker when the number of 
> unsent acks has reached 50% of the prefetch value.
> {code:title=ActiveMQMessageConsumer.java}
>  private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
>     [...]
>     if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - 
> additionalWindowSize)) {
>             session.sendAck(pendingAck);
> {code}        
> In our example it has not reached that mark (only 45 expired msgs, i.e. 45%). 
> So the first 45 msgs, which expired before being dispatched, did not cause an 
> ack being sent to the broker.
> Now the next 55 messages get processed. These don't have an expiry so they 
> get dispatched to consumer code. 
> After dispatching each msg to the registered application code, we call into 
> afterMessageIsConsumed() but this time executing a different branch as the 
> msgs are not expired
> {code:title=ActiveMQMessageConsumer.java}
> private void afterMessageIsConsumed(MessageDispatch md, boolean 
> messageExpired) throws JMSException {
>     [...]
>     else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * 
> .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= 
> (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
>                                     MessageAck ack = 
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = 
> System.currentTimeMillis();
>                                     }
>                                 }
> {code}
> with optimizeAcknowledge=true we only send an ack back to the broker if 
> either optimizeAcknowledgeTimeOut has elapsed or the ackCounter has reached 
> 65% of the prefetch (100). 
> The timeout will not have kicked in. The ackCounter will be at 55 after 
> processing the last of 100 prefetched messages which is less than 65% of 100. 
> So with the last prefetched msg being processed, it will not generate an ack 
> back to the broker. 
> As a result, the client has processed all prefetched message and will not get 
> any new messages dispatched from the broker. The broker has another 5 msgs on 
> the queue but since it never received an ack from the client, it won't 
> dispatch any further messages. 
> The client is stalled. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to