[ 
https://issues.apache.org/jira/browse/AMQNET-472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13933361#comment-13933361
 ] 

Imran commented on AMQNET-472:
------------------------------

I think I have found the issue here; a suggested patch is attached. It looks
like some lock conditions were missing. Since all DTC operations are completed
asynchronously, the main processing thread may have moved on to consuming a
new message in a new transaction before the previous transaction has committed
or rolled back. The message consumer needs to check whether the next message
being consumed is part of the same transaction or a new one, and dispatch or
block respectively. Messages that are part of the same transaction will be
rolled back or committed together, and trying to consume a message in a new
transaction will block until the previous transaction has completed.

> Synchronous DTC Consumer will experience duplicates on transaction rollback
> ---------------------------------------------------------------------------
>
>                 Key: AMQNET-472
>                 URL: https://issues.apache.org/jira/browse/AMQNET-472
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>         Attachments: NetTxMessageConsumer.cs
>
>
> Rollback when using DTC will result in a duplicate message being received.
> {code:title=FailingTest|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
>     [TestFixture]
>     public class Dtc
>     {
>         [Test, Explicit("Bug in 1.6.2")]
>         public void 
> First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
>         {
>             SendMessageToQueue("1");
>             SendMessageToQueue("2");
>             var session = _connection.CreateSession();
>             var sessionTwo = _connection.CreateSession();
>             var consumer = 
> session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
>             var producer = 
> sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));
>             _log.Debug("Process message one and rollback");
>             var transaction = new 
> TransactionScope(TransactionScopeOption.RequiresNew);
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Dispose();
>             _log.Debug("Processing message two and commit");
>             transaction = new 
> TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             _log.Debug("Processing message one replay and commit");
>             transaction = new 
> TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             _log.Debug("Process any repeats, there should be none");
>             transaction = new 
> TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             if (message != null)
>                 producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             session.Dispose();
>             sessionTwo.Dispose();
>             Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
>             Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
>         }
>         public static void TransactionCallback(object s, TransactionEventArgs 
> e)
>         {
>             LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction  
> {0}", e.Transaction.TransactionInformation.Status);
>         }
>         private int CountMessagesInQueue(string queue)
>         {
>             var count = 0;
>             using (var session = _connection.CreateSession())
>             using (var consumerIn = 
> session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
>             using (var scope = new 
> TransactionScope(TransactionScopeOption.RequiresNew))
>             {
>                 while (true)
>                 {
>                     var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
>                     if (msg == null)
>                         break;
>                     count++;
>                 }
>             }
>             return count;
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue(string message)
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = 
> session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
>             using (var scope = new 
> TransactionScope(TransactionScopeOption.RequiresNew))
>             {
>                 producer.Send(producer.CreateTextMessage(message));
>                 scope.Complete();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue(string queue)
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, queue);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new 
> ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof(Dtc).Name);
>             StartService(ActiveMqMaster);
>             StopService(ActiveMqSlave);
>             _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { 
> InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, 
> UseExponentialBackOff = false },
>                 DispatchAsync = true,
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 5 },
>             };
>             _connection = _factory.CreateConnection();
>             _log.Debug("Starting connection");
>             _connection.Start();
>             _log.Debug("Connection established");
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         [TearDown]
>         public void TestTearDown()
>         {
>             _connection.Dispose();
>         }
>         protected ServiceController ActiveMqMaster = new 
> ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
>         protected ServiceController ActiveMqSlave = new 
> ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
>         private static readonly string ActiveMqMachineName = 
> ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
>         private static readonly string ActiveMqConnectionString = 
> ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
>         private static readonly string ActiveMasterServiceName = 
> ConfigurationManager.AppSettings["ActiveMqMasterName"];
>         private static readonly string ActiveMqSlaveServiceName = 
> ConfigurationManager.AppSettings["ActiveMqSlaveName"];
>         private IConnection _connection;
>         private const string InQueue = "integration-test-q-in";
>         private const string OutQueue = "integration-test-q-out";
>         private ILog _log;
>         private NetTxConnectionFactory _factory;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to