Repository: qpid-jms Updated Branches: refs/heads/master 6667bcffc -> e60d3bd40
https://issues.apache.org/jira/browse/QPIDJMS-70 Adds a redelivery policy that contains configuration for max redeliveries before the cleint reject an incomnig message. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e60d3bd4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e60d3bd4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e60d3bd4 Branch: refs/heads/master Commit: e60d3bd40796d04c2d280a651c505b1171a6a1d8 Parents: 6667bcf Author: Timothy Bish <[email protected]> Authored: Wed Jun 10 11:26:11 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Jun 10 11:26:11 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 11 +- .../apache/qpid/jms/JmsConnectionFactory.java | 20 ++++ .../org/apache/qpid/jms/JmsMessageConsumer.java | 24 +++-- .../org/apache/qpid/jms/JmsPrefetchPolicy.java | 9 ++ .../apache/qpid/jms/JmsRedeliveryPolicy.java | 62 +++++++++++ .../apache/qpid/jms/meta/JmsConsumerInfo.java | 11 ++ .../qpid/jms/provider/amqp/AmqpConsumer.java | 43 ++++---- .../qpid/jms/JmsConnectionFactoryTest.java | 29 ++++++ .../org/apache/qpid/jms/JmsConnectionTest.java | 3 +- .../jms/integration/SessionIntegrationTest.java | 36 +++++++ .../JmsTransactionRedeliveryPolicyTest.java | 103 +++++++++++++++++++ 11 files changed, 319 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index bd8ebbd..441fef5 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -97,6 +97,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti private URI configuredURI; private URI connectedURI; private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy(); + private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy(); private boolean localMessagePriority; private boolean clientIdSet; private boolean sendAcksAsync; @@ -858,7 +859,15 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) { - this.prefetchPolicy = prefetchPolicy; + this.prefetchPolicy = prefetchPolicy.copy(); + } + + public JmsRedeliveryPolicy getRedeliveryPolicy() { + return redeliveryPolicy; + } + + public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) { + this.redeliveryPolicy = redeliveryPolicy.copy(); } public boolean isLocalMessagePriority() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java index dbeec40..a71df04 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java @@ -74,6 +74,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact private ExceptionListener exceptionListener; private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy(); + private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy(); public JmsConnectionFactory() { } @@ -505,6 +506,25 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact } /** + * Returns the JmsRedeliveryPolicy that is applied when a new connection is created. + * + * @return the redeliveryPolicy that is currently configured for this factory. + */ + public JmsRedeliveryPolicy getRedeliveryPolicy() { + return redeliveryPolicy; + } + + /** + * Sets the JmsRedeliveryPolicy that is applied when a new connection is created. + * + * @param redeliveryPolicy + * The new redeliveryPolicy to set + */ + public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) { + this.redeliveryPolicy = redeliveryPolicy; + } + + /** * @return the currently configured client ID prefix for auto-generated client IDs. */ public synchronized String getClientIDPrefix() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index b261fbd..7cc3ec6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -102,17 +102,19 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC this.messageQueue = new FifoMessageQueue(); } - JmsPrefetchPolicy policy = this.connection.getPrefetchPolicy(); - - this.consumerInfo = new JmsConsumerInfo(consumerId); - this.consumerInfo.setClientId(connection.getClientID()); - this.consumerInfo.setSelector(selector); - this.consumerInfo.setSubscriptionName(name); - this.consumerInfo.setDestination(destination); - this.consumerInfo.setAcknowledgementMode(acknowledgementMode); - this.consumerInfo.setNoLocal(noLocal); - this.consumerInfo.setBrowser(isBrowser()); - this.consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy)); + JmsPrefetchPolicy policy = connection.getPrefetchPolicy(); + JmsRedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy().copy(); + + consumerInfo = new JmsConsumerInfo(consumerId); + consumerInfo.setClientId(connection.getClientID()); + consumerInfo.setSelector(selector); + consumerInfo.setSubscriptionName(name); + consumerInfo.setDestination(destination); + consumerInfo.setAcknowledgementMode(acknowledgementMode); + consumerInfo.setNoLocal(noLocal); + consumerInfo.setBrowser(isBrowser()); + consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy)); + consumerInfo.setRedeliveryPolicy(redeliveryPolicy); session.getConnection().createResource(consumerInfo); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java index a8586d2..a8165ac 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java @@ -63,6 +63,15 @@ public class JmsPrefetchPolicy { } /** + * Copy this policy into a newly allocated instance. + * + * @return a new JmsPrefetchPolicy that is a copy of this one. + */ + public JmsPrefetchPolicy copy() { + return new JmsPrefetchPolicy(this); + } + + /** * @return Returns the durableTopicPrefetch. */ public int getDurableTopicPrefetch() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java new file mode 100644 index 0000000..5201fc8 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +/** + * Defines the policy used to manage redelivered and recovered Messages. + */ +public class JmsRedeliveryPolicy { + + public static final int DEFAULT_MAX_REDELIVERIES = -1; + + private int maxRedeliveries; + + public JmsRedeliveryPolicy() { + maxRedeliveries = DEFAULT_MAX_REDELIVERIES; + } + + public JmsRedeliveryPolicy(JmsRedeliveryPolicy source) { + maxRedeliveries = source.maxRedeliveries; + } + + public JmsRedeliveryPolicy copy() { + return new JmsRedeliveryPolicy(this); + } + + /** + * Returns the configured maximum redeliveries that a message will be + * allowed to have before it is rejected by this client. + * + * @return the maxRedeliveries + * the maximum number of redeliveries allowed before a message is rejected. + */ + public int getMaxRedeliveries() { + return maxRedeliveries; + } + + /** + * Configures the maximum number of time a message can be redelivered before it + * will be rejected by this client. + * + * The default value of (-1) disables max redelivery processing. + * + * @param maxRedeliveries the maxRedeliveries to set + */ + public void setMaxRedeliveries(int maxRedeliveries) { + this.maxRedeliveries = maxRedeliveries; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java index 192a6b4..a7e7ad0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.meta; import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsRedeliveryPolicy; import org.apache.qpid.jms.util.ToStringSupport; public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsumerInfo> { @@ -31,6 +32,8 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume protected boolean noLocal; protected int acknowledgementMode; + protected JmsRedeliveryPolicy redeliveryPolicy; + // Can be used to track the last consumed message. private transient long lastDeliveredSequenceId; @@ -150,6 +153,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume this.acknowledgementMode = acknowledgementMode; } + public JmsRedeliveryPolicy getRedeliveryPolicy() { + return redeliveryPolicy; + } + + public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) { + this.redeliveryPolicy = redeliveryPolicy; + } + @Override public String toString() { return ToStringSupport.toString(this); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index fcdd0f3..46212af 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -81,7 +81,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY); - private final AtomicLong _incomingSequence = new AtomicLong(0); + private final AtomicLong incomingSequence = new AtomicLong(0); private AsyncResult stopRequest; @@ -115,7 +115,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver stopRequest = request; } } else { - //TODO: We dont actually want the additional messages that could be sent while + // TODO: We dont actually want the additional messages that could be sent while // draining. We could explicitly reduce credit first, or possibly use 'echo' instead // of drain if it was supported. We would first need to understand what happens // if we reduce credit below the number of messages already in-flight before @@ -321,7 +321,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } } else if (ackType.equals(ACK_TYPE.POISONED)) { - deliveryFailed(delivery, false); + deliveryFailed(delivery); } else if (ackType.equals(ACK_TYPE.RELEASED)) { delivery.disposition(Released.getInstance()); delivery.settle(); @@ -387,14 +387,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver do { incoming = getEndpoint().current(); if (incoming != null) { - if(incoming.isReadable() && !incoming.isPartial()) { + if (incoming.isReadable() && !incoming.isPartial()) { LOG.trace("{} has incoming Message(s).", this); try { processDelivery(incoming); } catch (Exception e) { throw IOExceptionSupport.create(e); } - getEndpoint().advance(); } else { LOG.trace("{} has a partial incoming Message(s), deferring.", this); incoming = null; @@ -402,12 +401,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } else { // We have exhausted the locally queued messages on this link. // Check if we tried to stop and have now run out of credit. - if(stopRequest != null) { - if(getEndpoint().getRemoteCredit() <= 0) - { - stopRequest.onSuccess(); - stopRequest = null; - } + if (stopRequest != null && getEndpoint().getRemoteCredit() <= 0) { + stopRequest.onSuccess(); + stopRequest = null; } } } while (incoming != null); @@ -416,9 +412,22 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } private void processDelivery(Delivery incoming) throws Exception { + + Message amqpMessage = decodeIncomingMessage(incoming); + long deliveryCount = amqpMessage.getDeliveryCount(); + int maxRedeliveries = getJmsResource().getRedeliveryPolicy().getMaxRedeliveries(); + + if (maxRedeliveries >= 0 && deliveryCount > maxRedeliveries) { + LOG.trace("{} rejecting delivery that exceeds max redelivery count. {}", this, amqpMessage.getMessageId()); + deliveryFailed(incoming); + return; + } else { + getEndpoint().advance(); + } + JmsMessage message = null; try { - message = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(incoming)); + message = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage); } catch (Exception e) { LOG.warn("Error on transform: {}", e.getMessage()); // TODO - We could signal provider error but not sure we want to fail @@ -426,7 +435,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver // In the future once the JMS mapping is complete we should be // able to convert everything to some message even if its just // a bytes messages as a fall back. - deliveryFailed(incoming, true); + deliveryFailed(incoming); return; } @@ -449,7 +458,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } protected long getNextIncomingSequenceNumber() { - return _incomingSequence.incrementAndGet(); + return incomingSequence.incrementAndGet(); } @Override @@ -498,15 +507,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver return "AmqpConsumer { " + this.resource.getConsumerId() + " }"; } - protected void deliveryFailed(Delivery incoming, boolean expandCredit) { + protected void deliveryFailed(Delivery incoming) { Modified disposition = new Modified(); disposition.setUndeliverableHere(true); disposition.setDeliveryFailed(true); incoming.disposition(disposition); incoming.settle(); - if (expandCredit) { - getEndpoint().flow(1); - } + sendFlowIfNeeded(); } protected void deliver(JmsInboundMessageDispatch envelope) throws Exception { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java index c645920..75317cb 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java @@ -344,6 +344,35 @@ public class JmsConnectionFactoryTest extends QpidJmsTestCase { assertEquals("Properties were not equal", props, props2); } + /** + * The redelivery policy is maintained in a child-object, which we extract the properties from + * when serializing the factory. Ensure this functions by doing a round trip on a factory + * configured with some new redelivery configuration via the URI. + */ + @Test + public void testSerializeThenDeserializeMaintainsRedeliveryPolicy() throws Exception { + String maxRedeliveryValue = "5"; + String maxRedeliveryKey = "redeliveryPolicy.maxRedeliveries"; + String uri = "amqp://localhost:1234?jms." + maxRedeliveryKey + "=" + maxRedeliveryValue; + + JmsConnectionFactory cf = new JmsConnectionFactory(uri); + Map<String, String> props = cf.getProperties(); + + assertTrue("Props dont contain expected redelivery policy change", props.containsKey(maxRedeliveryKey)); + assertEquals("Unexpected value", maxRedeliveryValue, props.get(maxRedeliveryKey)); + + Object roundTripped = roundTripSerialize(cf); + + assertNotNull("Null object returned", roundTripped); + assertEquals("Unexpected type", JmsConnectionFactory.class, roundTripped.getClass()); + + Map<String, String> props2 = ((JmsConnectionFactory)roundTripped).getProperties(); + assertTrue("Props dont contain expected redelivery policy change", props2.containsKey(maxRedeliveryKey)); + assertEquals("Unexpected value", maxRedeliveryValue, props2.get(maxRedeliveryKey)); + + assertEquals("Properties were not equal", props, props2); + } + @Test public void testSerializeTwoConnectionFactories() throws Exception { String uri = "amqp://localhost:1234"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java index 231a8de..fe5480d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -105,7 +104,7 @@ public class JmsConnectionTest { assertNotSame(newPolicy, connection.getPrefetchPolicy()); connection.setPrefetchPolicy(newPolicy); - assertSame(newPolicy, connection.getPrefetchPolicy()); + assertEquals(newPolicy, connection.getPrefetchPolicy()); } @Test(timeout=30000) http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index f6bd0d8..e87c34d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -66,6 +66,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; import org.apache.qpid.jms.test.testpeer.describedtypes.Released; import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType; import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher; import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher; @@ -898,6 +899,41 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout=5000) + public void testIncomingMessageExceedsMaxRedeliveries() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final int COUNT = 5; + + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + ((JmsConnection) connection).getRedeliveryPolicy().setMaxRedeliveries(1); + + testPeer.expectBegin(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + HeaderDescribedType header = new HeaderDescribedType(); + header.setDeliveryCount(new UnsignedInteger(2)); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(header, null, null, null, new AmqpValueDescribedType("content"), COUNT); + + for (int i = 0; i < COUNT; i++) { + // Then expect an *settled* Modified disposition that rejects each message once + ModifiedMatcher modified = new ModifiedMatcher(); + modified.withDeliveryFailed(equalTo(true)); + modified.withUndeliverableHere(equalTo(true)); + testPeer.expectDisposition(true, modified); + } + + session.createConsumer(queue); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=5000) public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e60d3bd4/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java new file mode 100644 index 0000000..5bd4d1e --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactionRedeliveryPolicyTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Test; + +/** + * test redelivery policy application in a TX session. + */ +public class JmsTransactionRedeliveryPolicyTest extends AmqpTestSupport { + + @Override + public String getAmqpConnectionURIOptions() { + return "jms.redeliveryPolicy.maxRedeliveries=5"; + } + + @Test(timeout = 30000) + public void testConsumeAndRollbackWithMaxRedeliveries() throws Exception { + final int MAX_REDELIVERIES = 5; + final int MSG_COUNT = 5; + + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getDestinationName()); + MessageConsumer consumer = session.createConsumer(queue); + sendMessages(connection, queue, MSG_COUNT); + + final QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + + // Consume the message for the first time. + Message incoming = null; + for (int i = 0; i < MSG_COUNT; ++i) { + incoming = consumer.receive(2000); + assertNotNull(incoming); + assertFalse(incoming.getJMSRedelivered()); + assertTrue(incoming instanceof TextMessage); + } + session.rollback(); + + for (int i = 0; i < MAX_REDELIVERIES; ++i) { + LOG.info("Queue size before consume is: {}", queueView.getQueueSize()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + for (int j = 0; j < MSG_COUNT; ++j) { + incoming = consumer.receive(2000); + assertNotNull(incoming); + assertTrue(incoming.getJMSRedelivered()); + assertTrue(incoming instanceof TextMessage); + } + + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + session.rollback(); + LOG.info("Queue size after session rollback is: {}", queueView.getQueueSize()); + } + + assertTrue("Message should get DLQ'd", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 0; + } + })); + + QueueViewMBean dlq = getProxyToQueue("ActiveMQ.DLQ"); + assertEquals(MSG_COUNT, dlq.getQueueSize()); + + assertNull(consumer.receive(50)); + + session.commit(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
