This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new c8c0630  PROTON-2638 AwaitAccepted API should work for transaction 
settlement
c8c0630 is described below

commit c8c0630100a163c03e4b4340816b2920c1761581
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri Oct 28 16:51:22 2022 -0400

    PROTON-2638 AwaitAccepted API should work for transaction settlement
    
    Check in the transactional state if settlement outcome is accepted and then
    answer that the delivery state is accepted.
---
 .../Client/Implementation/ClientDeliveryState.cs   |   4 +-
 .../ClientReconnectTransactionTest.cs              | 240 +++++++++++++++++++++
 .../Implementation/ClilentTransactionsTest.cs      |  58 +++++
 3 files changed, 301 insertions(+), 1 deletion(-)

diff --git a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs 
b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
index 8df2ce7..593a54f 100644
--- a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
+++ b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
@@ -28,7 +28,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
    /// </summary>
    public abstract class ClientDeliveryState : IDeliveryState
    {
-      public bool IsAccepted => Type == DeliveryStateType.Accepted;
+      public virtual bool IsAccepted => Type == DeliveryStateType.Accepted;
 
       public abstract DeliveryStateType Type { get; }
 
@@ -166,6 +166,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
          this.txnState.TxnId = txnState.TxnId.Copy();
       }
 
+      public override bool IsAccepted => txnState.Outcome is Accepted;
+
       public override DeliveryStateType Type => 
DeliveryStateType.Transactional;
 
       public override Types.Transport.IDeliveryState ProtonDeliveryState => 
txnState;
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
index f864aa0..0c95e04 100644
--- 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
+++ 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
@@ -17,6 +17,7 @@
 
 using System.Threading;
 using Apache.Qpid.Proton.Client.Exceptions;
+using Apache.Qpid.Proton.Client.TestSupport;
 using Apache.Qpid.Proton.Test.Driver;
 using Microsoft.Extensions.Logging;
 using NUnit.Framework;
@@ -84,5 +85,244 @@ namespace Apache.Qpid.Proton.Client.Implementation
             secondPeer.WaitForScriptToComplete();
          }
       }
