This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new ede0219 QPIDJMS-484: track client-ack recovery beyond recover() call, send modified-failed dispositions where appropriate at consumer/session close/shutdown ede0219 is described below commit ede021921ea98a86013e5e9504fc8207db989e29 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Fri Nov 29 17:19:48 2019 +0000 QPIDJMS-484: track client-ack recovery beyond recover() call, send modified-failed dispositions where appropriate at consumer/session close/shutdown --- .../main/java/org/apache/qpid/jms/JmsSession.java | 2 +- .../jms/message/JmsInboundMessageDispatch.java | 9 + .../qpid/jms/provider/ProviderConstants.java | 3 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 37 ++- .../jms/integration/ConsumerIntegrationTest.java | 4 +- .../jms/integration/SessionIntegrationTest.java | 259 ++++++++++++++++++++- 6 files changed, 307 insertions(+), 7 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 7015a6f..09bcd05 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -378,7 +378,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe try { if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) { - acknowledge(ACK_TYPE.MODIFIED_FAILED); + acknowledge(ACK_TYPE.SESSION_SHUTDOWN); } } catch (Exception e) { LOG.trace("Exception during session shutdown cleanup acknowledgement", e); diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java index e55426f..51c0441 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java @@ -31,6 +31,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId { private JmsMessage message; private boolean enqueueFirst; private boolean delivered; + private boolean recovered; private transient JmsConsumerInfo consumerInfo; private transient String stringView; @@ -75,6 +76,14 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId { this.delivered = delivered; } + public boolean isRecovered() { + return recovered; + } + + public void setRecovered(boolean recovered) { + this.recovered = recovered; + } + public int getRedeliveryCount() { int redeliveryCount = 0; diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java index d00f35f..2e07a75 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java @@ -31,6 +31,7 @@ public final class ProviderConstants { MODIFIED_FAILED, MODIFIED_FAILED_UNDELIVERABLE, // Conceptual - DELIVERED + DELIVERED, + SESSION_SHUTDOWN } } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 0acbbb5..202a9e7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.ListIterator; import java.util.concurrent.ScheduledFuture; +import javax.jms.Session; + import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessage; @@ -54,6 +56,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class); protected final AmqpSession session; + protected final int acknowledgementMode; protected AsyncResult stopRequest; protected AsyncResult pullRequest; protected long incomingSequence; @@ -65,10 +68,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver super(info, receiver, session); this.session = session; + this.acknowledgementMode = info.getAcknowledgementMode(); } @Override public void close(AsyncResult request) { + acknowledgeUndeliveredRecoveredMessages(); + // If we have pending deliveries we remain open to allow for ACK or for a // pending transaction that this consumer is active in to complete. if (shouldDeferClose()) { @@ -79,6 +85,27 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } + private void acknowledgeUndeliveredRecoveredMessages() { + if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { + // Send dispositions for any messages which were previously delivered and + // session recovered, but were then not delivered again afterwards. + Delivery delivery = getEndpoint().head(); + while (delivery != null) { + Delivery current = delivery; + delivery = delivery.next(); + + if (!(current.getContext() instanceof JmsInboundMessageDispatch)) { + continue; + } + + JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext(); + if (envelope.isRecovered() && !envelope.isDelivered()) { + handleDisposition(envelope, current, MODIFIED_FAILED); + } + } + } + } + /** * Starts the consumer by setting the link credit to the given prefetch value. * @@ -220,12 +247,14 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver delivery = delivery.next(); if (!(current.getContext() instanceof JmsInboundMessageDispatch)) { - LOG.debug("{} Found incomplete delivery with no context during recover processing", AmqpConsumer.this); + LOG.debug("{} Found incomplete delivery with no context during session acknowledge processing", AmqpConsumer.this); continue; } JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext(); - if (envelope.isDelivered()) { + if(ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) { + handleDisposition(envelope, current, MODIFIED_FAILED); + } else if (envelope.isDelivered()) { final DeliveryState disposition; switch (ackType) { @@ -304,6 +333,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private void handleDelivered(JmsInboundMessageDispatch envelope, Delivery delivery) { LOG.debug("Delivered Ack of message: {}", envelope); deliveredCount++; + envelope.setRecovered(false); envelope.setDelivered(true); delivery.setDefaultDeliveryState(MODIFIED_FAILED); } @@ -410,6 +440,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver envelope.getMessage().getFacade().getRedeliveryCount() + 1); envelope.setEnqueueFirst(true); envelope.setDelivered(false); + if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { + envelope.setRecovered(true); + } redispatchList.add(envelope); } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index e04b8f6..0c6dab9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -413,7 +413,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { } @Test(timeout=20000) - public void testCloseDurableSubscriberWithUnackedAnUnconsumedPrefetchedMessages() throws Exception { + public void testCloseDurableSubscriberWithUnconsumedPrefetchedMessages() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); @@ -429,7 +429,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { int messageCount = 5; // Create a consumer and fill the prefetch with some messages, - // which we will consume some of but ack none of. + // which we will only consume some of. testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 46e7ba5..ce50edc 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -64,6 +64,7 @@ import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsSession; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -2259,8 +2260,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase { receivedTextMessage.acknowledge(); - testPeer.expectDetach(false, true, false); testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2); + testPeer.expectDetach(false, true, false); testPeer.expectDisposition(true, new ReleasedMatcher(), 3, 3); subscriber.close(); @@ -2328,4 +2329,260 @@ public class SessionIntegrationTest extends QpidJmsTestCase { connection.close(); } } + + @Test(timeout = 20000) + public void testCloseConnectionWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception { + // Send 6, recover 4, redeliver 2, close connection (and so implicitly, session) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, false, 6, 4, 2); + } + + @Test(timeout = 20000) + public void testCloseConnectionWithRecoveredUndeliveredClientAckMessages() throws Exception { + // Send 4, recover 2, redeliver none, close connection (and so implicitly, session) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, false, 4, 2, 0); + } + + @Test(timeout = 20000) + public void testCloseSessionWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception { + // Send 6, recover 4, redeliver 2, close session (then connection) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, true, 6, 4, 2); + } + + @Test(timeout = 20000) + public void testCloseSessionWithRecoveredUndeliveredClientAckMessages() throws Exception { + // Send 4, recover 2, redeliver none, close session (then connection) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(false, true, 4, 2, 0); + } + + @Test(timeout = 20000) + public void testCloseConsumerWithRecoveredUndeliveredAndRedeliveredClientAckMessages() throws Exception { + // Send 6, recover 4, redeliver 2, close consumer then connection (and so implicitly, session) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, false, 6, 4, 2); + } + + @Test(timeout = 20000) + public void testCloseConsumerWithRecoveredUndeliveredClientAckMessages() throws Exception { + // Send 4, recover 2, redeliver none, close consumer then connection (and so implicitly, session) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, false, 4, 2, 0); + } + + @Test(timeout = 20000) + public void testCloseConsumerAndSessionWithRecoveredAndRedeliveredUndeliveredClientAckMessages() throws Exception { + // Send 6, recover 4, redeliver 2, close consumer then session (then connection) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, true, 6, 4, 2); + } + + @Test(timeout = 20000) + public void testCloseConsumerAndSessionWithRecoveredUndeliveredClientAckMessages() throws Exception { + // Send 6, recover 4, redeliver 2, close consumer then session (then connection) + doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl(true, true, 4, 2, 0); + } + + private void doCloseWithWithRecoveredUndeliveredClientAckMessagesTestImpl( + boolean closeConsumer, boolean closeSession, int msgCount, int deliverBeforeRecoverCount, int deliverAfterRecoverCount) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic topic = session.createTopic(topicName); + + final CountDownLatch incoming = new CountDownLatch(msgCount); + ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() { + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) { + incoming.countDown(); + } + }); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false, + equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true); + + MessageConsumer consumer = session.createConsumer(topic); + + TextMessage receivedTextMessage = null; + for (int i = 1; i <= deliverBeforeRecoverCount; i++) { + assertNotNull("Expected message did not arrive: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000)); + assertEquals("Unexpected delivery number", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1); + } + + // Await all incoming messages to arrive at consumer before we recover, ensure deterministic test behaviour. + assertTrue("Messages did not arrive in a timely fashion", incoming.await(3, TimeUnit.SECONDS)); + + session.recover(); + + testPeer.waitForAllHandlersToComplete(1000); + + for (int i = 1; i <= deliverAfterRecoverCount; i++) { + assertNotNull("Expected message did not arrive after recover: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000)); + assertEquals("Unexpected delivery number after recover", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1); + } + + int deliveredAtAnyPoint = Math.max(deliverBeforeRecoverCount, deliverAfterRecoverCount); + + if (closeConsumer) { + if(deliverAfterRecoverCount > 0) { + // Remaining credit will be drained if there are delivered messages yet to be acknowledged or recovered again. + testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - msgCount))); + } + + // Any message delivered+recovered before but not then delivered again afterwards, will have disposition sent now. + for (int i = deliverAfterRecoverCount + 1; i <= deliverBeforeRecoverCount; i++) { + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i); + } + + if(deliverAfterRecoverCount > 0) { + // Any further remaining messages prefetched will be released. + for (int i = deliveredAtAnyPoint + 1; i <= msgCount; i++) { + testPeer.expectDisposition(true, new ReleasedMatcher(), i, i); + } + } else { + // The link will close now + testPeer.expectDetach(true, true, true); + + // Dispositions sent by proton when the link is freed + for (int i = deliveredAtAnyPoint + 1; i <= msgCount; i++) { + testPeer.expectDisposition(true, new ReleasedMatcher(), i, i); + } + } + + consumer.close(); + + testPeer.waitForAllHandlersToComplete(1000); + + if(deliverAfterRecoverCount > 0) { + // When the session or connection is closed, outstanding delivered messages will have disposition sent. + for (int i = 1; i <= deliverAfterRecoverCount; i++) { + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i); + } + testPeer.expectDetach(true, true, true); + } + } else { + // If we dont close the consumer first, all previously delivered messages will have + // disposition sent when the session or connection is closed. + for (int i = 1; i <= deliveredAtAnyPoint; i++) { + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i); + } + } + + if(closeSession) { + testPeer.expectEnd(); + + session.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + + testPeer.expectClose(); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testAcknowledgeAllPreviouslyRecoveredClientAckMessages() throws Exception { + doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(true, false, true); + doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, true, true); + doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, false, true); + } + + @Test(timeout = 20000) + public void testAcknowledgeSomePreviouslyRecoveredClientAckMessages() throws Exception { + doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(true, false, false); + doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, true, false); + doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(false, false, false); + } + + private void doAcknowledgePreviouslyRecoveredClientAckMessagesTestImpl(boolean closeConsumer, boolean closeSession, boolean consumeAllRecovered) throws JMSException, Exception, IOException { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, false, "?jms.clientID=myClientId", null, null, false); + connection.start(); + + int msgCount = 7; + int deliverBeforeRecoverCount = 4; + int acknowledgeAfterRecoverCount = consumeAllRecovered ? 5 : 2; + + testPeer.expectBegin(); + + Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic topic = session.createTopic(topicName); + + final CountDownLatch incoming = new CountDownLatch(msgCount); + ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() { + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) { + incoming.countDown(); + } + }); + + testPeer.expectReceiverAttach(); + + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false, + equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true); + + MessageConsumer consumer = session.createConsumer(topic); + + TextMessage receivedTextMessage = null; + for (int i = 1; i <= deliverBeforeRecoverCount; i++) { + assertNotNull("Expected message did not arrive: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000)); + assertEquals("Unexpected delivery number", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1); + } + + // Await all incoming messages to arrive at consumer before we recover, ensure deterministic test behaviour. + assertTrue("Messages did not arrive in a timely fashion", incoming.await(3, TimeUnit.SECONDS)); + + session.recover(); + + testPeer.waitForAllHandlersToComplete(1000); + + for (int i = 1; i <= acknowledgeAfterRecoverCount; i++) { + assertNotNull("Expected message did not arrive after recover: " + i, receivedTextMessage = (TextMessage) consumer.receive(3000)); + assertEquals("Unexpected delivery number after recover", i, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1); + testPeer.expectDisposition(true, new AcceptedMatcher(), i, i); + receivedTextMessage.acknowledge(); + } + + testPeer.waitForAllHandlersToComplete(1000); + + if(!consumeAllRecovered) { + // Any message delivered+recovered before but not then delivered and acknowledged afterwards, will have + // disposition sent as consumer/session/connection is closed. + for (int i = acknowledgeAfterRecoverCount + 1; i <= deliverBeforeRecoverCount; i++) { + testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), i, i); + } + } + + if(closeConsumer) { + testPeer.expectDetach(true, true, true); + + // Dispositions sent by proton when the link is freed + for (int i = Math.max(deliverBeforeRecoverCount, acknowledgeAfterRecoverCount) + 1; i <= msgCount; i++) { + testPeer.expectDisposition(true, new ReleasedMatcher(), i, i); + } + + consumer.close(); + } + + if(closeSession) { + testPeer.expectEnd(); + + session.close(); + } + + testPeer.expectClose(); + + connection.close(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org