[ 
https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timothy Bish updated AMQNET-471:
--------------------------------

    Fix Version/s: 1.7.0
                   1.6.4

> Synchronous message consumer will lose a message that failed to commit whilst 
> the broker was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Timothy Bish
>             Fix For: 1.6.4, 1.7.0
>
>         Attachments: TransactionContext.cs.patch
>
>
> If the broker is down, then the client cannot commit the current message. An 
> exception is thrown by the library. This is the behavior you would expect.
> If you then try to roll back the transaction on the session because of the 
> exception and resume message consumption, the rolled-back message will never 
> be redelivered.
> {code:title=Failing Test|borderStyle=solid} 
>  [TestFixture, Explicit]
>     public class BrokerRestart
>     {
>         [Test]
>         public void 
> Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>         {
>             StartService(ActiveMqMaster);
>             DeleteQueue();
>             SendMessageToQueue();
>             var session = 
> _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = 
> session.CreateConsumer(SessionUtil.GetDestination(session, InQ));
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             _log.Debug("Received message");
>             StopService(ActiveMqMaster);
>             _log.Debug("Commiting transaction");
>             try
>             {
>                 session.Commit();
>             }
>             catch (Exception ex)
>             {
>                 _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 
> 250));
>                 try
>                 {
>                     session.Rollback();
>                 }
>                 catch (Exception einner)
>                 {
>                     _log.Debug("Rollback transaction");
>                     _log.ErrorFormat("Exception: {0}", 
> einner.ToString().Substring(0, 250));
>                 }
>             }
>             StartService(ActiveMqMaster);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = 
> session.CreateProducer(SessionUtil.GetDestination(session, InQ)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue()
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, InQ);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new 
> ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>             var factory = new 
> ConnectionFactory(@"failover:(tcp://localhost:61616)")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { 
> InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, 
> UseExponentialBackOff = false },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy {All = 5}
>             };
>             _connection = factory.CreateConnection();
>             _connection.Start();
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         protected ServiceController ActiveMqMaster = new 
> ServiceController(@"ActiveMQ");
>         //protected ServiceController ActiveMqSlave = new 
> ServiceController(@"ActiveMQSlave");
>         private IConnection _connection;
>         private const string InQ = "integration-test-q";
>         private ILog _log;
>     }
> {code}
> {code:title=Passing Test With Patch|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Threading;
> using System.Threading.Tasks;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
>     [TestFixture]
>     public class BrokerRestart
>     {
>         //AMQNET-471
>         [Test]
>         public void 
> Message_should_be_redilivered_if_broker_is_down_and_try_to_commit()
>         {
>             var session = 
> _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = 
> session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
>             SendMessageToQueue();
>             consumer.Receive(TimeSpan.FromSeconds(5));
>             StopService(ActiveMqMaster);
>             var commiter = TryCommit(session);
>             StartService(ActiveMqMaster);
>             commiter.Wait();
>             var message = consumer.Receive(TimeSpan.FromSeconds(5));
>             TryCommit(session).Wait();
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>             Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0));
>         }
>         //Commit blocks if the broker is down with the patch for AMQNET-471
>         private Task TryCommit(ISession session)
>         {
>             var task = Task.Factory.StartNew(() =>
>             {
>                 try
>                 {
>                     session.Commit();
>                 }
>                 catch (Exception ex)
>                 {
>                     _log.ErrorFormat("Exception: {0}", 
> ex.ToString().Substring(0, 250));
>                     try
>                     {
>                         session.Rollback();
>                     }
>                     catch (Exception einner)
>                     {
>                         _log.Debug("Rollback transaction");
>                         _log.ErrorFormat("Exception: {0}", 
> einner.ToString().Substring(0, 250));
>                     }
>                 }
>             });
>             //Give it a chance to start.
>             Thread.Sleep(1000);
>             return task;
>         }
>         private int CountMessagesInQueue(string queue)
>         {
>             var count = 0;
>             using (var session = 
> _connection.CreateSession(AcknowledgementMode.Transactional))
>             using (var consumerIn = 
> session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
>             {
>                 while (true)
>                 {
>                     var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
>                     if (msg == null)
>                         break;
>                     count++;
>                 }
>             }
>             return count;
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running || 
> service.Status == ServiceControllerStatus.StartPending)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = 
> session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue(string queue)
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, queue);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new 
> ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof(BrokerRestart).Name);
>             StartService(ActiveMqMaster);
>             _factory = new ConnectionFactory(ActiveMqConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { 
> InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, 
> UseExponentialBackOff = false },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 5 }
>             };
>             _connection = _factory.CreateConnection();
>             _log.Debug("Starting connection");
>             _connection.Start();
>             _log.Debug("Connection established");
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         [TearDown]
>         public void TestTearDown()
>         {
>             _connection.Dispose();
>         }
>         protected ServiceController ActiveMqMaster = new 
> ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
>         protected ServiceController ActiveMqSlave = new 
> ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
>         private static readonly string ActiveMqMachineName = 
> ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
>         private static readonly string ActiveMqConnectionString = 
> ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
>         private static readonly string ActiveMasterServiceName = 
> ConfigurationManager.AppSettings["ActiveMqMasterName"];
>         private static readonly string ActiveMqSlaveServiceName = 
> ConfigurationManager.AppSettings["ActiveMqSlaveName"];
>         private IConnection _connection;
>         private const string InQueue = "integration-test-q-in";
>         private const string OutQueue = "integration-test-q-out";
>         private ILog _log;
>         private ConnectionFactory _factory;
>     }
> }
> {code}
> {code:title=Java Client|borderStyle=solid} 
> import static org.junit.Assert.*;
> import org.junit.Test;
> import org.apache.activemq.*;
> import javax.jms.*;
> import javax.jms.Message;
> public class BrokerRestart {
>       @Test
>       public void test() throws Exception {
>               String SERVICE_NAME = "ActiveMQ";
>               String[] stop = {"cmd.exe", "/c", "sc", "stop", SERVICE_NAME};
>               String[] start = {"cmd.exe", "/c", "sc", "start", SERVICE_NAME};
>               
>               Runtime.getRuntime().exec(start);
>               Thread.sleep(2000);
>               ActiveMQConnectionFactory connectionFactory = new 
> ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
>               SendMessage(connectionFactory);
>               
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         Session session = connection.createSession(true, 
> Session.SESSION_TRANSACTED);
>         Destination destination = session.createQueue("TEST.FOO");
>         MessageConsumer consumer = session.createConsumer(destination);
>         Message message = consumer.receive(1000);
>               
>               Runtime.getRuntime().exec(stop);
>               Thread.sleep(2000);
>               try
>               {
>                       System.out.println("Committing transaction");
>                       //Looks like this blocks when the broker is down. If 
> you start the service manually here, the test will pass.
>                       session.commit();
>                       System.out.println("Committed transaction");
>               }
>               catch(Exception e)
>               {
>                       try
>                       {
>                               System.out.println("Transaction commit 
> exception: " + e.toString());
>                               session.rollback();
>                       }
>                       catch(Exception e2)
>                       {
>                               System.out.println("Transaction rollback 
> exception: " + e2.toString());
>                       }
>               }
>               Runtime.getRuntime().exec(start);
>               Thread.sleep(5000);
>         message = consumer.receive(1000);
>         session.commit();
>         consumer.close();
>         session.close();
>         connection.close();
>         
>         assertNotNull(message);
>       }
>       
>       private static void SendMessage(ActiveMQConnectionFactory 
> connectionFactory) throws Exception
>       {
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         Session session = connection.createSession(false, 
> Session.AUTO_ACKNOWLEDGE);
>         Destination destination = session.createQueue("TEST.FOO");
>         MessageProducer producer = session.createProducer(destination);
>         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>         String text = "Hello world! From: " + 
> Thread.currentThread().getName();
>         TextMessage message = session.createTextMessage(text);
>         producer.send(message);
>         session.close();
>         connection.close();
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to