[ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish updated AMQNET-471: -------------------------------- Fix Version/s: 1.7.0 1.6.4 > Synchronous message consumer will lose a message that failed to commit whilst > the broker was unavailable > -------------------------------------------------------------------------------------------------------- > > Key: AMQNET-471 > URL: https://issues.apache.org/jira/browse/AMQNET-471 > Project: ActiveMQ .Net > Issue Type: Bug > Components: ActiveMQ > Affects Versions: 1.6.2 > Reporter: Imran > Assignee: Timothy Bish > Fix For: 1.6.4, 1.7.0 > > Attachments: TransactionContext.cs.patch > > > If the broker is down then the client cannot commit the current message. An > exception is thrown by the library. This is the behavior you would expect. > If you then try to roll back the transaction on the session due to the > exception and resume message consumption, the rolled-back message will never > be redelivered. > {code:title=Failing Test|borderStyle=solid} > [TestFixture, Explicit] > public class BrokerRestart > { > [Test] > public void > Message_should_be_redilivered_if_broker_is_down_and_try_commit() > { > StartService(ActiveMqMaster); > DeleteQueue(); > SendMessageToQueue(); > var session = > _connection.CreateSession(AcknowledgementMode.Transactional); > var consumer = > session.CreateConsumer(SessionUtil.GetDestination(session, InQ)); > var message = consumer.Receive(TimeSpan.FromSeconds(30)); > _log.Debug("Received message"); > StopService(ActiveMqMaster); > _log.Debug("Commiting transaction"); > try > { > session.Commit(); > } > catch (Exception ex) > { > _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, > 250)); > try > { > session.Rollback(); > } > catch (Exception einner) > { > _log.Debug("Rollback transaction"); > _log.ErrorFormat("Exception: {0}", > einner.ToString().Substring(0, 250)); > } > } > StartService(ActiveMqMaster); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > Assert.That(message, Is.Not.Null, "message was not redilivered"); 
> } > private void StartService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Running) > service.Start(); > service.WaitForStatus(ServiceControllerStatus.Running); > _log.Debug("Started Broker"); > } > private void StopService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Stopped) > service.Stop(); > service.WaitForStatus(ServiceControllerStatus.Stopped); > _log.Debug("Stopped Broker Broker"); > } > private void SendMessageToQueue() > { > using (var session = _connection.CreateSession()) > using (var producer = > session.CreateProducer(SessionUtil.GetDestination(session, InQ))) > { > producer.Send(producer.CreateTextMessage(1.ToString())); > session.Commit(); > } > _log.Debug("Primed Input Queue"); > } > private void DeleteQueue() > { > using (var session = _connection.CreateSession()) > { > SessionUtil.DeleteDestination(session, InQ); > } > } > [SetUp] > public void TestSetup() > { > LogManager.Adapter = new > ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); > _log = LogManager.GetLogger(typeof (BrokerRestart).Name); > var factory = new > ConnectionFactory(@"failover:(tcp://localhost:61616)") > { > AcknowledgementMode = AcknowledgementMode.Transactional, > RedeliveryPolicy = new RedeliveryPolicy { > InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, > UseExponentialBackOff = false }, > AsyncSend = false, > PrefetchPolicy = new PrefetchPolicy {All = 5} > }; > _connection = factory.CreateConnection(); > _connection.Start(); > //Tracer.Trace = new CommonLoggingTraceAdapter(); > } > protected ServiceController ActiveMqMaster = new > ServiceController(@"ActiveMQ"); > //protected ServiceController ActiveMqSlave = new > ServiceController(@"ActiveMQSlave"); > private IConnection _connection; > private const string InQ = "integration-test-q"; > private ILog _log; > } > {code} > {code:title=Passing Test With Patch|borderStyle=solid} > using System; > using 
System.Configuration; > using System.ServiceProcess; > using System.Threading; > using System.Threading.Tasks; > using Apache.NMS; > using Apache.NMS.ActiveMQ; > using Apache.NMS.Policies; > using Apache.NMS.Util; > using Common.Logging; > using Common.Logging.Simple; > using NUnit.Framework; > namespace IntegrationTests.ApacheNms.Jira > { > [TestFixture] > public class BrokerRestart > { > //AMQNET-471 > [Test] > public void > Message_should_be_redilivered_if_broker_is_down_and_try_to_commit() > { > var session = > _connection.CreateSession(AcknowledgementMode.Transactional); > var consumer = > session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)); > SendMessageToQueue(); > consumer.Receive(TimeSpan.FromSeconds(5)); > StopService(ActiveMqMaster); > var commiter = TryCommit(session); > StartService(ActiveMqMaster); > commiter.Wait(); > var message = consumer.Receive(TimeSpan.FromSeconds(5)); > TryCommit(session).Wait(); > Assert.That(message, Is.Not.Null, "message was not redilivered"); > Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0)); > } > //Commit blocks if the broker is down with the patch for AMQNET-471 > private Task TryCommit(ISession session) > { > var task = Task.Factory.StartNew(() => > { > try > { > session.Commit(); > } > catch (Exception ex) > { > _log.ErrorFormat("Exception: {0}", > ex.ToString().Substring(0, 250)); > try > { > session.Rollback(); > } > catch (Exception einner) > { > _log.Debug("Rollback transaction"); > _log.ErrorFormat("Exception: {0}", > einner.ToString().Substring(0, 250)); > } > } > }); > //Give it a chance to start. 
> Thread.Sleep(1000); > return task; > } > private int CountMessagesInQueue(string queue) > { > var count = 0; > using (var session = > _connection.CreateSession(AcknowledgementMode.Transactional)) > using (var consumerIn = > session.CreateConsumer(SessionUtil.GetDestination(session, queue))) > { > while (true) > { > var msg = consumerIn.Receive(TimeSpan.FromSeconds(2)); > if (msg == null) > break; > count++; > } > } > return count; > } > private void StartService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Running || > service.Status == ServiceControllerStatus.StartPending) > service.Start(); > service.WaitForStatus(ServiceControllerStatus.Running); > _log.Debug("Started Broker"); > } > private void StopService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Stopped) > service.Stop(); > service.WaitForStatus(ServiceControllerStatus.Stopped); > _log.Debug("Stopped Broker"); > } > private void SendMessageToQueue() > { > using (var session = _connection.CreateSession()) > using (var producer = > session.CreateProducer(SessionUtil.GetDestination(session, InQueue))) > { > producer.Send(producer.CreateTextMessage(1.ToString())); > session.Commit(); > } > _log.Debug("Primed Input Queue"); > } > private void DeleteQueue(string queue) > { > using (var session = _connection.CreateSession()) > { > SessionUtil.DeleteDestination(session, queue); > } > } > [SetUp] > public void TestSetup() > { > LogManager.Adapter = new > ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); > _log = LogManager.GetLogger(typeof(BrokerRestart).Name); > StartService(ActiveMqMaster); > _factory = new ConnectionFactory(ActiveMqConnectionString) > { > AcknowledgementMode = AcknowledgementMode.Transactional, > RedeliveryPolicy = new RedeliveryPolicy { > InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, > UseExponentialBackOff = false }, > AsyncSend = false, > PrefetchPolicy = new 
PrefetchPolicy { All = 5 } > }; > _connection = _factory.CreateConnection(); > _log.Debug("Starting connection"); > _connection.Start(); > _log.Debug("Connection established"); > DeleteQueue(InQueue); > DeleteQueue(OutQueue); > //Tracer.Trace = new CommonLoggingTraceAdapter(); > } > [TearDown] > public void TestTearDown() > { > _connection.Dispose(); > } > protected ServiceController ActiveMqMaster = new > ServiceController(ActiveMasterServiceName, ActiveMqMachineName); > protected ServiceController ActiveMqSlave = new > ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName); > private static readonly string ActiveMqMachineName = > ConfigurationManager.AppSettings["ActiveMqServiceMachineName"]; > private static readonly string ActiveMqConnectionString = > ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString; > private static readonly string ActiveMasterServiceName = > ConfigurationManager.AppSettings["ActiveMqMasterName"]; > private static readonly string ActiveMqSlaveServiceName = > ConfigurationManager.AppSettings["ActiveMqSlaveName"]; > private IConnection _connection; > private const string InQueue = "integration-test-q-in"; > private const string OutQueue = "integration-test-q-out"; > private ILog _log; > private ConnectionFactory _factory; > } > } > {code} > {code:title=Java Client|borderStyle=solid} > import static org.junit.Assert.*; > import org.junit.Test; > import org.apache.activemq.*; > import javax.jms.*; > import javax.jms.Message; > public class BrokerRestart { > @Test > public void test() throws Exception { > String SERVICE_NAME = "ActiveMQ"; > String[] stop = {"cmd.exe", "/c", "sc", "stop", SERVICE_NAME}; > String[] start = {"cmd.exe", "/c", "sc", "start", SERVICE_NAME}; > > Runtime.getRuntime().exec(start); > Thread.sleep(2000); > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory("failover:(tcp://localhost:61616)"); > SendMessage(connectionFactory); > > Connection connection = 
connectionFactory.createConnection(); > connection.start(); > Session session = connection.createSession(true, > Session.SESSION_TRANSACTED); > Destination destination = session.createQueue("TEST.FOO"); > MessageConsumer consumer = session.createConsumer(destination); > Message message = consumer.receive(1000); > > Runtime.getRuntime().exec(stop); > Thread.sleep(2000); > try > { > System.out.println("Committing transaction"); > //Looks like this blocks when the broker is down. If > you start the service manually here, the test will pass. > session.commit(); > System.out.println("Committed transaction"); > } > catch(Exception e) > { > try > { > System.out.println("Transaction commit > exception: " + e.toString()); > session.rollback(); > } > catch(Exception e2) > { > System.out.println("Transaction rollback > exception: " + e2.toString()); > } > } > Runtime.getRuntime().exec(start); > Thread.sleep(5000); > message = consumer.receive(1000); > session.commit(); > consumer.close(); > session.close(); > connection.close(); > > assertNotNull(message); > } > > private static void SendMessage(ActiveMQConnectionFactory > connectionFactory) throws Exception > { > Connection connection = connectionFactory.createConnection(); > connection.start(); > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > Destination destination = session.createQueue("TEST.FOO"); > MessageProducer producer = session.createProducer(destination); > producer.setDeliveryMode(DeliveryMode.PERSISTENT); > String text = "Hello world! From: " + > Thread.currentThread().getName(); > TextMessage message = session.createTextMessage(text); > producer.send(message); > session.close(); > connection.close(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)