Author: jgomes Date: Fri May 23 18:26:34 2014 New Revision: 1597145 URL: http://svn.apache.org/r1597145 Log: Copy the ignoreExpiration implementation from the OpenWire provider. Fixes [AMQNET-478]. (See https://issues.apache.org/jira/browse/AMQNET-478)
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=1597145&r1=1597144&r2=1597145&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Fri May 23 18:26:34 2014 @@ -15,6 +15,7 @@ * limitations under the License. */ using System; +using System.Collections.Specialized; using System.Threading; using System.Collections.Generic; using Apache.NMS.Stomp.Commands; @@ -58,14 +59,45 @@ namespace Apache.NMS.Stomp private IRedeliveryPolicy redeliveryPolicy; private Exception failureError; - // Constructor internal to prevent clients from creating an instance. - internal MessageConsumer(Session session, ConsumerInfo info) + // Constructor internal to prevent clients from creating an instance. + internal MessageConsumer(Session session, ConsumerId id, Destination destination, string name, string selector, int prefetch, bool noLocal) { - this.session = session; - this.info = info; + if(destination == null) + { + throw new InvalidDestinationException("Consumer cannot receive on Null Destinations."); + } + + this.session = session; this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy; this.messageTransformation = this.session.Connection.MessageTransformation; - } + + this.info = new ConsumerInfo(); + this.info.ConsumerId = id; + this.info.Destination = Destination.Transform(destination); + this.info.SubscriptionName = name; + this.info.Selector = selector; + this.info.PrefetchSize = prefetch; + this.info.MaximumPendingMessageLimit = session.Connection.PrefetchPolicy.MaximumPendingMessageLimit; + this.info.NoLocal = noLocal; + this.info.DispatchAsync = session.DispatchAsync; + this.info.Retroactive = session.Retroactive; + this.info.Exclusive = session.Exclusive; + this.info.Priority = session.Priority; + this.info.AckMode = session.AcknowledgementMode; + + // If the destination contained a URI query, then use it to set public properties + // on the ConsumerInfo + if(destination.Options != null) + { + // Get options prefixed with "consumer.*" + StringDictionary options = URISupport.GetProperties(destination.Options, "consumer."); + // Extract out custom extension options "consumer.nms.*" + StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms."); + + URISupport.SetProperties(this.info, options); + URISupport.SetProperties(this, customConsumerOptions, "nms."); + } + } ~MessageConsumer() { @@ -79,7 +111,12 @@ namespace Apache.NMS.Stomp get { return info.ConsumerId; } } - public int PrefetchSize + public ConsumerInfo ConsumerInfo + { + get { return this.info; } + } + + public int PrefetchSize { get { return this.info.PrefetchSize; } } @@ -90,18 +127,26 @@ namespace Apache.NMS.Stomp set { this.redeliveryPolicy = value; } } - private ConsumerTransformerDelegate consumerTransformer; - public ConsumerTransformerDelegate ConsumerTransformer - { - get { return this.consumerTransformer; } - set { this.consumerTransformer = value; } - } + // Custom Options + private bool ignoreExpiration = false; + public bool IgnoreExpiration + { + get { return ignoreExpiration; } + set { ignoreExpiration = value; } + } - #endregion + #endregion #region IMessageConsumer Members - public event MessageListener Listener + private ConsumerTransformerDelegate consumerTransformer; + public ConsumerTransformerDelegate ConsumerTransformer + { + get { return this.consumerTransformer; } + set { this.consumerTransformer = value; } + } + + public event MessageListener Listener { add { @@ -424,7 +469,7 @@ namespace Apache.NMS.Stomp try { - bool expired = message.IsExpired(); + bool expired = (!IgnoreExpiration && message.IsExpired()); if(!expired) { @@ -548,7 +593,7 @@ namespace Apache.NMS.Stomp { return null; } - else if(dispatch.Message.IsExpired()) + else if(!IgnoreExpiration && dispatch.Message.IsExpired()) { Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId); Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=1597145&r1=1597144&r2=1597145&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Fri May 23 18:26:34 2014 @@ -416,40 +416,47 @@ namespace Apache.NMS.Stomp throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } - ConsumerInfo command = CreateConsumerInfo(destination, selector); - command.NoLocal = noLocal; - ConsumerId consumerId = command.ConsumerId; - MessageConsumer consumer = null; + int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch; - // Registered with Connection before we register at the broker. - connection.addDispatcher(consumerId, this); + if(destination.IsTopic) + { + prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch; + } + else if(destination.IsQueue) + { + prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch; + } + + MessageConsumer consumer = null; try { - consumer = new MessageConsumer(this, command); + Destination dest = destination as Destination; + consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize, noLocal); consumer.ConsumerTransformer = this.ConsumerTransformer; - consumers[consumerId] = consumer; + this.AddConsumer(consumer); - if(this.Started) + // lets register the consumer first in case we start dispatching messages immediately + this.Connection.SyncRequest(consumer.ConsumerInfo); + + if(this.Started) { consumer.Start(); } - - // lets register the consumer first in case we start dispatching messages immediately - this.Connection.SyncRequest(command); - - return consumer; } catch(Exception) { if(consumer != null) { + this.RemoveConsumer(consumer); consumer.Close(); } throw; } - } + + return consumer; + } public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) { @@ -458,33 +465,26 @@ namespace Apache.NMS.Stomp throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } - ConsumerInfo command = CreateConsumerInfo(destination, selector); - ConsumerId consumerId = command.ConsumerId; - command.SubscriptionName = name; - command.NoLocal = noLocal; - command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch; MessageConsumer consumer = null; - // Registered with Connection before we register at the broker. - connection.addDispatcher(consumerId, this); - try { - consumer = new MessageConsumer(this, command); - consumer.ConsumerTransformer = this.ConsumerTransformer; - consumers[consumerId] = consumer; + Destination dest = destination as Destination; + consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector, this.connection.PrefetchPolicy.DurableTopicPrefetch, noLocal); + consumer.ConsumerTransformer = this.ConsumerTransformer; + this.AddConsumer(consumer); + this.connection.SyncRequest(consumer.ConsumerInfo); - if(this.Started) + if(this.Started) { consumer.Start(); } - - this.connection.SyncRequest(command); } catch(Exception) { if(consumer != null) { + this.RemoveConsumer(consumer); consumer.Close(); } @@ -633,7 +633,26 @@ namespace Apache.NMS.Stomp #endregion - public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout ) + public void AddConsumer(MessageConsumer consumer) + { + if(!this.closing) + { + // Registered with Connection before we register at the broker. + consumers[consumer.ConsumerId] = consumer; + connection.addDispatcher(consumer.ConsumerId, this); + } + } + + public void RemoveConsumer(MessageConsumer consumer) + { + connection.removeDispatcher(consumer.ConsumerId); + if(!this.closing) + { + consumers.Remove(consumer.ConsumerId); + } + } + + public void DoSend(Message message, MessageProducer producer, TimeSpan sendTimeout) { Message msg = message; @@ -699,52 +718,10 @@ namespace Apache.NMS.Stomp } } - protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) - { - ConsumerInfo answer = new ConsumerInfo(); - ConsumerId id = new ConsumerId(); - id.ConnectionId = info.SessionId.ConnectionId; - id.SessionId = info.SessionId.Value; - id.Value = Interlocked.Increment(ref consumerCounter); - answer.ConsumerId = id; - answer.Destination = Destination.Transform(destination); - answer.Selector = selector; - answer.Priority = this.Priority; - answer.Exclusive = this.Exclusive; - answer.DispatchAsync = this.DispatchAsync; - answer.Retroactive = this.Retroactive; - answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit; - answer.AckMode = this.AcknowledgementMode; - - if(destination is ITopic || destination is ITemporaryTopic) - { - answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch; - } - else if(destination is IQueue || destination is ITemporaryQueue) - { - answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch; - } - - // If the destination contained a URI query, then use it to set public properties - // on the ConsumerInfo - Destination amqDestination = destination as Destination; - if(amqDestination != null && amqDestination.Options != null) - { - StringDictionary options = URISupport.GetProperties(amqDestination.Options, "consumer."); - URISupport.SetProperties(answer, options); - } - - return answer; - } - protected virtual ProducerInfo CreateProducerInfo(IDestination destination) { ProducerInfo answer = new ProducerInfo(); - ProducerId id = new ProducerId(); - id.ConnectionId = info.SessionId.ConnectionId; - id.SessionId = info.SessionId.Value; - id.Value = Interlocked.Increment(ref producerCounter); - answer.ProducerId = id; + answer.ProducerId = GetNextProducerId(); answer.Destination = Destination.Transform(destination); // If the destination contained a URI query, then use it to set public @@ -759,7 +736,27 @@ namespace Apache.NMS.Stomp return answer; } - public void Stop() + public ConsumerId GetNextConsumerId() + { + ConsumerId id = new ConsumerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + id.Value = Interlocked.Increment(ref consumerCounter); + + return id; + } + + public ProducerId GetNextProducerId() + { + ProducerId id = new ProducerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + id.Value = Interlocked.Increment(ref producerCounter); + + return id; + } + + public void Stop() { if(this.executor != null) { Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs?rev=1597145&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs Fri May 23 18:26:34 2014 @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Threading; +using Apache.NMS.Test; +using NUnit.Framework; +using Apache.NMS.Stomp.Commands; +using System; +using Apache.NMS.Util; + +namespace Apache.NMS.Stomp.Test +{ + public enum ExpirationOptions + { + DEFAULT, + IGNORE, + DO_NOT_IGNORE + } + + [TestFixture] + public class MessageConsumerTest : NMSTestSupport + { + protected static string DESTINATION_NAME = "queue://TEST.MessageConsumerTestDestination"; + protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId"; + + private CountDownLatch doneLatch; + private int counter; + private String errorMessage; + + [SetUp] + public override void SetUp() + { + base.SetUp(); + + this.doneLatch = new CountDownLatch(1); + this.counter = 0; + this.errorMessage = null; + } + + [Test] + public void TestBadSelectorDoesNotCloseConnection() + { + using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) + { + using(ISession sender = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + { + IDestination destination = sender.CreateTemporaryQueue(); + + IMessageProducer producer = sender.CreateProducer(destination); + ITextMessage goodMsg = sender.CreateTextMessage("testGood"); + producer.Send(goodMsg); + + IMessageConsumer consumer = session.CreateConsumer(destination); + connection.Start(); + Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000))); + + try + { + ISession badListenerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + badListenerSession.CreateConsumer(destination, "badSelector;too"); + Assert.Fail("Exception expected."); + } + catch(Exception e) + { + Tracer.DebugFormat("Caught Ex: {0}", e); + } + + ITextMessage failMsg = sender.CreateTextMessage("testFail"); + producer.Send(failMsg); + Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000))); + } + } + } + + [Test] + public void TestAsyncDispatchExceptionRedelivers() + { + using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) + { + using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + { + IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue; + + using(IMessageProducer producer = session.CreateProducer(queue)) + { + producer.DeliveryMode = MsgDeliveryMode.NonPersistent; + producer.Send(producer.CreateTextMessage("First")); + producer.Send(producer.CreateTextMessage("Second")); + } + + using(IMessageConsumer consumer = session.CreateConsumer(queue)) + { + consumer.Listener += OnTestAsynchRedliversMessage; + + connection.Start(); + + if(doneLatch.await(TimeSpan.FromSeconds(10))) + { + if(!String.IsNullOrEmpty(errorMessage)) + { + Assert.Fail(errorMessage); + } + } + else + { + Assert.Fail("Timeout waiting for async message delivery to complete."); + } + } + } + } + } + + private void OnTestAsynchRedliversMessage(IMessage msg) + { + counter++; + try + { + ITextMessage message = msg as ITextMessage; + switch(counter) + { + case 1: + Tracer.Debug("Got first Message: " + message.Text); + Assert.AreEqual("First", message.Text); + Assert.IsFalse(message.NMSRedelivered); + break; + case 2: + Tracer.Debug("Got Second Message: " + message.Text); + Assert.AreEqual("Second", message.Text); + Assert.IsFalse(message.NMSRedelivered); + throw new Exception("Ignore Me"); + case 3: + Tracer.Debug("Got Third Message: " + message.Text); + Assert.AreEqual("Second", message.Text); + Assert.IsTrue(message.NMSRedelivered); + doneLatch.countDown(); + break; + default: + errorMessage = "Got too many messages: " + counter; + Tracer.Debug(errorMessage); + doneLatch.countDown(); + break; + } + } + catch(Exception e) + { + if(e.Message.Equals("Ignore Me")) + { + throw; + } + errorMessage = "Got exception: " + e.Message; + Tracer.Warn("Exception on Message Receive: " + e.Message); + doneLatch.countDown(); + } + } + + [Test] + public void ConsumeInTwoThreads() + { + ParameterizedThreadStart threadStart = + delegate(object o) + { + IMessageConsumer consumer = (IMessageConsumer) o; + IMessage message = consumer.Receive(TimeSpan.FromSeconds(2)); + Assert.IsNotNull(message); + }; + + using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) + { + connection.Start(); + using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) + { + IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue; + + // enqueue 2 messages + using(IMessageConsumer consumer = session.CreateConsumer(queue)) + using(IMessageProducer producer = session.CreateProducer(queue)) + { + producer.DeliveryMode = MsgDeliveryMode.Persistent; + producer.Send(producer.CreateMessage()); + producer.Send(producer.CreateMessage()); + session.Commit(); + + // receive first using a dedicated thread. This works + Thread thread = new Thread(threadStart); + thread.Start(consumer); + thread.Join(); + session.Commit(); + + // receive second using main thread. This FAILS + IMessage message = consumer.Receive(TimeSpan.FromSeconds(2)); // throws System.Threading.AbandonedMutexException + Assert.IsNotNull(message); + session.Commit(); + } + } + } + } + + [Test] + public void TestReceiveIgnoreExpirationMessage( + [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, + AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] + AcknowledgementMode ackMode, + [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)] + MsgDeliveryMode deliveryMode, + [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)] + ExpirationOptions expirationOption) + { + using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) + { + connection.Start(); + using(Session session = connection.CreateSession(ackMode) as Session) + { + string destinationName = DESTINATION_NAME; + + if(ExpirationOptions.IGNORE == expirationOption) + { + destinationName += "?consumer.nms.ignoreExpiration=true"; + } + else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption) + { + destinationName += "?consumer.nms.ignoreExpiration=false"; + } + + try + { + IDestination destination = SessionUtil.GetDestination(session, destinationName); + + using(IMessageConsumer consumer = session.CreateConsumer(destination)) + using(IMessageProducer producer = session.CreateProducer(destination)) + { + producer.DeliveryMode = deliveryMode; + + string msgText = string.Format("ExpiredMessage: {0}", Guid.NewGuid().ToString()); + + TextMessage msg = session.CreateTextMessage(msgText) as TextMessage; + + // Give it two seconds to live. + msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000); + + producer.Send(msg); + + if(AcknowledgementMode.Transactional == ackMode) + { + session.Commit(); + } + + // Wait for four seconds before processing it. The broker will have sent it to our local + // client dispatch queue, but we won't attempt to process the message until it has had + // a chance to expire within our internal queue system. + Thread.Sleep(4000); + + TextMessage rcvMsg = consumer.ReceiveNoWait() as TextMessage; + + if(ExpirationOptions.IGNORE == expirationOption) + { + Assert.IsNotNull(rcvMsg, "Did not receive expired message."); + rcvMsg.Acknowledge(); + + Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match."); + Assert.IsTrue(rcvMsg.IsExpired()); + + if(AcknowledgementMode.Transactional == ackMode) + { + session.Commit(); + } + } + else + { + // Should not receive a message. + Assert.IsNull(rcvMsg, "Received an expired message!"); + } + + consumer.Close(); + producer.Close(); + } + } + finally + { + try + { + // Ensure that Session resources on the Broker release transacted Consumers. + session.Close(); + // Give the Broker some time to remove the subscriptions. + Thread.Sleep(2000); + SessionUtil.DeleteDestination(session, destinationName); + } + catch + { + } + } + } + } + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=1597145&r1=1597144&r2=1597145&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original) +++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Fri May 23 18:26:34 2014 @@ -68,6 +68,7 @@ <HintPath>lib\NUnit\net-2.0\nunit.framework.dll</HintPath> </Reference> <Reference Include="System" /> + <Reference Include="System.Data" /> <Reference Include="System.Xml" /> </ItemGroup> <ItemGroup> @@ -104,6 +105,7 @@ <ItemGroup> <Compile Include="src\test\csharp\AMQNET383Test.cs" /> <Compile Include="src\test\csharp\InvalidCredentialsTest.cs" /> + <Compile Include="src\test\csharp\MessageConsumerTest.cs" /> <Compile Include="src\test\csharp\StompHelperTest.cs" /> <Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" /> <Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" />