[ 
https://issues.apache.org/jira/browse/AMQ-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13431945#comment-13431945
 ] 

Torsten Mielke edited comment on AMQ-3965 at 8/9/12 4:45 PM:
-------------------------------------------------------------

We thought that the following fix could do the job 
{code:title=ActiveMQMessageConsumer.java} 
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
[...]
        if (messageExpired) {
            synchronized (deliveredMessages) {
                deliveredMessages.remove(md);
            }
            stats.getExpiredMessageCount().increment();
            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;
                                
                                // AMQ-3965 - this alone does not fix it.
                                float threshold = (float) info.getPrefetchSize() * (float) 0.65;
                                if (pendingAck != null && (ackCounter + deliveredCounter) >= (threshold)) {
                                    session.sendAck(pendingAck);
                                    pendingAck = null;
                                    deliveredCounter = 0;
                                }
                                if (ackCounter >= (threshold) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                }
{code} 

but that extra code 
{code} 
// AMQ-3965 - this alone does not fix it.
float threshold = (float) info.getPrefetchSize() * (float) 0.65;
if (optimizeAcknowledge && pendingAck != null && (ackCounter + deliveredCounter) >= (threshold)) {
  session.sendAck(pendingAck);
  pendingAck = null;
  deliveredCounter = 0;
} 

{code} alone is not enough. Let me explain why: 

Suppose a prefetch of 100. The consumer first receives 56 normal msgs, so 
ackCounter is at 56, which is below the 65% threshold, and no ack has been sent 
back to the broker yet. It then receives 44 msgs that expire on the consumer 
before dispatch, so deliveredCounter=44 and ackCounter=56. 
afterMessageIsConsumed() only runs the proposed check for normal msgs, not for 
expired ones, and 44 is also below the 50% mark in ackLater(). So for the last 
44 expired msgs nothing triggers an ack to the broker. The result is a hanging 
consumer that does not receive any more msgs. Problem not fixed. 
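
To make the counter arithmetic above easier to check, here is a minimal sketch in plain Java. It is a toy model, not the ActiveMQ client: the class PatchedAckSketch and its methods are made up for illustration, pendingAck is reduced to a boolean, additionalWindowSize is assumed to be 0, and optimizeAcknowledgeTimeOut is assumed never to fire. It only replays the 50% and 65% checks quoted in this issue.

```java
/** Toy model of the consumer-side counters; NOT the real ActiveMQ classes. */
final class PatchedAckSketch {
    final int prefetch;
    int ackCounter;        // consumed msgs not yet acked (optimizeAcknowledge path)
    int deliveredCounter;  // expired msgs recorded via ackLater()
    boolean pendingAck;    // stands in for the pendingAck MessageAck object
    int acksSent;          // number of acks that would have reached the broker

    PatchedAckSketch(int prefetch) {
        this.prefetch = prefetch;
    }

    /** Expired branch: only the ackLater() check runs (additionalWindowSize assumed 0). */
    void onExpired() {
        deliveredCounter++;
        pendingAck = true;
        if (0.5 * prefetch <= deliveredCounter) {
            sendPendingAck();
        }
    }

    /** Non-expired branch, including the proposed AMQ-3965 check. */
    void onConsumed() {
        ackCounter++;
        float threshold = (float) prefetch * (float) 0.65;
        if (pendingAck && (ackCounter + deliveredCounter) >= threshold) {
            sendPendingAck();
        }
        if (ackCounter >= threshold) {
            ackCounter = 0;
            acksSent++;
        }
    }

    private void sendPendingAck() {
        pendingAck = false;
        deliveredCounter = 0;
        acksSent++;
    }

    /** Feeds the msgs in the given order and returns the number of acks sent. */
    static int run(int prefetch, int normal, int expired, boolean expiredFirst) {
        PatchedAckSketch c = new PatchedAckSketch(prefetch);
        if (expiredFirst) {
            for (int i = 0; i < expired; i++) c.onExpired();
            for (int i = 0; i < normal; i++) c.onConsumed();
        } else {
            for (int i = 0; i < normal; i++) c.onConsumed();
            for (int i = 0; i < expired; i++) c.onExpired();
        }
        return c.acksSent;
    }

    public static void main(String[] args) {
        // Order from the issue description: the proposed check fires once.
        System.out.println("45 expired, then 55 normal: acks = " + run(100, 55, 45, true));
        // Order from this comment: nothing fires, the consumer would hang.
        System.out.println("56 normal, then 44 expired: acks = " + run(100, 56, 44, false));
    }
}
```

In this model the proposed check helps when the expired msgs arrive first, because a later normal msg pushes ackCounter + deliveredCounter to 65. With the normal msgs first, no later call ever reaches the check, so zero acks are sent.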
                
> Expired msgs not getting acked to broker causing consumer to fill up its 
> prefetch and not getting more msgs.
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3965
>                 URL: https://issues.apache.org/jira/browse/AMQ-3965
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.6.0
>            Reporter: Torsten Mielke
>              Labels: optimizeDispatch
>         Attachments: OptimizeAcknowledgeWithExpiredMsgsTest.java, testcase.tgz
>
>
> It is possible to get a consumer stalled and not receiving any more messages 
> when using optimizeAcknowledge.
> Let me illustrate with an example (JUnit test attached).
> Suppose a consumer with optimizeAcknowledge and a prefetch of 100 msgs.
> The broker's queue contains 105 msgs. The first 45 msgs have a very low 
> expiry time, the remaining ones don't expire. 
> So the first 100 msgs get dispatched to the consumer (due to prefetch=100). 
> Out of these the first 45 msgs do not get dispatched to consumer code because 
> their expiry has elapsed by the time they are handled in the client. 
> {code:title=ActiveMQMessageConsumer.java}
> public void dispatch(MessageDispatch md) {
>         MessageListener listener = this.messageListener.get();
>         try {
>             [...]
>             synchronized (unconsumedMessages.getMutex()) {
>                 if (!unconsumedMessages.isClosed()) {
>                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
>                         if (listener != null && unconsumedMessages.isRunning()) {
>                             ActiveMQMessage message = createActiveMQMessage(md);
>                             beforeMessageIsConsumed(md);
>                             try {
>                                 boolean expired = message.isExpired();
>                                 if (!expired) {
>                                     listener.onMessage(message);
>                                 }
>                                 afterMessageIsConsumed(md, expired);
> {code}
> listener.onMessage() above is not called as the msg has expired. 
> However, it still calls into afterMessageIsConsumed()
> {code:title=ActiveMQMessageConsumer.java}
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>       [...]  
>       if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> {code}
> and will remove the expired msg from the deliveredMessages list. It then 
> calls into ackLater(). 
> However ackLater() only fires an ack back to the broker when the number of 
> unsent acks has reached 50% of the prefetch value.
> {code:title=ActiveMQMessageConsumer.java}
>  private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
>     [...]
>     if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
>             session.sendAck(pendingAck);
> {code}        
> In our example it has not reached that mark (only 45 expired msgs, i.e. 45%). 
> So the first 45 msgs, which expired before being dispatched, did not cause an 
> ack to be sent to the broker.
> Now the next 55 messages get processed. These don't have an expiry so they 
> get dispatched to consumer code. 
> After dispatching each msg to the registered application code, we call into 
> afterMessageIsConsumed() but this time executing a different branch as the 
> msgs are not expired
> {code:title=ActiveMQMessageConsumer.java}
> private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>     [...]
>     else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
> {code}
> With optimizeAcknowledge=true we only send an ack back to the broker if 
> either optimizeAcknowledgeTimeOut has elapsed or the ackCounter has reached 
> 65% of the prefetch (100). 
> The timeout will not have kicked in. The ackCounter will be at 55 after 
> processing the last of the 100 prefetched messages, which is less than 65% 
> of 100. 
> So processing the last prefetched msg will not generate an ack back to the 
> broker. 
> As a result, the client has processed all prefetched messages and will not 
> get any new messages dispatched from the broker. The broker has another 5 
> msgs on the queue but since it never received an ack from the client, it 
> won't dispatch any further messages. 
> The client is stalled. 
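
The two thresholds in the description can be replayed with a few lines of plain Java. This is only a sketch under stated assumptions: UnpatchedAckSketch and acksSent are hypothetical names, additionalWindowSize is taken as 0, and optimizeAcknowledgeTimeOut is assumed not to fire. It models nothing beyond the 50% check in ackLater() and the 65% check in afterMessageIsConsumed() as quoted above.

```java
/** Toy model of the unpatched thresholds; NOT the real ActiveMQ classes. */
final class UnpatchedAckSketch {

    /**
     * Returns how many acks would reach the broker when `expired` msgs are
     * handled first, followed by `normal` msgs (timeout assumed not to fire).
     */
    static int acksSent(int prefetch, int expired, int normal) {
        int deliveredCounter = 0; // expired msgs counted by ackLater()
        int ackCounter = 0;       // consumed msgs counted under optimizeAcknowledge
        int acks = 0;

        for (int i = 0; i < expired; i++) {
            deliveredCounter++;
            // ackLater(): ack only once 50% of the prefetch is pending
            if (0.5 * prefetch <= deliveredCounter) {
                acks++;
                deliveredCounter = 0;
            }
        }
        for (int i = 0; i < normal; i++) {
            ackCounter++;
            // afterMessageIsConsumed(): ack only once 65% of the prefetch is consumed
            if (ackCounter >= prefetch * .65) {
                acks++;
                ackCounter = 0;
            }
        }
        return acks;
    }

    public static void main(String[] args) {
        // Scenario from the description: 45 < 50 and 55 < 65, so no ack at all.
        System.out.println("45 expired + 55 normal: acks = " + acksSent(100, 45, 55));
        // Five more expired msgs would have crossed the 50% mark.
        System.out.println("50 expired + 50 normal: acks = " + acksSent(100, 50, 50));
    }
}
```

The stall needs both counters to stay just under their own thresholds while together they use up the whole prefetch window, which is what the 45/55 split does.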
