NMS Client does not respect TimeToLive with Listener callback
-------------------------------------------------------------

                 Key: AMQNET-323
                 URL: https://issues.apache.org/jira/browse/AMQNET-323
             Project: ActiveMQ .Net
          Issue Type: Bug
          Components: ActiveMQ, NMS
    Affects Versions: 1.5.0
         Environment: Windows 7
            Reporter: Matthew Good
            Assignee: Jim Gomes


When TimeToLive expires while a listener is in a redeliver loop, the redelivery 
never stops.  The following unit tests show this.  The first unit test uses 
Receive and it works fine.  The second uses a listener and it fails.

I added these tests to AMQRedeliveryPolicyTests

        [Test]
        public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.MaximumRedeliveries = -1;
                policy.InitialRedeliveryDelay = 500;
                policy.UseExponentialBackOff = false;

                connection.Start();
                ISession session = 
connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();

                IMessageProducer producer = session.CreateProducer(destination);
                IMessageConsumer consumer = session.CreateConsumer(destination);

                // Send the messages
                ITextMessage textMessage = session.CreateTextMessage("1st");
                textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
                producer.Send(textMessage);
                session.Commit();

                ITextMessage m;
                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                // No delay on first Rollback..
                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNotNull(m);
                session.Rollback();

                // Show subsequent re-delivery delay is incrementing.
                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNull(m);
                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                // The message gets redelivered after 500 ms every time since
                // we are not using exponential backoff.
                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
                Assert.IsNull(m);
            
            }
        }

        [Test]
        public void 
TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.MaximumRedeliveries = -1;
                policy.InitialRedeliveryDelay = 500;
                policy.UseExponentialBackOff = false;

                connection.Start();
                ISession session = 
connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();

                IMessageProducer producer = session.CreateProducer(destination);
                IMessageConsumer consumer = session.CreateConsumer(destination);
                CallbackClass cc = new CallbackClass(session);
                consumer.Listener += new MessageListener(cc.consumer_Listener);

                // Send the messages
                ITextMessage textMessage = session.CreateTextMessage("1st");
                textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
                producer.Send(textMessage);
                session.Commit();

                // sends normal message, then immediate retry, then retry after 
500 ms, then expire.
                Thread.Sleep(2000);
                Assert.AreEqual(3, cc.numReceived);
            
            }
        }


        class CallbackClass
        {
            private ISession session;
            public int numReceived = 0;

            public CallbackClass(ISession session)
            {
                this.session = session;
            }


            public void consumer_Listener(IMessage message)
            {
                numReceived++;
                ITextMessage m = message as ITextMessage;
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();
            }
        }



--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to