Author: tabish Date: Thu Nov 19 23:19:50 2015 New Revision: 1715304 URL: http://svn.apache.org/viewvc?rev=1715304&view=rev Log: https://issues.apache.org/jira/browse/AMQNET-513
Preserve the rollback cause when poisoning a message. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/ (props changed) activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/MessageListenerRedeliveryTest.cs Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 19 23:19:50 2015 @@ -1,4 +1,4 @@ /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874,1177390,1177395,1186568,1187123,1238881,1293360,1294890,1295257,1311395,1312026,1374469,1375295,1376782 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183 -/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:1689517,1695609-1695737,1705945 +/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:1689517,1695609-1695737,1705945,1715303 Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs?rev=1715304&r1=1715303&r2=1715304&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs Thu Nov 19 23:19:50 2015 @@ -883,10 +883,10 @@ namespace Apache.NMS.ActiveMQ } catch(Exception e) { + dispatch.RollbackCause = e; if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge) { // Schedule redelivery and possible dlq processing - dispatch.RollbackCause = e; Rollback(); } else @@ -1094,9 +1094,9 @@ namespace Apache.NMS.ActiveMQ } } - private bool ConsumeExpiredMessage(MessageDispatch dispatch) + private bool ConsumeExpiredMessage(MessageDispatch dispatch) { - if (dispatch.Message.IsExpired()) + if (dispatch.Message.IsExpired()) { return !info.Browser && !IgnoreExpiration; } @@ -1480,7 +1480,9 @@ namespace Apache.NMS.ActiveMQ cause.ExceptionClass = "javax.jms.JMSException"; cause.Message = lastMd.RollbackCause.Message; poisonCause.Cause = cause; + poisonCause.Message = poisonCause.Message + " cause: " + lastMd.RollbackCause.Message; } + ack.FirstMessageId = firstMsgId; ack.PoisonCause = poisonCause; Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/MessageListenerRedeliveryTest.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/MessageListenerRedeliveryTest.cs?rev=1715304&r1=1715303&r2=1715304&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/MessageListenerRedeliveryTest.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/MessageListenerRedeliveryTest.cs Thu Nov 19 23:19:50 2015 @@ -15,7 +15,11 @@ * limitations under the License. */ +using System; using System.Threading; +using System.Collections; +using Apache.NMS; +using Apache.NMS.Util; using Apache.NMS.Test; using Apache.NMS.Policies; using NUnit.Framework; @@ -26,15 +30,29 @@ namespace Apache.NMS.ActiveMQ.Test public class MessageListenerRedeliveryTest : NMSTestSupport { private Connection connection; - private int counter; + private volatile int counter; private ISession session; + private ArrayList received; + private ArrayList dlqMessages; + private int maxDeliveries; + + private CountDownLatch gotOneMessage; + private CountDownLatch gotTwoMessages; + private CountDownLatch gotOneDlqMessage; + private CountDownLatch gotMaxRedeliveries; [SetUp] public override void SetUp() { this.connection = (Connection) CreateConnection(); this.connection.RedeliveryPolicy = GetRedeliveryPolicy(); - + this.gotOneMessage = new CountDownLatch(1); + this.gotTwoMessages = new CountDownLatch(2); + this.gotOneDlqMessage = new CountDownLatch(1); + this.maxDeliveries = GetRedeliveryPolicy().MaximumRedeliveries; + this.gotMaxRedeliveries = new CountDownLatch(maxDeliveries); + this.received = new ArrayList(); + this.dlqMessages = new ArrayList(); this.counter = 0; } @@ -76,6 +94,39 @@ namespace Apache.NMS.ActiveMQ.Test } } + private void OnTracedReceiveMessage(IMessage message) + { + try + { + received.Add(((ITextMessage) message).Text); + } + catch (Exception e) + { + Assert.Fail("Error: " + e.Message); + } + + if (++counter < maxDeliveries) + { + throw new Exception("force a redelivery"); + } + + // new blood + counter = 0; + gotTwoMessages.countDown(); + } + + private void OnDlqMessage(IMessage message) + { + dlqMessages.Add(message); + gotOneDlqMessage.countDown(); + } + + private void OnRedeliveredMessage(IMessage message) + { + gotMaxRedeliveries.countDown(); + throw new Exception("Test Forcing a Rollback"); + } + [Test] public void TestQueueRollbackConsumerListener() { @@ -91,7 +142,7 @@ namespace Apache.NMS.ActiveMQ.Test IMessageConsumer consumer = session.CreateConsumer(queue); consumer.Listener += new MessageListener(OnMessageListener); - + Thread.Sleep(500); // first try.. should get 2 since there is no delay on the @@ -125,6 +176,191 @@ namespace Apache.NMS.ActiveMQ.Test session.Close(); } - + + [Test] + public void TestQueueRollbackSessionListener() + { + connection.Start(); + + this.session = connection.CreateSession(AcknowledgementMode.Transactional); + IQueue queue = session.CreateTemporaryQueue(); + IMessageProducer producer = CreateProducer(session, queue); + IMessage message = CreateTextMessage(session); + producer.Send(message); + session.Commit(); + + IMessageConsumer consumer = session.CreateConsumer(queue); + consumer.Listener += new MessageListener(OnMessageListener); + + Thread.Sleep(1000); + + // first try + Assert.AreEqual(2, counter); + + Thread.Sleep(1500); + + // second try (redelivery after 1 sec) + Assert.AreEqual(3, counter); + + Thread.Sleep(3000); + + // third try (redelivery after 2 seconds) - it should give up after that + Assert.AreEqual(4, counter); + + // create new message + producer.Send(CreateTextMessage(session)); + session.Commit(); + + Thread.Sleep(1000); + + // it should be committed, so no redelivery + Assert.AreEqual(5, counter); + + Thread.Sleep(2000); + + // no redelivery, counter should still be 4 + Assert.AreEqual(5, counter); + + session.Close(); + } + + [Test] + public void TestQueueSessionListenerExceptionRetry() + { + connection.Start(); + + ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + IQueue queue = session.CreateTemporaryQueue(); + IMessageProducer producer = CreateProducer(session, queue); + IMessage message = CreateTextMessage(session, "1"); + producer.Send(message); + message = CreateTextMessage(session, "2"); + producer.Send(message); + + IMessageConsumer consumer = session.CreateConsumer(queue); + consumer.Listener += new MessageListener(OnTracedReceiveMessage); + + Assert.IsTrue(gotTwoMessages.await(TimeSpan.FromSeconds(20)), "got message before retry expiry"); + + for (int i = 0; i < maxDeliveries; i++) + { + Assert.AreEqual("1", received[i], "got first redelivered: " + i); + } + for (int i = maxDeliveries; i < maxDeliveries * 2; i++) + { + Assert.AreEqual("2", received[i], "got first redelivered: " + i); + } + + session.Close(); + } + + [Test] + public void TestQueueSessionListenerExceptionDlq() + { + connection.Start(); + + session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + IQueue queue = session.CreateTemporaryQueue(); + IMessageProducer producer = CreateProducer(session, queue); + IMessage message = CreateTextMessage(session); + producer.Send(message); + + IDestination dlqDestination = session.GetQueue("ActiveMQ.DLQ"); + connection.DeleteDestination(dlqDestination); + IMessageConsumer dlqConsumer = session.CreateConsumer(dlqDestination); + dlqConsumer.Listener += new MessageListener(OnDlqMessage); + + IMessageConsumer consumer = session.CreateConsumer(queue); + consumer.Listener += new MessageListener(OnRedeliveredMessage); + + Assert.IsTrue(gotMaxRedeliveries.await(TimeSpan.FromSeconds(20)), "got message before retry expiry"); + + // check DLQ + Assert.IsTrue(gotOneDlqMessage.await(TimeSpan.FromSeconds(20)), "got dlq message"); + + // check DLQ message cause is captured + message = dlqMessages[0] as IMessage; + Assert.IsNotNull(message, "dlq message captured"); + String cause = message.Properties.GetString("dlqDeliveryFailureCause"); + + Assert.IsTrue(cause.Contains("JMSException"), "cause 'cause' exception is remembered"); + Assert.IsTrue(cause.Contains("Test"), "is correct exception"); + Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause policy is remembered"); + + session.Close(); + } + + private void OnMessageThenRollback(IMessage message) + { + gotOneMessage.countDown(); + try + { + session.Rollback(); + } + catch (Exception) + { + } + + throw new Exception("Test force a redelivery"); + } + + [Test] + public void TestTransactedQueueSessionListenerExceptionDlq() + { + connection.Start(); + + session = connection.CreateSession(AcknowledgementMode.Transactional); + IQueue queue = session.CreateTemporaryQueue(); + IMessageProducer producer = CreateProducer(session, queue); + IMessage message = CreateTextMessage(session); + producer.Send(message); + session.Commit(); + + IDestination dlqDestination = session.GetQueue("ActiveMQ.DLQ"); + connection.DeleteDestination(dlqDestination); + IMessageConsumer dlqConsumer = session.CreateConsumer(dlqDestination); + dlqConsumer.Listener += new MessageListener(OnDlqMessage); + + IMessageConsumer consumer = session.CreateConsumer(queue); + consumer.Listener += new MessageListener(OnMessageThenRollback); + + Assert.IsTrue(gotOneMessage.await(TimeSpan.FromSeconds(20)), "got message before retry expiry"); + + // check DLQ + Assert.IsTrue(gotOneDlqMessage.await(TimeSpan.FromSeconds(20)), "got dlq message"); + + // check DLQ message cause is captured + message = dlqMessages[0] as IMessage; + Assert.IsNotNull(message, "dlq message captured"); + String cause = message.Properties.GetString("dlqDeliveryFailureCause"); + + Assert.IsTrue(cause.Contains("JMSException"), "cause 'cause' exception is remembered"); + Assert.IsTrue(cause.Contains("Test force"), "is correct exception"); + Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause policy is remembered"); + + session.Close(); + } + + private ITextMessage CreateTextMessage(ISession session, String text) + { + return session.CreateTextMessage(text); + } + + private ITextMessage CreateTextMessage(ISession session) + { + return session.CreateTextMessage("Hello"); + } + + private IMessageProducer CreateProducer(ISession session, IDestination queue) + { + IMessageProducer producer = session.CreateProducer(queue); + producer.DeliveryMode = GetDeliveryMode(); + return producer; + } + + protected MsgDeliveryMode GetDeliveryMode() + { + return MsgDeliveryMode.Persistent; + } } }