Author: tabish Date: Thu Aug 21 21:20:25 2014 New Revision: 1619595 URL: http://svn.apache.org/r1619595 Log: https://issues.apache.org/jira/browse/AMQNET-489
Merge in fixes from AMQ-5146 to honor the redelivery policy on messages dispatched from the Broker. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/ (props changed) activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/ ------------------------------------------------------------------------------ Merged /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:r1619590-1619593 Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs?rev=1619595&r1=1619594&r2=1619595&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs Thu Aug 21 21:20:25 2014 @@ -39,6 +39,8 @@ namespace Apache.NMS.ActiveMQ /// </summary> public class MessageConsumer : IMessageConsumer, IDispatcher { + private const int NO_MAXIMUM_REDELIVERIES = -1; + private readonly MessageTransformation messageTransformation; private readonly MessageDispatchChannel unconsumedMessages; private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>(); @@ -789,7 +791,15 @@ namespace Apache.NMS.ActiveMQ { if(listener != null && this.unconsumedMessages.Running) { - dispatchMessage = true; + if (RedeliveryExceeded(dispatch)) + { + PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries); + return; + } + else + { + dispatchMessage = true; + } } else { @@ -1014,6 +1024,11 @@ namespace Apache.NMS.ActiveMQ } } } + else if (RedeliveryExceeded(dispatch)) + { + Tracer.DebugFormat("[{0}] received with excessive redelivered: {1}", ConsumerId, dispatch); + PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries); + } else { return dispatch; @@ -1231,7 +1246,7 @@ namespace Apache.NMS.ActiveMQ } // evaluate both expired and normal msgs as otherwise consumer may get stalled - if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize)) + if ((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize)) { this.session.SendAck(pendingAck); this.pendingAck = null; @@ -1250,6 +1265,18 @@ namespace Apache.NMS.ActiveMQ this.session.Connection.SyncRequest(ack); } + private void PosionAck(MessageDispatch dispatch, string cause) + { + BrokerError poisonCause = new BrokerError(); + poisonCause.ExceptionClass = "javax.jms.JMSException"; + poisonCause.Message = cause; + + MessageAck posionAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1); + posionAck.FirstMessageId = dispatch.Message.MessageId; + posionAck.PoisonCause = poisonCause; + this.session.Connection.SyncRequest(posionAck); + } + private void RegisterSync() { // Don't acknowledge now, but we may need to let the broker know the @@ -1387,14 +1414,19 @@ namespace Apache.NMS.ActiveMQ this.info.ConsumerId, this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries); } + BrokerError poisonCause = new BrokerError(); + poisonCause.ExceptionClass = "javax.jms.JMSException"; + poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries; + if (lastMd.RollbackCause != null) { BrokerError cause = new BrokerError(); - cause.ExceptionClass = "javax.jms.JMSException"; - cause.Message = lastMd.RollbackCause.Message; - ack.PoisonCause = cause; + poisonCause.ExceptionClass = "javax.jms.JMSException"; + poisonCause.Message = lastMd.RollbackCause.Message; + poisonCause.Cause = cause; } ack.FirstMessageId = firstMsgId; + ack.PoisonCause = poisonCause; this.session.SendAck(ack); @@ -1743,6 +1775,24 @@ namespace Apache.NMS.ActiveMQ } } + private bool RedeliveryExceeded(MessageDispatch dispatch) + { + try + { + ActiveMQMessage amqMessage = dispatch.Message as ActiveMQMessage; + + return session.IsTransacted && redeliveryPolicy != null && + redeliveryPolicy.MaximumRedeliveries != NO_MAXIMUM_REDELIVERIES && + dispatch.RedeliveryCounter > redeliveryPolicy.MaximumRedeliveries && + // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin + !amqMessage.Properties.Contains("redeliveryDelay"); + } + catch (Exception ignored) + { + return false; + } + } + #endregion #region Nested ISyncronization Types Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1619595&r1=1619594&r2=1619595&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs Thu Aug 21 21:20:25 2014 @@ -17,7 +17,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; namespace Apache.NMS.ActiveMQ.Threads Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1619595&r1=1619594&r2=1619595&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs Thu Aug 21 21:20:25 2014 @@ -19,6 +19,7 @@ using System; using System.Threading; using Apache.NMS.Test; using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.Util; using NUnit.Framework; namespace Apache.NMS.ActiveMQ.Test @@ -27,6 +28,7 @@ namespace Apache.NMS.ActiveMQ.Test public class AMQRedeliveryPolicyTest : NMSTestSupport { private const string DESTINATION_NAME = "TEST.RedeliveryPolicyTestDest"; + private const string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause"; [Test] public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback() @@ -443,5 +445,161 @@ namespace Apache.NMS.ActiveMQ.Test session.Rollback(); } } + + [Test] + public void TestRepeatedRedeliveryReceiveNoCommit() + { + using(Connection connection = (Connection) CreateConnection()) + { + connection.Start(); + + ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryReceiveNoCommit"); + IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ"); + connection.DeleteDestination(destination); + connection.DeleteDestination(dlq); + IMessageProducer producer = dlqSession.CreateProducer(destination); + producer.Send(dlqSession.CreateTextMessage("1st")); + IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq); + + const int maxRedeliveries = 4; + for (int i = 0; i <= maxRedeliveries + 1; i++) + { + using(Connection loopConnection = (Connection) CreateConnection()) + { + // Receive a message with the JMS API + IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy; + policy.InitialRedeliveryDelay = 0; + policy.UseExponentialBackOff = false; + policy.MaximumRedeliveries = maxRedeliveries; + + loopConnection.Start(); + ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional); + IMessageConsumer consumer = session.CreateConsumer(destination); + + ActiveMQTextMessage m = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as ActiveMQTextMessage; + if (m != null) + { + Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter); + } + + if (i <= maxRedeliveries) + { + Assert.IsNotNull(m); + Assert.AreEqual("1st", m.Text); + Assert.AreEqual(i, m.RedeliveryCounter); + } + else + { + Assert.IsNull(m, "null on exceeding redelivery count"); + } + } + } + + // We should be able to get the message off the DLQ now. + ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage; + Assert.IsNotNull(msg, "Got message from DLQ"); + Assert.AreEqual("1st", msg.Text); + String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + if (cause != null) + { + Tracer.DebugFormat("Rollback Cause = {0}", cause); + Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref"); + } + else + { + Tracer.Debug("DLQ'd message has no cause tag."); + } + } + } + + [Test] + public void TestRepeatedRedeliveryOnMessageNoCommit() + { + using(Connection connection = (Connection) CreateConnection()) + { + connection.Start(); + ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryOnMessageNoCommit"); + IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ"); + connection.DeleteDestination(destination); + connection.DeleteDestination(dlq); + IMessageProducer producer = dlqSession.CreateProducer(destination); + IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq); + + producer.Send(dlqSession.CreateTextMessage("1st")); + + const int maxRedeliveries = 4; + Atomic<int> receivedCount = new Atomic<int>(0); + + for (int i = 0; i <= maxRedeliveries + 1; i++) + { + using(Connection loopConnection = (Connection) CreateConnection()) + { + IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy; + policy.InitialRedeliveryDelay = 0; + policy.UseExponentialBackOff = false; + policy.MaximumRedeliveries = maxRedeliveries; + + loopConnection.Start(); + ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional); + IMessageConsumer consumer = session.CreateConsumer(destination); + OnMessageNoCommitCallback callback = new OnMessageNoCommitCallback(receivedCount); + consumer.Listener += new MessageListener(callback.consumer_Listener); + + if (i <= maxRedeliveries) + { + Assert.IsTrue(callback.Await(), "listener should have dispatched a message"); + } + else + { + // final redlivery gets poisoned before dispatch + Assert.IsFalse(callback.Await(), "listener should not have dispatched after max redliveries"); + } + } + } + + // We should be able to get the message off the DLQ now. + ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage; + Assert.IsNotNull(msg, "Got message from DLQ"); + Assert.AreEqual("1st", msg.Text); + String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + if (cause != null) + { + Tracer.DebugFormat("Rollback Cause = {0}", cause); + Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref"); + } + else + { + Tracer.Debug("DLQ'd message has no cause tag."); + } + } + } + + class OnMessageNoCommitCallback + { + private Atomic<int> receivedCount; + private CountDownLatch done = new CountDownLatch(1); + + public OnMessageNoCommitCallback(Atomic<int> receivedCount) + { + this.receivedCount = receivedCount; + } + + public bool Await() + { + return done.await(TimeSpan.FromMilliseconds(5000)); + } + + public void consumer_Listener(IMessage message) + { + ActiveMQTextMessage m = message as ActiveMQTextMessage; + Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter); + Assert.AreEqual("1st", m.Text); + Assert.AreEqual(receivedCount.Value, m.RedeliveryCounter); + receivedCount.GetAndSet(receivedCount.Value + 1); + done.countDown(); + } + } } }