[ https://issues.apache.org/jira/browse/AMQNET-412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Remo Gloor updated AMQNET-412:
------------------------------

    Patch Info: Patch Available

> Messages are enlisted to the wrong transaction
> ----------------------------------------------
>
>                 Key: AMQNET-412
>                 URL: https://issues.apache.org/jira/browse/AMQNET-412
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>         Environment: Apache.NMS.ActiveMq 1.5.7
>            Reporter: Remo Gloor
>            Assignee: Jim Gomes
>            Priority: Critical
>         Attachments: allDTCImprovments.patch,
> MessagesAreEnlistedToTheWrongTransaction.patch
>
>
> Under load, ActiveMQ enlists a message in a previous transaction. This leads
> to very strange behavior:
> - The database is updated and the message is rolled back
> - The message is completed but the database is rolled back
> Either outcome leaves the system in an invalid state, which makes DTC
> unusable.
> Analysis of the source code has shown that the problem is in
> NetTxSession.DoStartTransaction. There it checks whether a .NET Transaction
> is present in the TransactionContext. If so, it adds the message to that
> transaction. But this can be the previous transaction, because the DTC
> two-phase commit is asynchronous. It needs to check whether the current
> transaction is the same as the previous one and wait if they do not match.
> The following application demonstrates the problem when enough messages are
> processed, e.g. enqueue 100 messages in foo.bar. It is basically
> TestRedeliveredCase3 but with half of the messages failing.
> Whenever a SinglePhaseCommit occurs in TestSinglePhaseCommit, this means
> the database would be committed in its own transaction.
> using System;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ.Commands;
> using Apache.NMS.Util;
>
> class Program
> {
>     private static INetTxSession activeMqSession;
>     private static IMessageConsumer consumer;
>     private static INetTxConnection connection;
>     // Shared instance: Random objects created in quick succession can
>     // share a time-based seed and return the same value.
>     private static readonly Random random = new Random();
>
>     static void Main(string[] args)
>     {
>         using (connection = CreateActiveMqConnection())
>         using (activeMqSession = connection.CreateNetTxSession())
>         using (consumer = activeMqSession.CreateConsumer(
>             SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
>         {
>             connection.Start();
>             while (true)
>             {
>                 try
>                 {
>                     using (TransactionScope scoped =
>                         new TransactionScope(TransactionScopeOption.RequiresNew))
>                     {
>                         // Poll until a message arrives.
>                         IMessage msg = null;
>                         while (msg == null)
>                         {
>                             msg = consumer.ReceiveNoWait();
>                         }
>                         OnMessage(msg);
>                         scoped.Complete();
>                     }
>                 }
>                 catch (Exception)
>                 {
>                     // Ignored on purpose: a failing message rolls back its scope.
>                 }
>             }
>         }
>     }
>
>     private static INetTxConnection CreateActiveMqConnection()
>     {
>         var connectionFactory =
>             new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional
>             };
>         return connectionFactory.CreateNetTxConnection();
>     }
>
>     private static void OnMessage(IMessage message)
>     {
>         // Stands in for the database as a second durable resource.
>         var x = new TestSinglePhaseCommit();
>         var session2 = activeMqSession;
>         {
>             Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);
>             using (var producer = session2.CreateProducer(
>                 SessionUtil.GetQueue(session2, "queue://foo.baz")))
>             {
>                 producer.Send(new ActiveMQTextMessage("foo"));
>             }
>             // Fail roughly half of the messages.
>             if (random.Next(2) == 0) throw new Exception();
>         }
>     }
> }
>
> internal class TestSinglePhaseCommit : ISinglePhaseNotification
> {
>     public void Prepare(PreparingEnlistment preparingEnlistment)
>     {
>         Console.WriteLine("Tx Prepare");
>         preparingEnlistment.Prepared();
>     }
>
>     public void Commit(Enlistment enlistment)
>     {
>         Console.WriteLine("Tx Commit");
>         enlistment.Done();
>     }
>
>     public void Rollback(Enlistment enlistment)
>     {
>         Console.WriteLine("Tx Rollback");
>         enlistment.Done();
>     }
>
>     public void InDoubt(Enlistment enlistment)
>     {
>         Console.WriteLine("Tx InDoubt");
>         enlistment.Done();
>     }
>
>     public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
>     {
>         Console.WriteLine("Tx SinglePhaseCommit");
>         singlePhaseEnlistment.Committed();
>     }
> }
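
The check the description asks for (compare the current transaction with the one the session is still bound to, and wait if they differ) could be sketched roughly as follows. This is only an illustration and not the content of the attached patches: EnlistmentGate, Enter and Exit are hypothetical names that do not exist in Apache.NMS.ActiveMQ, and the sketch assumes the session's own enlistment calls Exit from its Commit, Rollback or InDoubt callback once the previous transaction has fully completed.

```csharp
using System;
using System.Threading;
using System.Transactions;

// Hypothetical helper for illustration only; not part of Apache.NMS.ActiveMQ.
public sealed class EnlistmentGate
{
    private readonly object sync = new object();

    // LocalIdentifier of the transaction the session is bound to, or null if free.
    private string ownerId;

    // Blocks while a different transaction still owns the session, then
    // records `current` as the owner. Returns false if the timeout elapses.
    public bool Enter(Transaction current, TimeSpan timeout)
    {
        if (ReferenceEquals(current, null)) throw new ArgumentNullException("current");

        string id = current.TransactionInformation.LocalIdentifier;
        DateTime deadline = DateTime.UtcNow + timeout;

        lock (sync)
        {
            while (ownerId != null && ownerId != id)
            {
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero || !Monitor.Wait(sync, remaining))
                {
                    return false; // the previous transaction has not completed yet
                }
            }
            ownerId = id;
            return true;
        }
    }

    // Assumed to be called once the previous transaction has fully completed
    // (for example at the end of the Commit, Rollback or InDoubt callback).
    public void Exit()
    {
        lock (sync)
        {
            ownerId = null;
            Monitor.PulseAll(sync); // wake up callers blocked in Enter
        }
    }
}
```

In a session implementation, something like gate.Enter(Transaction.Current, timeout) would run before a message is added to the ambient transaction, so that a message received while the previous two-phase commit is still completing is not attached to that earlier transaction.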