[ https://issues.apache.org/jira/browse/AMQNET-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13815433#comment-13815433 ]
Remo Gloor commented on AMQNET-413: ----------------------------------- I can understand that you don't fix this for 1.6.x. But what is the problem with applying that on the trunk? > Message producers do not respect DTC Transactions correctly > ----------------------------------------------------------- > > Key: AMQNET-413 > URL: https://issues.apache.org/jira/browse/AMQNET-413 > Project: ActiveMQ .Net > Issue Type: Bug > Components: ActiveMQ > Reporter: Remo Gloor > Assignee: Jim Gomes > Attachments: AMQNET-413.patch, > AllMessagesAreAcknowledgedAndRolledbackIndependentOfTheTransaction.patch, > AllMessagesAreAcknowledgedAndRolledbackIndependentOfTheTransaction.patch, > allDTCImprovments.patch > > > When consuming messages in a transaction and sending new ones during > processing of that message and the transaction is rolled back and committed on > retry, the number of published messages should be equal to the number received. > But the number of sent messages is bigger than the number of received ones. > This means some of the message sends are not rolled back while others are. > EDIT: Further analysis has shown that the TransactionContext.TransactionId > is null when sending even though a transaction is in progress and not yet > completed. It must incorrectly be assigned to null somewhere. 
> The following application demonstrates the problem when enqueuing 100+ > messages to foo.bar > class Program > { > private static INetTxSession activeMqSession; > private static IMessageConsumer consumer; > private static INetTxConnection connection; > static void Main(string[] args) > { > using (connection = CreateActiveMqConnection()) > using (activeMqSession = connection.CreateNetTxSession()) > using (consumer = > activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, > "queue://foo.bar"))) > { > connection.Start(); > while (true) > { > try > { > using (TransactionScope scoped = new > TransactionScope(TransactionScopeOption.RequiresNew)) > { > IMessage msg = null; > while (msg == null) > { > msg = consumer.ReceiveNoWait(); > } > OnMessage(msg); > scoped.Complete(); > } > } > catch(Exception exception) {} > } > } > } > private static INetTxConnection CreateActiveMqConnection() > { > var connectionFactory = new > Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616") > { > AcknowledgementMode = AcknowledgementMode.Transactional > }; > return connectionFactory.CreateNetTxConnection(); > } > private static void OnMessage(IMessage message) > { > var x = new TestSinglePhaseCommit(); > Console.WriteLine("Processing message {0} in transaction {1} - > {2}", message.NMSMessageId, > Transaction.Current.TransactionInformation.LocalIdentifier, > Transaction.Current.TransactionInformation.DistributedIdentifier); > var session2 = activeMqSession; > { > Transaction.Current.EnlistDurable(Guid.NewGuid(), x, > EnlistmentOptions.None); > using (var producer = > session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz"))) > { > producer.Send(new ActiveMQTextMessage("foo")); > } > if (!message.NMSRedelivered) throw new Exception(); > } > } > } > internal class TestSinglePhaseCommit : ISinglePhaseNotification > { > public void Prepare(PreparingEnlistment preparingEnlistment) > { > preparingEnlistment.Prepared(); > } > public void 
Commit(Enlistment enlistment) > { > enlistment.Done(); > } > public void Rollback(Enlistment enlistment) > { > enlistment.Done(); > } > public void InDoubt(Enlistment enlistment) > { > enlistment.Done(); > } > public void SinglePhaseCommit(SinglePhaseEnlistment > singlePhaseEnlistment) > { > singlePhaseEnlistment.Committed(); > } > } -- This message was sent by Atlassian JIRA (v6.1#6144)