+
+      [Test]
+      public void TestTransactionInDoubtAfterReconnect()
+      {
+         byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+         using (ProtonTestServer firstPeer = new 
ProtonTestServer(loggerFactory))
+         using (ProtonTestServer secondPeer = new 
ProtonTestServer(loggerFactory))
+         {
+            firstPeer.ExpectSASLAnonymousConnect();
+            firstPeer.ExpectOpen().Respond();
+            firstPeer.ExpectBegin().Respond();
+            firstPeer.ExpectCoordinatorAttach().Respond();
+            firstPeer.RemoteFlow().WithLinkCredit(2).Queue();
+            firstPeer.ExpectDeclare().Accept(txnId);
+            
firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            firstPeer.RemoteFlow().WithLinkCredit(1).Queue();
+            firstPeer.ExpectTransfer().WithNonNullPayload();
+            firstPeer.DropAfterLastHandler();
+            firstPeer.Start();
+
+            secondPeer.ExpectSASLAnonymousConnect();
+            secondPeer.ExpectOpen().Respond();
+            secondPeer.ExpectBegin().Respond();
+            
secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            secondPeer.Start();
+
+            string primaryAddress = firstPeer.ServerAddress;
+            int primaryPort = firstPeer.ServerPort;
+            string backupAddress = secondPeer.ServerAddress;
+            int backupPort = secondPeer.ServerPort;
+
+            logger.LogInformation("Test started, first peer listening on: 
{0}:{1}", primaryAddress, primaryPort);
+            logger.LogInformation("Test started, backup peer listening on: 
{0}:{1}", backupAddress, backupPort);
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.ReconnectOptions.ReconnectEnabled = true;
+            options.ReconnectOptions.AddReconnectLocation(backupAddress, 
backupPort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(primaryAddress, 
primaryPort, options);
+            ISession session = connection.OpenSession().OpenTask.Result;
+
+            session.BeginTransaction();
+
+            ISender sender = session.OpenSender("test").OpenTask.Result;
+            sender.Send(IMessage<string>.Create("Hello"));
+
+            firstPeer.WaitForScriptToComplete();
+
+            secondPeer.WaitForScriptToComplete();
+            secondPeer.ExpectClose().Respond();
+
+            try
+            {
+               session.CommitTransaction();
+               Assert.Fail("Should have failed to declare transaction");
+            }
+            catch (ClientTransactionRolledBackException cliEx)
+            {
+               logger.LogInformation("Caught expected error from test", cliEx);
+            }
+
+            connection.Close();
+
+            secondPeer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void TestSendInTransactionIsNoOpAfterReconnect()
+      {
+         byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+         using (ProtonTestServer firstPeer = new 
ProtonTestServer(loggerFactory))
+         using (ProtonTestServer secondPeer = new 
ProtonTestServer(loggerFactory))
+         {
+            firstPeer.ExpectSASLAnonymousConnect();
+            firstPeer.ExpectOpen().Respond();
+            firstPeer.ExpectBegin().Respond();
+            firstPeer.ExpectCoordinatorAttach().Respond();
+            firstPeer.RemoteFlow().WithLinkCredit(2).Queue();
+            firstPeer.ExpectDeclare().Accept(txnId);
+            
firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            firstPeer.RemoteFlow().WithLinkCredit(1).Queue();
+            firstPeer.ExpectTransfer().WithNonNullPayload();
+            firstPeer.DropAfterLastHandler();
+            firstPeer.Start();
+
+            secondPeer.ExpectSASLAnonymousConnect();
+            secondPeer.ExpectOpen().Respond();
+            secondPeer.ExpectBegin().Respond();
+            
secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            secondPeer.RemoteFlow().WithLinkCredit(1).Queue();
+            secondPeer.Start();
+
+            string primaryAddress = firstPeer.ServerAddress;
+            int primaryPort = firstPeer.ServerPort;
+            string backupAddress = secondPeer.ServerAddress;
+            int backupPort = secondPeer.ServerPort;
+
+            logger.LogInformation("Test started, first peer listening on: 
{0}:{1}", primaryAddress, primaryPort);
+            logger.LogInformation("Test started, backup peer listening on: 
{0}:{1}", backupAddress, backupPort);
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.ReconnectOptions.ReconnectEnabled = true;
+            options.ReconnectOptions.AddReconnectLocation(backupAddress, 
backupPort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(primaryAddress, 
primaryPort, options);
+            ISession session = connection.OpenSession().OpenTask.Result;
+
+            session.BeginTransaction();
+
+            ISender sender = session.OpenSender("test").OpenTask.Result;
+            sender.Send(IMessage<string>.Create("Hello"));
+
+            firstPeer.WaitForScriptToComplete();
+
+            secondPeer.WaitForScriptToComplete();
+            secondPeer.ExpectClose().Respond();
+
+            sender.Send(IMessage<string>.Create("Hello Again"));
+
+            try
+            {
+               session.CommitTransaction();
+               Assert.Fail("Should have failed to declare transaction");
+            }
+            catch (ClientTransactionRolledBackException cliEx)
+            {
+               logger.LogInformation("Caught expected error from test", cliEx);
+            }
+
+            connection.Close();
+
+            secondPeer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void 
TestNewTransactionCanBeCreatedAfterOldInstanceRolledBackByReconnect()
+      {
+         byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+         using (ProtonTestServer firstPeer = new 
ProtonTestServer(loggerFactory))
+         using (ProtonTestServer secondPeer = new 
ProtonTestServer(loggerFactory))
+         {
+            firstPeer.ExpectSASLAnonymousConnect();
+            firstPeer.ExpectOpen().Respond();
+            firstPeer.ExpectBegin().Respond();
+            
firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            firstPeer.ExpectCoordinatorAttach().Respond();
+            firstPeer.RemoteFlow().WithLinkCredit(2).Queue();
+            firstPeer.ExpectDeclare().Accept(txnId);
+            firstPeer.DropAfterLastHandler(5);
+            firstPeer.Start();
+
+            secondPeer.ExpectSASLAnonymousConnect();
+            secondPeer.ExpectOpen().Respond();
+            secondPeer.ExpectBegin().Respond();
+            
secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            secondPeer.RemoteFlow().WithLinkCredit(1).Queue();
+            secondPeer.Start();
+
+            string primaryAddress = firstPeer.ServerAddress;
+            int primaryPort = firstPeer.ServerPort;
+            string backupAddress = secondPeer.ServerAddress;
+            int backupPort = secondPeer.ServerPort;
+
+            logger.LogInformation("Test started, first peer listening on: 
{0}:{1}", primaryAddress, primaryPort);
+            logger.LogInformation("Test started, backup peer listening on: 
{0}:{1}", backupAddress, backupPort);
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.ReconnectOptions.ReconnectEnabled = true;
+            options.ReconnectOptions.AddReconnectLocation(backupAddress, 
backupPort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(primaryAddress, 
primaryPort, options);
+            ISession session = connection.OpenSession().OpenTask.Result;
+            ISender sender = session.OpenSender("test").OpenTask.Result;
+
+            session.BeginTransaction();
+
+            firstPeer.WaitForScriptToComplete();
+
+            secondPeer.WaitForScriptToComplete();
+            secondPeer.ExpectCoordinatorAttach().Respond();
+            secondPeer.RemoteFlow().WithLinkCredit(2).Queue();
+            secondPeer.ExpectDeclare().Accept(txnId);
+            secondPeer.ExpectTransfer().WithHandle(0)
+                                      .WithNonNullPayload()
+                                      
.WithState().Transactional().WithTxnId(txnId).And()
+                                      .Respond()
+                                      
.WithState().Transactional().WithTxnId(txnId).WithAccepted().And()
+                                      .WithSettled(true);
+            
secondPeer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+            secondPeer.ExpectEnd().Respond();
+            secondPeer.ExpectClose().Respond();
+
+            try
+            {
+               session.CommitTransaction();
+               Assert.Fail("Should have failed to declare transaction");
+            }
+            catch (ClientTransactionRolledBackException cliEx)
+            {
+               logger.LogInformation("Caught expected error from test", cliEx);
+            }
+
+            session.BeginTransaction();
+
+            ITracker tracker = 
sender.Send(IMessage<string>.Create("test-message"));
+
+            Assert.IsNotNull(tracker);
+            Assert.IsNotNull(tracker.SettlementTask.Result);
+            Assert.AreEqual(tracker.RemoteState.Type, 
DeliveryStateType.Transactional);
+            Assert.IsNotNull(tracker.State);
+            Assert.AreEqual(tracker.State.Type, 
DeliveryStateType.Transactional,
+                "Delivery inside transaction should have Transactional state: 
" + tracker.State.Type);
+            Wait.AssertTrue("Delivery in transaction should be locally settled 
after response", () => tracker.Settled);
+
+            try
+            {
+               session.CommitTransaction();
+            }
+            catch (ClientException cliEx)
+            {
+               logger.LogInformation("Caught unexpected error from test", 
cliEx);
+               Assert.Fail("Should not have failed to declare transaction");
+            }
+
+            session.Close();
+            connection.Close();
+
+            secondPeer.WaitForScriptToComplete();
+         }
+      }
+
    }
 }
\ No newline at end of file
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
index bdf8749..2e2f3bd 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
@@ -1725,5 +1725,63 @@ namespace Apache.Qpid.Proton.Client.Implementation
             peer.WaitForScriptToComplete();
          }
       }
+
+      [Test]
+      public void TestAwaitSettlementWorksForMessageSentInTransaction()
+      {
+         byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.RemoteFlow().WithLinkCredit(1).Queue();
+            peer.ExpectCoordinatorAttach().Respond();
+            peer.RemoteFlow().WithLinkCredit(2).Queue();
+            peer.ExpectDeclare().Accept(txnId);
+            peer.ExpectTransfer().WithHandle(0)
+                                 .WithNonNullPayload()
+                                 
.WithState().Transactional().WithTxnId(txnId).And()
+                                 .Respond()
+                                 
.WithState().Transactional().WithTxnId(txnId).WithAccepted().And()
+                                 .WithSettled(true);
+            peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+            peer.ExpectEnd().Respond();
+            peer.ExpectClose().Respond();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, 
remotePort);
+            ISession session = connection.OpenSession();
+            ISender sender = session.OpenSender("address").OpenTask.Result;
+
+            session.BeginTransaction();
+
+            ITracker tracker = 
sender.Send(IMessage<string>.Create("test-message"));
+
+            Assert.IsNotNull(tracker);
+            Assert.IsNotNull(tracker.AwaitAccepted());
+            Assert.IsTrue(tracker.RemoteState.IsAccepted);
+            Assert.AreEqual(tracker.RemoteState.Type, 
DeliveryStateType.Transactional,
+                           "Delivery inside transaction should have 
Transactional state");
+            Assert.AreEqual(tracker.State.Type, 
DeliveryStateType.Transactional,
+                           "Delivery inside transaction should have 
Transactional state: " + tracker.State.Type);
+            Wait.AssertTrue("Delivery in transaction should be locally settled 
after response", () => tracker.Settled);
+
+            session.CommitTransaction();
+
+            session.CloseAsync();
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to