Author: jgomes
Date: Fri May 23 18:26:34 2014
New Revision: 1597145

URL: http://svn.apache.org/r1597145
Log:
Copy the ignoreExpiration implementation from the OpenWire provider.
Fixes [AMQNET-478]. (See https://issues.apache.org/jira/browse/AMQNET-478)

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj

Modified: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=1597145&r1=1597144&r2=1597145&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
 Fri May 23 18:26:34 2014
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 using System;
+using System.Collections.Specialized;
 using System.Threading;
 using System.Collections.Generic;
 using Apache.NMS.Stomp.Commands;
@@ -58,14 +59,45 @@ namespace Apache.NMS.Stomp
         private IRedeliveryPolicy redeliveryPolicy;
         private Exception failureError;
 
-        // Constructor internal to prevent clients from creating an instance.
-        internal MessageConsumer(Session session, ConsumerInfo info)
+               // Constructor internal to prevent clients from creating an 
instance.
+        internal MessageConsumer(Session session, ConsumerId id, Destination 
destination, string name, string selector, int prefetch, bool noLocal)
         {
-            this.session = session;
-            this.info = info;
+                       if(destination == null)
+                       {
+                               throw new InvalidDestinationException("Consumer 
cannot receive on Null Destinations.");
+                       }
+                       
+                       this.session = session;
             this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
             this.messageTransformation = 
this.session.Connection.MessageTransformation;
-        }
+
+                       this.info = new ConsumerInfo();
+                       this.info.ConsumerId = id;
+                       this.info.Destination = 
Destination.Transform(destination);
+                       this.info.SubscriptionName = name;
+                       this.info.Selector = selector;
+                       this.info.PrefetchSize = prefetch;
+                       this.info.MaximumPendingMessageLimit = 
session.Connection.PrefetchPolicy.MaximumPendingMessageLimit;
+                       this.info.NoLocal = noLocal;
+                       this.info.DispatchAsync = session.DispatchAsync;
+                       this.info.Retroactive = session.Retroactive;
+                       this.info.Exclusive = session.Exclusive;
+                       this.info.Priority = session.Priority;
+                       this.info.AckMode = session.AcknowledgementMode;
+
+                       // If the destination contained a URI query, then use 
it to set public properties
+                       // on the ConsumerInfo
+                       if(destination.Options != null)
+                       {
+                               // Get options prefixed with "consumer.*"
+                               StringDictionary options = 
URISupport.GetProperties(destination.Options, "consumer.");
+                               // Extract out custom extension options 
"consumer.nms.*"
+                               StringDictionary customConsumerOptions = 
URISupport.ExtractProperties(options, "nms.");
+
+                               URISupport.SetProperties(this.info, options);
+                               URISupport.SetProperties(this, 
customConsumerOptions, "nms.");
+                       }
+               }
 
         ~MessageConsumer()
         {
@@ -79,7 +111,12 @@ namespace Apache.NMS.Stomp
             get { return info.ConsumerId; }
         }
 
-        public int PrefetchSize
+               public ConsumerInfo ConsumerInfo
+               {
+                       get { return this.info; }
+               }
+               
+               public int PrefetchSize
         {
             get { return this.info.PrefetchSize; }
         }
@@ -90,18 +127,26 @@ namespace Apache.NMS.Stomp
             set { this.redeliveryPolicy = value; }
         }
 
-        private ConsumerTransformerDelegate consumerTransformer;
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get { return this.consumerTransformer; }
-            set { this.consumerTransformer = value; }
-        }
+               // Custom Options
+               private bool ignoreExpiration = false;
+               public bool IgnoreExpiration
+               {
+                       get { return ignoreExpiration; }
+                       set { ignoreExpiration = value; }
+               }
 
-        #endregion
+               #endregion
 
         #region IMessageConsumer Members
 
