Repository: activemq Updated Branches: refs/heads/master 3e237ca73 -> a9f9d4a4d
https://issues.apache.org/jira/browse/AMQ-6464 Correct handling of rejected outcome to archive the message in the DLQ Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a9f9d4a4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a9f9d4a4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a9f9d4a4 Branch: refs/heads/master Commit: a9f9d4a4d2ede752afa74b22394b80822052543d Parents: 3e237ca Author: Timothy Bish <tabish...@gmail.com> Authored: Fri Oct 14 09:35:17 2016 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Fri Oct 14 09:35:17 2016 -0400 ---------------------------------------------------------------------- .../activemq/transport/amqp/protocol/AmqpSender.java | 9 +++++---- .../transport/amqp/interop/AmqpReceiverTest.java | 13 +++---------- 2 files changed, 8 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a9f9d4a4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 149b2e8..23f8597 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -270,10 +270,11 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { } settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE); } else if (state instanceof Rejected) { - // re-deliver /w incremented delivery counter. 
- md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); - LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter()); - settle(delivery, -1); + // Rejection is a terminal outcome, we poison the message for dispatch to + // the DLQ. If a custom redelivery policy is used on the broker the message + // can still be redelivered based on the configuration of that policy. + LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state, md.getRedeliveryCounter()); + settle(delivery, MessageAck.POSION_ACK_TYPE); } else if (state instanceof Released) { LOG.trace("onDelivery: Released state = {}", state); // re-deliver && don't increment the counter. http://git-wip-us.apache.org/repos/asf/activemq/blob/a9f9d4a4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 2a06561..3cda78d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -514,16 +514,9 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { message.reject(); - // Read the message again and validate its state - - message = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull("did not receive message again", message); - - message.accept(); - - protonMessage = message.getWrappedMessage(); - assertNotNull(protonMessage); - assertEquals("Unexpected updated value for AMQP delivery-count", 1, protonMessage.getDeliveryCount()); + // Attempt to read the message again but should not get it. 
+ message = receiver.receive(2, TimeUnit.SECONDS); + assertNull("shoudl not receive message again", message); connection.close(); }