Author: tabish
Date: Thu Aug 21 21:20:25 2014
New Revision: 1619595

URL: http://svn.apache.org/r1619595
Log:
https://issues.apache.org/jira/browse/AMQNET-489

Merge in fixes from AMQ-5146 to honor the redelivery policy on messages 
dispatched from the Broker.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/
------------------------------------------------------------------------------
  Merged /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:r1619590-1619593

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs?rev=1619595&r1=1619594&r2=1619595&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs Thu Aug 21 21:20:25 2014
@@ -39,6 +39,8 @@ namespace Apache.NMS.ActiveMQ
        /// </summary>
        public class MessageConsumer : IMessageConsumer, IDispatcher
        {
+        private const int NO_MAXIMUM_REDELIVERIES = -1;
+
         private readonly MessageTransformation messageTransformation;
         private readonly MessageDispatchChannel unconsumedMessages;
         private readonly LinkedList<MessageDispatch> dispatchedMessages = new 
LinkedList<MessageDispatch>();
@@ -789,7 +791,15 @@ namespace Apache.NMS.ActiveMQ
                                                {
                                                        if(listener != null && 
this.unconsumedMessages.Running)
                                                        {
-                                                               dispatchMessage 
= true;
+                                if (RedeliveryExceeded(dispatch)) 
+                                {
+                                    PosionAck(dispatch, "dispatch to " + 
ConsumerId + " exceeds redelivery policy limit:" + 
redeliveryPolicy.MaximumRedeliveries);
+                                    return;
+                                } 
+                                else
+                                {
+                                                                   
dispatchMessage = true;
+                                }
                                                        }
                                                        else
                                                        {
@@ -1014,6 +1024,11 @@ namespace Apache.NMS.ActiveMQ
                                                }
                                        }
                                }
