[
https://issues.apache.org/jira/browse/AMQ-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Torsten Mielke updated AMQ-3965:
--------------------------------
Attachment: testcase.tgz
Attaching testcase as mvn project.
The testclasss can also be copied into
activemq-core/src/test/java/org/apache/activemq/bugs
> Expired msgs not getting acked to broker causing consumer to fill up its
> prefetch and not getting more msgs.
> ------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-3965
> URL: https://issues.apache.org/jira/browse/AMQ-3965
> Project: ActiveMQ
> Issue Type: Bug
> Components: JMS client
> Affects Versions: 5.6.0
> Reporter: Torsten Mielke
> Labels: optimizeDispatch
> Attachments: testcase.tgz
>
>
> It is possible to get a consumer stalled and not receiving any more messages
> when using optimizeAcknowledge.
> Let me illustrate in an example (JUnit test attached).
> Suppose a consumer with optimizeAcknowledge and a prefetch of 100 msgs.
> The broker's queue contains 105 msg. The first 45 msgs have a very low expiry
> time, the remaining don't expiry.
> So the first 100 msgs get dispatched to the consumer (due to prefetch=100).
> Out of these the first 45 msgs do not get dispatched to consumer code because
> their expiry has elapsed by the time that are handled in the client.
> {code:title=ActiveMQMessageConsumer.java}
> public void dispatch(MessageDispatch md) {
> MessageListener listener = this.messageListener.get();
> try {
> [...]
> synchronized (unconsumedMessages.getMutex()) {
> if (!unconsumedMessages.isClosed()) {
> if (this.info.isBrowser() ||
> !session.connection.isDuplicate(this, md.getMessage())) {
> if (listener != null &&
> unconsumedMessages.isRunning()) {
> ActiveMQMessage message =
> createActiveMQMessage(md);
> beforeMessageIsConsumed(md);
> try {
> boolean expired = message.isExpired();
> if (!expired) {
> listener.onMessage(message);
> }
> afterMessageIsConsumed(md, expired);
> {code}
> listener.onMessage() above is not called as the msg has expired.
> However it will calls into afterMessagesIsConsumed()
> {code:title=ActiveMQMessageConsumer.java}
> private void afterMessageIsConsumed(MessageDispatch md, boolean
> messageExpired) throws JMSException {
> [...]
> if (messageExpired) {
> synchronized (deliveredMessages) {
> deliveredMessages.remove(md);
> }
> stats.getExpiredMessageCount().increment();
> ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> {code}
> and will remove the expired msg from the deliveredMessages list. It then
> calls into ackLater().
> However ackLater() only fires an ack back to the broker when the number of
> unsent acks has reached 50% of the prefetch value.
> {code:title=ActiveMQMessageConsumer.java}
> private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
> [...]
> if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter -
> additionalWindowSize)) {
> session.sendAck(pendingAck);
> {code}
> In our example it has not reached that mark (only 45 expired msgs, i.e. 45%).
> So the first 45 msgs, which expired before being dispatched, did not cause an
> ack being sent to the broker.
> Now the next 55 messages get processed. These don't have an expiry so they
> get dispatched to consumer code.
> After dispatching each msg to the registered application code, we call into
> afterMessageIsConsumed() but this time executing a different branch as the
> msgs are not expired
> {code:title=ActiveMQMessageConsumer.java}
> private void afterMessageIsConsumed(MessageDispatch md, boolean
> messageExpired) throws JMSException {
> [...]
> else if (isAutoAcknowledgeEach()) {
> if (deliveryingAcknowledgements.compareAndSet(false, true)) {
> synchronized (deliveredMessages) {
> if (!deliveredMessages.isEmpty()) {
> if (optimizeAcknowledge) {
> ackCounter++;
> if (ackCounter >= (info.getPrefetchSize() *
> .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >=
> (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
> MessageAck ack =
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
> if (ack != null) {
> deliveredMessages.clear();
> ackCounter = 0;
> session.sendAck(ack);
> optimizeAckTimestamp =
> System.currentTimeMillis();
> }
> }
> {code}
> with optimizeAcknowledge=true we only send an ack back to the broker if
> either optimizeAcknowledgeTimeOut has elapsed or the ackCounter has reached
> 65% of the prefetch (100).
> The timeout will not have kicked in. The ackCounter will be at 55 after
> processing the last of 100 prefetched messages which is less than 65% of 100.
> So with the last prefetched msg being processed, it will not generate an ack
> back to the broker.
> As a result, the client has processed all prefetched message and will not get
> any new messages dispatched from the broker. The broker has another 5 msgs on
> the queue but since it never received an ack from the client, it won't
> dispatch any further messages.
> The client is stalled.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira