[ 
https://issues.apache.org/jira/browse/AMQNET-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13847888#comment-13847888
 ] 

Timothy Bish commented on AMQNET-413:
-------------------------------------

The test in the patch seems to rely on methods that don't exist in 
NetTXMessageConsumer.  The issue name mentions producers, but the patch seems 
related to consumers, so the intent is not clear here.  

> Message producers do not respect DTC Transactions correctly
> -----------------------------------------------------------
>
>                 Key: AMQNET-413
>                 URL: https://issues.apache.org/jira/browse/AMQNET-413
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>            Reporter: Remo Gloor
>            Assignee: Jim Gomes
>         Attachments: AMQNET-413.patch, 
> AllMessagesAreAcknowledgedAndRolledbackIndependentOfTheTransaction.patch, 
> AllMessagesAreAcknowledgedAndRolledbackIndependentOfTheTransaction.patch, 
> allDTCImprovments.patch
>
>
> When consuming messages in a transaction and sending new ones during 
> processing of that message, and the transaction is rolled back and committed on 
> retry, the number of published messages should be equal to the number received.
> But the number of sent messages is bigger than the number of received ones. 
> This means some of the message sends are not rolled back while others are.
> EDIT: Further analysis has shown that the TransactionContext.TransactionId 
> is null when sending even though a transaction is in progress and not yet 
> completed. It must incorrectly be assigned to null somewhere.
> The following application demonstrates the problem when enqueuing 100+ 
> messages to foo.bar
>     class Program
>     {
>         private static INetTxSession activeMqSession;
>         private static IMessageConsumer consumer;
>         private static INetTxConnection connection;
>         static void Main(string[] args)
>         {
>             using (connection = CreateActiveMqConnection())
>             using (activeMqSession = connection.CreateNetTxSession())
>             using (consumer = 
> activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, 
> "queue://foo.bar")))
>             {
>                 connection.Start();
>                 while (true)
>                 {
>                     try
>                     {
>                         using (TransactionScope scoped = new 
> TransactionScope(TransactionScopeOption.RequiresNew))
>                         {
>                             IMessage msg = null;
>                             while (msg == null)
>                             {
>                                 msg = consumer.ReceiveNoWait();
>                             }
>                             OnMessage(msg);
>                             scoped.Complete();
>                         }
>                     }
>                     catch(Exception exception) {}
>                 }
>             }
>         }
>         private static INetTxConnection CreateActiveMqConnection()
>         {
>             var connectionFactory = new 
> Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional
>             };
>             return connectionFactory.CreateNetTxConnection();
>         }
>         private static void OnMessage(IMessage message)
>         {
>             var x = new TestSinglePhaseCommit();
>             Console.WriteLine("Processing message {0} in transaction {1} - 
> {2}", message.NMSMessageId, 
> Transaction.Current.TransactionInformation.LocalIdentifier, 
> Transaction.Current.TransactionInformation.DistributedIdentifier);
>             var session2 = activeMqSession;
>             {
>                 Transaction.Current.EnlistDurable(Guid.NewGuid(), x, 
> EnlistmentOptions.None);
>                 using (var producer = 
> session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
>                 {
>                     producer.Send(new ActiveMQTextMessage("foo"));
>                 }
>                 if (!message.NMSRedelivered) throw new Exception();
>             }
>         }
>     }
>     internal class TestSinglePhaseCommit : ISinglePhaseNotification
>     {
>         public void Prepare(PreparingEnlistment preparingEnlistment)
>         {
>             preparingEnlistment.Prepared();
>         }
>         public void Commit(Enlistment enlistment)
>         {
>             enlistment.Done();
>         }
>         public void Rollback(Enlistment enlistment)
>         {
>             enlistment.Done();
>         }
>         public void InDoubt(Enlistment enlistment)
>         {
>             enlistment.Done();
>         }
>         public void SinglePhaseCommit(SinglePhaseEnlistment 
> singlePhaseEnlistment)
>         {
>             singlePhaseEnlistment.Committed();
>         }
>     }



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)

Reply via email to