+                else if (RedeliveryExceeded(dispatch))
+                {
+                    Tracer.DebugFormat("[{0}] received with excessive 
redelivered: {1}", ConsumerId, dispatch);
+                    PosionAck(dispatch, "dispatch to " + ConsumerId + " 
exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+                }
                                else
                                {
                                        return dispatch;
@@ -1231,7 +1246,7 @@ namespace Apache.NMS.ActiveMQ
                        }
 
                // evaluate both expired and normal msgs as otherwise consumer 
may get stalled
-                       if((0.5 * this.info.PrefetchSize) <= 
(this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
+                       if ((0.5 * this.info.PrefetchSize) <= 
(this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
                        {
                                this.session.SendAck(pendingAck);
                                this.pendingAck = null;
@@ -1250,6 +1265,18 @@ namespace Apache.NMS.ActiveMQ
                this.session.Connection.SyncRequest(ack);
            }
 
+        private void PosionAck(MessageDispatch dispatch, string cause)
+        {
+            BrokerError poisonCause = new BrokerError();
+            poisonCause.ExceptionClass = "javax.jms.JMSException";
+            poisonCause.Message = cause;
+
+            MessageAck posionAck = new MessageAck(dispatch, (byte) 
AckType.PoisonAck, 1);
+            posionAck.FirstMessageId = dispatch.Message.MessageId;
+            posionAck.PoisonCause = poisonCause;
+            this.session.Connection.SyncRequest(posionAck);
+        }
+
            private void RegisterSync()
                {
                        // Don't acknowledge now, but we may need to let the 
broker know the
@@ -1387,14 +1414,19 @@ namespace Apache.NMS.ActiveMQ
                                                this.info.ConsumerId, 
this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
                         }
 
+                        BrokerError poisonCause = new BrokerError();
+                        poisonCause.ExceptionClass = "javax.jms.JMSException";
+                        poisonCause.Message = "Exceeded RedeliveryPolicy 
limit: " + RedeliveryPolicy.MaximumRedeliveries;
+
                                                if (lastMd.RollbackCause != 
null)
                                                {
                                                        BrokerError cause = new 
BrokerError();
-                                                       cause.ExceptionClass = 
"javax.jms.JMSException";
-                                                       cause.Message = 
lastMd.RollbackCause.Message;
-                                                       ack.PoisonCause = cause;
+                            poisonCause.ExceptionClass = 
"javax.jms.JMSException";
+                            poisonCause.Message = lastMd.RollbackCause.Message;
+                            poisonCause.Cause = cause;
                                                }
                        ack.FirstMessageId = firstMsgId;
+                        ack.PoisonCause = poisonCause;
 
                                                this.session.SendAck(ack);
 
@@ -1743,6 +1775,24 @@ namespace Apache.NMS.ActiveMQ
                        }
                }
 
+        private bool RedeliveryExceeded(MessageDispatch dispatch) 
+        {
+            try 
+            {
+                ActiveMQMessage amqMessage = dispatch.Message as 
ActiveMQMessage;
+
+                return session.IsTransacted && redeliveryPolicy != null &&
+                       redeliveryPolicy.MaximumRedeliveries != 
NO_MAXIMUM_REDELIVERIES &&
+                       dispatch.RedeliveryCounter > 
redeliveryPolicy.MaximumRedeliveries &&
+                       // redeliveryCounter > x expected after resend via 
brokerRedeliveryPlugin
+                       !amqMessage.Properties.Contains("redeliveryDelay");
+            }
+            catch (Exception ignored) 
+            {
+                return false;
+            }
+        }
+
                #endregion
 
                #region Nested ISyncronization Types

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1619595&r1=1619594&r2=1619595&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Threads/CompositeTaskRunner.cs Thu Aug 21 21:20:25 2014
@@ -17,7 +17,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Threading;
 
 namespace Apache.NMS.ActiveMQ.Threads

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1619595&r1=1619594&r2=1619595&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/test/csharp/AMQRedeliveryPolicyTest.cs Thu Aug 21 21:20:25 2014
@@ -19,6 +19,7 @@ using System;
 using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
 using NUnit.Framework;
 
 namespace Apache.NMS.ActiveMQ.Test
@@ -27,6 +28,7 @@ namespace Apache.NMS.ActiveMQ.Test
     public class AMQRedeliveryPolicyTest : NMSTestSupport
     {
         private const string DESTINATION_NAME = 
"TEST.RedeliveryPolicyTestDest";
+        private const string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = 
"dlqDeliveryFailureCause";
 
         [Test]
         public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
@@ -443,5 +445,161 @@ namespace Apache.NMS.ActiveMQ.Test
                 session.Rollback();
             }
         }
+
+        [Test]
+        public void TestRepeatedRedeliveryReceiveNoCommit() 
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                connection.Start();
+
+                ISession dlqSession = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IDestination destination = 
dlqSession.GetQueue("TestRepeatedRedeliveryReceiveNoCommit");
+                IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ");
+                connection.DeleteDestination(destination);
+                connection.DeleteDestination(dlq);
+                IMessageProducer producer = 
dlqSession.CreateProducer(destination);
+                producer.Send(dlqSession.CreateTextMessage("1st"));
+                IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq);
+
+                const int maxRedeliveries = 4;
+                for (int i = 0; i <= maxRedeliveries + 1; i++) 
+                {
+                    using(Connection loopConnection = (Connection) 
CreateConnection())
+                    {
+                        // Receive a message with the JMS API
+                        IRedeliveryPolicy policy = 
loopConnection.RedeliveryPolicy;
+                        policy.InitialRedeliveryDelay = 0;
+                        policy.UseExponentialBackOff = false;
+                        policy.MaximumRedeliveries = maxRedeliveries;
+
+                        loopConnection.Start();
+                        ISession session = 
loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                        IMessageConsumer consumer = 
session.CreateConsumer(destination);
+
+                        ActiveMQTextMessage m = 
consumer.Receive(TimeSpan.FromMilliseconds(4000)) as ActiveMQTextMessage;
+                        if (m != null) 
+                        {
+                            Tracer.DebugFormat("Received Message: {0} delivery 
count = {1}", m.Text, m.RedeliveryCounter);
+                        }
+
+                        if (i <= maxRedeliveries)
+                        {
+                            Assert.IsNotNull(m);
+                            Assert.AreEqual("1st", m.Text);
+                            Assert.AreEqual(i, m.RedeliveryCounter);
+                        } 
+                        else
+                        {
+                            Assert.IsNull(m, "null on exceeding redelivery 
count");
+                        }
+                    }
+                }
+
+                // We should be able to get the message off the DLQ now.
+                ITextMessage msg = 
dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+                Assert.IsNotNull(msg, "Got message from DLQ");
+                Assert.AreEqual("1st", msg.Text);
+                String cause = 
msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+                if (cause != null) 
+                {
+                    Tracer.DebugFormat("Rollback Cause = {0}", cause);
+                    Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause 
exception has no policy ref");
+                }
+                else
+                {
+                    Tracer.Debug("DLQ'd message has no cause tag.");
+                }
+            }
+        }
+
+        [Test]
+        public void TestRepeatedRedeliveryOnMessageNoCommit() 
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                connection.Start();
+                ISession dlqSession = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IDestination destination = 
dlqSession.GetQueue("TestRepeatedRedeliveryOnMessageNoCommit");
+                IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ");
+                connection.DeleteDestination(destination);
+                connection.DeleteDestination(dlq);
+                IMessageProducer producer = 
dlqSession.CreateProducer(destination);
+                IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq);
+
+                producer.Send(dlqSession.CreateTextMessage("1st"));
+
+                const int maxRedeliveries = 4;
+                Atomic<int> receivedCount = new Atomic<int>(0);
+
+                for (int i = 0; i <= maxRedeliveries + 1; i++) 
+                {
+                    using(Connection loopConnection = (Connection) 
CreateConnection())
+                    {
+                        IRedeliveryPolicy policy = 
loopConnection.RedeliveryPolicy;
+                        policy.InitialRedeliveryDelay = 0;
+                        policy.UseExponentialBackOff = false;
+                        policy.MaximumRedeliveries = maxRedeliveries;
+
+                        loopConnection.Start();
+                        ISession session = 
loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                        IMessageConsumer consumer = 
session.CreateConsumer(destination);
+                        OnMessageNoCommitCallback callback = new 
OnMessageNoCommitCallback(receivedCount);
+                        consumer.Listener += new 
MessageListener(callback.consumer_Listener);
+
+                        if (i <= maxRedeliveries) 
+                        {
+                            Assert.IsTrue(callback.Await(), "listener should 
have dispatched a message");
+                        } 
+                        else 
+                        {
+                            // final redlivery gets poisoned before dispatch
+                            Assert.IsFalse(callback.Await(), "listener should 
not have dispatched after max redliveries");
+                        }
+                    }
+                }
+
+                // We should be able to get the message off the DLQ now.
+                ITextMessage msg = 
dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+                Assert.IsNotNull(msg, "Got message from DLQ");
+                Assert.AreEqual("1st", msg.Text);
+                String cause = 
msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+                if (cause != null) 
+                {
+                    Tracer.DebugFormat("Rollback Cause = {0}", cause);
+                    Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause 
exception has no policy ref");
+                }
+                else
+                {
+                    Tracer.Debug("DLQ'd message has no cause tag.");
+                }
+            }
+        }
+
+        class OnMessageNoCommitCallback
+        {
+            private Atomic<int> receivedCount;
+            private CountDownLatch done = new CountDownLatch(1);
+
+            public OnMessageNoCommitCallback(Atomic<int> receivedCount)
+            {
+                this.receivedCount = receivedCount;
+            }
+
+            public bool Await() 
+            {
+                return done.await(TimeSpan.FromMilliseconds(5000));
+            }
+
+            public void consumer_Listener(IMessage message)
+            {
+                ActiveMQTextMessage m = message as ActiveMQTextMessage;
+                Tracer.DebugFormat("Received Message: {0} delivery count = 
{1}", m.Text, m.RedeliveryCounter);
+                Assert.AreEqual("1st", m.Text);
+                Assert.AreEqual(receivedCount.Value, m.RedeliveryCounter);
+                receivedCount.GetAndSet(receivedCount.Value + 1);
+                done.countDown();
+            }
+        }
     }
 }


Reply via email to