-        public event MessageListener Listener
+               private ConsumerTransformerDelegate consumerTransformer;
+               public ConsumerTransformerDelegate ConsumerTransformer
+               {
+                       get { return this.consumerTransformer; }
+                       set { this.consumerTransformer = value; }
+               }
+
+               public event MessageListener Listener
         {
             add
             {
@@ -424,7 +469,7 @@ namespace Apache.NMS.Stomp
 
                             try
                             {
-                                bool expired = message.IsExpired();
+                                                               bool expired = 
(!IgnoreExpiration && message.IsExpired());
 
                                 if(!expired)
                                 {
@@ -548,7 +593,7 @@ namespace Apache.NMS.Stomp
                 {
                     return null;
                 }
-                else if(dispatch.Message.IsExpired())
+                               else if(!IgnoreExpiration && 
dispatch.Message.IsExpired())
                 {
                     Tracer.DebugFormat("{0} received expired message: {1}", 
info.ConsumerId, dispatch.Message.MessageId);
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=1597145&r1=1597144&r2=1597145&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs 
Fri May 23 18:26:34 2014
@@ -416,40 +416,47 @@ namespace Apache.NMS.Stomp
                 throw new InvalidDestinationException("Cannot create a 
Consumer with a Null destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            command.NoLocal = noLocal;
-            ConsumerId consumerId = command.ConsumerId;
-            MessageConsumer consumer = null;
+                       int prefetchSize = 
this.Connection.PrefetchPolicy.DurableTopicPrefetch;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
+                       if(destination.IsTopic)
+                       {
+                               prefetchSize = 
this.connection.PrefetchPolicy.TopicPrefetch;
+                       }
+                       else if(destination.IsQueue)
+                       {
+                               prefetchSize = 
this.connection.PrefetchPolicy.QueuePrefetch;
+                       }
+                       
+            MessageConsumer consumer = null;
 
             try
             {
-                consumer = new MessageConsumer(this, command);
+                   Destination dest = destination as Destination;
+                               consumer = new MessageConsumer(this, 
GetNextConsumerId(), dest, null, selector, prefetchSize, noLocal);
                 consumer.ConsumerTransformer = this.ConsumerTransformer;
-                consumers[consumerId] = consumer;
+                               this.AddConsumer(consumer);
 
-                if(this.Started)
+                               // lets register the consumer first in case we 
start dispatching messages immediately
+                               
this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+                               if(this.Started)
                 {
                     consumer.Start();
                 }
-
-                // lets register the consumer first in case we start 
dispatching messages immediately
-                this.Connection.SyncRequest(command);
-
-                return consumer;
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+                                       this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
                 throw;
             }
-        }
+
+                       return consumer;
+               }
 
         public IMessageConsumer CreateDurableConsumer(ITopic destination, 
string name, string selector, bool noLocal)
         {
@@ -458,33 +465,26 @@ namespace Apache.NMS.Stomp
                 throw new InvalidDestinationException("Cannot create a 
Consumer with a Null destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            ConsumerId consumerId = command.ConsumerId;
-            command.SubscriptionName = name;
-            command.NoLocal = noLocal;
-            command.PrefetchSize = 
this.connection.PrefetchPolicy.DurableTopicPrefetch;
             MessageConsumer consumer = null;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
-
             try
             {
-                consumer = new MessageConsumer(this, command);
-                consumer.ConsumerTransformer = this.ConsumerTransformer;
-                consumers[consumerId] = consumer;
+                               Destination dest = destination as Destination;
+                               consumer = new MessageConsumer(this, 
GetNextConsumerId(), dest, name, selector, 
this.connection.PrefetchPolicy.DurableTopicPrefetch, noLocal);
+                               consumer.ConsumerTransformer = 
this.ConsumerTransformer;
+                               this.AddConsumer(consumer);
+                               
this.connection.SyncRequest(consumer.ConsumerInfo);
 
-                if(this.Started)
+                               if(this.Started)
                 {
                     consumer.Start();
                 }
-
-                this.connection.SyncRequest(command);
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+                                       this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
@@ -633,7 +633,26 @@ namespace Apache.NMS.Stomp
 
         #endregion
 
-        public void DoSend( Message message, MessageProducer producer, 
TimeSpan sendTimeout )
+               public void AddConsumer(MessageConsumer consumer)
+               {
+                       if(!this.closing)
+                       {
+                               // Registered with Connection before we 
register at the broker.
+                               consumers[consumer.ConsumerId] = consumer;
+                               connection.addDispatcher(consumer.ConsumerId, 
this);
+                       }
+               }
+
+               public void RemoveConsumer(MessageConsumer consumer)
+               {
+                       connection.removeDispatcher(consumer.ConsumerId);
+                       if(!this.closing)
+                       {
+                               consumers.Remove(consumer.ConsumerId);
+                       }
+               }
+
+               public void DoSend(Message message, MessageProducer producer, 
TimeSpan sendTimeout)
         {
             Message msg = message;
 
@@ -699,52 +718,10 @@ namespace Apache.NMS.Stomp
             }
         }
 
-        protected virtual ConsumerInfo CreateConsumerInfo(IDestination 
destination, string selector)
-        {
-            ConsumerInfo answer = new ConsumerInfo();
-            ConsumerId id = new ConsumerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref consumerCounter);
-            answer.ConsumerId = id;
-            answer.Destination = Destination.Transform(destination);
-            answer.Selector = selector;
-            answer.Priority = this.Priority;
-            answer.Exclusive = this.Exclusive;
-            answer.DispatchAsync = this.DispatchAsync;
-            answer.Retroactive = this.Retroactive;
-            answer.MaximumPendingMessageLimit = 
this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
-            answer.AckMode = this.AcknowledgementMode;
-
-            if(destination is ITopic || destination is ITemporaryTopic)
-            {
-                answer.PrefetchSize = 
this.connection.PrefetchPolicy.TopicPrefetch;
-            }
-            else if(destination is IQueue || destination is ITemporaryQueue)
-            {
-                answer.PrefetchSize = 
this.connection.PrefetchPolicy.QueuePrefetch;
-            }
-
-            // If the destination contained a URI query, then use it to set 
public properties
-            // on the ConsumerInfo
-            Destination amqDestination = destination as Destination;
-            if(amqDestination != null && amqDestination.Options != null)
-            {
-                StringDictionary options = 
URISupport.GetProperties(amqDestination.Options, "consumer.");
-                URISupport.SetProperties(answer, options);
-            }
-
-            return answer;
-        }
-
         protected virtual ProducerInfo CreateProducerInfo(IDestination 
destination)
         {
             ProducerInfo answer = new ProducerInfo();
-            ProducerId id = new ProducerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref producerCounter);
-            answer.ProducerId = id;
+            answer.ProducerId = GetNextProducerId();
             answer.Destination = Destination.Transform(destination);
 
             // If the destination contained a URI query, then use it to set 
public
@@ -759,7 +736,27 @@ namespace Apache.NMS.Stomp
             return answer;
         }
 
-        public void Stop()
+               public ConsumerId GetNextConsumerId()
+               {
+                       ConsumerId id = new ConsumerId();
+                       id.ConnectionId = info.SessionId.ConnectionId;
+                       id.SessionId = info.SessionId.Value;
+                       id.Value = Interlocked.Increment(ref consumerCounter);
+
+                       return id;
+               }
+
+               public ProducerId GetNextProducerId()
+               {
+                       ProducerId id = new ProducerId();
+                       id.ConnectionId = info.SessionId.ConnectionId;
+                       id.SessionId = info.SessionId.Value;
+                       id.Value = Interlocked.Increment(ref producerCounter);
+
+                       return id;
+               }
+
+               public void Stop()
         {
             if(this.executor != null)
             {

Added: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs?rev=1597145&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs
 Fri May 23 18:26:34 2014
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using Apache.NMS.Stomp.Commands;
+using System;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Test
+{
+    public enum ExpirationOptions
+    {
+        DEFAULT,
+        IGNORE,
+        DO_NOT_IGNORE
+    }
+
+    [TestFixture]
+    public class MessageConsumerTest : NMSTestSupport
+    {
+        protected static string DESTINATION_NAME = 
"queue://TEST.MessageConsumerTestDestination";
+        protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";
+
+        private CountDownLatch doneLatch;
+        private int counter;
+        private String errorMessage;
+
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+
+            this.doneLatch = new CountDownLatch(1);
+            this.counter = 0;
+            this.errorMessage = null;
+        }
+
+        [Test]
+        public void TestBadSelectorDoesNotCloseConnection()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                using(ISession sender = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                using(ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IDestination destination = sender.CreateTemporaryQueue();
+
+                    IMessageProducer producer = 
sender.CreateProducer(destination);
+                    ITextMessage goodMsg = 
sender.CreateTextMessage("testGood");
+                    producer.Send(goodMsg);
+
+                    IMessageConsumer consumer = 
session.CreateConsumer(destination);
+                    connection.Start();
+                    
Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+
+                    try
+                    {
+                        ISession badListenerSession = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                        badListenerSession.CreateConsumer(destination, 
"badSelector;too");
+                        Assert.Fail("Exception expected.");
+                    }
+                    catch(Exception e)
+                    {
+                        Tracer.DebugFormat("Caught Ex: {0}", e);
+                    }
+
+                    ITextMessage failMsg = 
sender.CreateTextMessage("testFail");
+                    producer.Send(failMsg);
+                    
Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+                }
+            }
+        }
+
+        [Test]
+        public void TestAsyncDispatchExceptionRedelivers()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                using(ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IQueue queue = SessionUtil.GetDestination(session, 
DESTINATION_NAME) as IQueue;
+
+                    using(IMessageProducer producer = 
session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+                        producer.Send(producer.CreateTextMessage("First"));
+                        producer.Send(producer.CreateTextMessage("Second"));
+                    }
+
+                    using(IMessageConsumer consumer = 
session.CreateConsumer(queue))
+                    {
+                        consumer.Listener += OnTestAsynchRedliversMessage;
+
+                        connection.Start();
+
+                        if(doneLatch.await(TimeSpan.FromSeconds(10)))
+                        {
+                            if(!String.IsNullOrEmpty(errorMessage))
+                            {
+                                Assert.Fail(errorMessage);
+                            }
+                        }
+                        else
+                        {
+                            Assert.Fail("Timeout waiting for async message 
delivery to complete.");
+                        }
+                    }
+                }
+            }
+        }
+
+        private void OnTestAsynchRedliversMessage(IMessage msg)
+        {
+            counter++;
+            try
+            {
+                ITextMessage message = msg as ITextMessage;
+                switch(counter)
+                {
+                    case 1:
+                        Tracer.Debug("Got first Message: " + message.Text);
+                        Assert.AreEqual("First", message.Text);
+                        Assert.IsFalse(message.NMSRedelivered);
+                        break;
+                    case 2:
+                        Tracer.Debug("Got Second Message: " + message.Text);
+                        Assert.AreEqual("Second", message.Text);
+                        Assert.IsFalse(message.NMSRedelivered);
+                        throw new Exception("Ignore Me");
+                    case 3:
+                        Tracer.Debug("Got Third Message: " + message.Text);
+                        Assert.AreEqual("Second", message.Text);
+                        Assert.IsTrue(message.NMSRedelivered);
+                        doneLatch.countDown();
+                        break;
+                    default:
+                        errorMessage = "Got too many messages: " + counter;
+                        Tracer.Debug(errorMessage);
+                        doneLatch.countDown();
+                        break;
+                }
+            }
+            catch(Exception e)
+            {
+                if(e.Message.Equals("Ignore Me"))
+                {
+                    throw;
+                }
+                errorMessage = "Got exception: " + e.Message;
+                Tracer.Warn("Exception on Message Receive: " + e.Message);
+                doneLatch.countDown();
+            }
+        }
+
+        [Test]
+        public void ConsumeInTwoThreads()
+        {
+            ParameterizedThreadStart threadStart =
+                delegate(object o)
+                {
+                    IMessageConsumer consumer = (IMessageConsumer) o;
+                    IMessage message = 
consumer.Receive(TimeSpan.FromSeconds(2));
+                    Assert.IsNotNull(message);
+                };
+
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                connection.Start();
+                using(ISession session = 
connection.CreateSession(AcknowledgementMode.Transactional))
+                {
+                    IQueue queue = SessionUtil.GetDestination(session, 
DESTINATION_NAME) as IQueue;
+
+                    // enqueue 2 messages
+                    using(IMessageConsumer consumer = 
session.CreateConsumer(queue))
+                    using(IMessageProducer producer = 
session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                        producer.Send(producer.CreateMessage());
+                        producer.Send(producer.CreateMessage());
+                        session.Commit();
+
+                        // receive first using a dedicated thread. This works
+                        Thread thread = new Thread(threadStart);
+                        thread.Start(consumer);
+                        thread.Join();
+                        session.Commit();
+
+                        // receive second using main thread. This FAILS
+                        IMessage message = 
consumer.Receive(TimeSpan.FromSeconds(2)); // throws 
System.Threading.AbandonedMutexException
+                        Assert.IsNotNull(message);
+                        session.Commit();
+                    }
+                }
+            }
+        }
+
+        [Test]
+        public void TestReceiveIgnoreExpirationMessage(
+            [Values(AcknowledgementMode.AutoAcknowledge, 
AcknowledgementMode.ClientAcknowledge,
+                AcknowledgementMode.DupsOkAcknowledge, 
AcknowledgementMode.Transactional)]
+            AcknowledgementMode ackMode,
+            [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+            MsgDeliveryMode deliveryMode,
+            [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, 
ExpirationOptions.DO_NOT_IGNORE)]
+            ExpirationOptions expirationOption)
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                connection.Start();
+                using(Session session = connection.CreateSession(ackMode) as 
Session)
+                {
+                    string destinationName = DESTINATION_NAME;
+
+                    if(ExpirationOptions.IGNORE == expirationOption)
+                    {
+                        destinationName += 
"?consumer.nms.ignoreExpiration=true";
+                    }
+                    else if(ExpirationOptions.DO_NOT_IGNORE == 
expirationOption)
+                    {
+                        destinationName += 
"?consumer.nms.ignoreExpiration=false";
+                    }
+
+                    try
+                    {
+                        IDestination destination = 
SessionUtil.GetDestination(session, destinationName);
+
+                        using(IMessageConsumer consumer = 
session.CreateConsumer(destination))
+                        using(IMessageProducer producer = 
session.CreateProducer(destination))
+                        {
+                            producer.DeliveryMode = deliveryMode;
+
+                            string msgText = string.Format("ExpiredMessage: 
{0}", Guid.NewGuid().ToString());
+
+                            TextMessage msg = 
session.CreateTextMessage(msgText) as TextMessage;
+
+                            // Give it two seconds to live.
+                            msg.NMSTimeToLive = 
TimeSpan.FromMilliseconds(2000);
+
+                            producer.Send(msg);
+
+                            if(AcknowledgementMode.Transactional == ackMode)
+                            {
+                                session.Commit();
+                            }
+
+                            // Wait for four seconds before processing it.  
The broker will have sent it to our local
+                            // client dispatch queue, but we won't attempt to 
process the message until it has had
+                            // a chance to expire within our internal queue 
system.
+                            Thread.Sleep(4000);
+
+                            TextMessage rcvMsg = consumer.ReceiveNoWait() as 
TextMessage;
+
+                            if(ExpirationOptions.IGNORE == expirationOption)
+                            {
+                                Assert.IsNotNull(rcvMsg, "Did not receive 
expired message.");
+                                rcvMsg.Acknowledge();
+
+                                Assert.AreEqual(msgText, rcvMsg.Text, "Message 
text does not match.");
+                                Assert.IsTrue(rcvMsg.IsExpired());
+
+                                if(AcknowledgementMode.Transactional == 
ackMode)
+                                {
+                                    session.Commit();
+                                }
+                            }
+                            else
+                            {
+                                // Should not receive a message.
+                                Assert.IsNull(rcvMsg, "Received an expired 
message!");
+                            }
+
+                            consumer.Close();
+                            producer.Close();
+                        }
+                    }
+                    finally
+                    {
+                        try
+                        {
+                            // Ensure that Session resources on the Broker 
release transacted Consumers.
+                            session.Close();
+                            // Give the Broker some time to remove the 
subscriptions.
+                            Thread.Sleep(2000);
+                            SessionUtil.DeleteDestination(session, 
destinationName);
+                        }
+                        catch
+                        {
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

Modified: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=1597145&r1=1597144&r2=1597145&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj 
Fri May 23 18:26:34 2014
@@ -68,6 +68,7 @@
       <HintPath>lib\NUnit\net-2.0\nunit.framework.dll</HintPath>
     </Reference>
     <Reference Include="System" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
@@ -104,6 +105,7 @@
   <ItemGroup>
     <Compile Include="src\test\csharp\AMQNET383Test.cs" />
     <Compile Include="src\test\csharp\InvalidCredentialsTest.cs" />
+    <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
     <Compile Include="src\test\csharp\StompHelperTest.cs" />
     <Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" />
     <Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" />


Reply via email to