This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push: new c8c0630 PROTON-2638 AwaitAccepted API should work for transaction settlement c8c0630 is described below commit c8c0630100a163c03e4b4340816b2920c1761581 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri Oct 28 16:51:22 2022 -0400 PROTON-2638 AwaitAccepted API should work for transaction settlement Check in the transactional state if settlement outcome is accepted and then answer that the delivery state is accepted. --- .../Client/Implementation/ClientDeliveryState.cs | 4 +- .../ClientReconnectTransactionTest.cs | 240 +++++++++++++++++++++ .../Implementation/ClilentTransactionsTest.cs | 58 +++++ 3 files changed, 301 insertions(+), 1 deletion(-) diff --git a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs index 8df2ce7..593a54f 100644 --- a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs +++ b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs @@ -28,7 +28,7 @@ namespace Apache.Qpid.Proton.Client.Implementation /// </summary> public abstract class ClientDeliveryState : IDeliveryState { - public bool IsAccepted => Type == DeliveryStateType.Accepted; + public virtual bool IsAccepted => Type == DeliveryStateType.Accepted; public abstract DeliveryStateType Type { get; } @@ -166,6 +166,8 @@ namespace Apache.Qpid.Proton.Client.Implementation this.txnState.TxnId = txnState.TxnId.Copy(); } + public override bool IsAccepted => txnState.Outcome is Accepted; + public override DeliveryStateType Type => DeliveryStateType.Transactional; public override Types.Transport.IDeliveryState ProtonDeliveryState => txnState; diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs index f864aa0..0c95e04 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs @@ -17,6 +17,7 @@ using System.Threading; using Apache.Qpid.Proton.Client.Exceptions; +using Apache.Qpid.Proton.Client.TestSupport; using Apache.Qpid.Proton.Test.Driver; using Microsoft.Extensions.Logging; using NUnit.Framework; @@ -84,5 +85,244 @@ namespace Apache.Qpid.Proton.Client.Implementation secondPeer.WaitForScriptToComplete(); } } + + [Test] + public void TestTransactionInDoubtAfterReconnect() + { + byte[] txnId = new byte[] { 0, 1, 2, 3 }; + + using (ProtonTestServer firstPeer = new ProtonTestServer(loggerFactory)) + using (ProtonTestServer secondPeer = new ProtonTestServer(loggerFactory)) + { + firstPeer.ExpectSASLAnonymousConnect(); + firstPeer.ExpectOpen().Respond(); + firstPeer.ExpectBegin().Respond(); + firstPeer.ExpectCoordinatorAttach().Respond(); + firstPeer.RemoteFlow().WithLinkCredit(2).Queue(); + firstPeer.ExpectDeclare().Accept(txnId); + firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + firstPeer.RemoteFlow().WithLinkCredit(1).Queue(); + firstPeer.ExpectTransfer().WithNonNullPayload(); + firstPeer.DropAfterLastHandler(); + firstPeer.Start(); + + secondPeer.ExpectSASLAnonymousConnect(); + secondPeer.ExpectOpen().Respond(); + secondPeer.ExpectBegin().Respond(); + secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + secondPeer.Start(); + + string primaryAddress = firstPeer.ServerAddress; + int primaryPort = firstPeer.ServerPort; + string backupAddress = secondPeer.ServerAddress; + int backupPort = secondPeer.ServerPort; + + logger.LogInformation("Test started, first peer listening on: {0}:{1}", primaryAddress, primaryPort); + logger.LogInformation("Test started, backup peer listening on: {0}:{1}", backupAddress, backupPort); + + ConnectionOptions options = new ConnectionOptions(); + options.ReconnectOptions.ReconnectEnabled = true; + options.ReconnectOptions.AddReconnectLocation(backupAddress, backupPort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(primaryAddress, primaryPort, options); + ISession session = connection.OpenSession().OpenTask.Result; + + session.BeginTransaction(); + + ISender sender = session.OpenSender("test").OpenTask.Result; + sender.Send(IMessage<string>.Create("Hello")); + + firstPeer.WaitForScriptToComplete(); + + secondPeer.WaitForScriptToComplete(); + secondPeer.ExpectClose().Respond(); + + try + { + session.CommitTransaction(); + Assert.Fail("Should have failed to declare transaction"); + } + catch (ClientTransactionRolledBackException cliEx) + { + logger.LogInformation("Caught expected error from test", cliEx); + } + + connection.Close(); + + secondPeer.WaitForScriptToComplete(); + } + } + + [Test] + public void TestSendInTransactionIsNoOpAfterReconnect() + { + byte[] txnId = new byte[] { 0, 1, 2, 3 }; + + using (ProtonTestServer firstPeer = new ProtonTestServer(loggerFactory)) + using (ProtonTestServer secondPeer = new ProtonTestServer(loggerFactory)) + { + firstPeer.ExpectSASLAnonymousConnect(); + firstPeer.ExpectOpen().Respond(); + firstPeer.ExpectBegin().Respond(); + firstPeer.ExpectCoordinatorAttach().Respond(); + firstPeer.RemoteFlow().WithLinkCredit(2).Queue(); + firstPeer.ExpectDeclare().Accept(txnId); + firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + firstPeer.RemoteFlow().WithLinkCredit(1).Queue(); + firstPeer.ExpectTransfer().WithNonNullPayload(); + firstPeer.DropAfterLastHandler(); + firstPeer.Start(); + + secondPeer.ExpectSASLAnonymousConnect(); + secondPeer.ExpectOpen().Respond(); + secondPeer.ExpectBegin().Respond(); + secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + secondPeer.RemoteFlow().WithLinkCredit(1).Queue(); + secondPeer.Start(); + + string primaryAddress = firstPeer.ServerAddress; + int primaryPort = firstPeer.ServerPort; + string backupAddress = secondPeer.ServerAddress; + int backupPort = secondPeer.ServerPort; + + logger.LogInformation("Test started, first peer listening on: {0}:{1}", primaryAddress, primaryPort); + logger.LogInformation("Test started, backup peer listening on: {0}:{1}", backupAddress, backupPort); + + ConnectionOptions options = new ConnectionOptions(); + options.ReconnectOptions.ReconnectEnabled = true; + options.ReconnectOptions.AddReconnectLocation(backupAddress, backupPort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(primaryAddress, primaryPort, options); + ISession session = connection.OpenSession().OpenTask.Result; + + session.BeginTransaction(); + + ISender sender = session.OpenSender("test").OpenTask.Result; + sender.Send(IMessage<string>.Create("Hello")); + + firstPeer.WaitForScriptToComplete(); + + secondPeer.WaitForScriptToComplete(); + secondPeer.ExpectClose().Respond(); + + sender.Send(IMessage<string>.Create("Hello Again")); + + try + { + session.CommitTransaction(); + Assert.Fail("Should have failed to declare transaction"); + } + catch (ClientTransactionRolledBackException cliEx) + { + logger.LogInformation("Caught expected error from test", cliEx); + } + + connection.Close(); + + secondPeer.WaitForScriptToComplete(); + } + } + + [Test] + public void TestNewTransactionCanBeCreatedAfterOldInstanceRolledBackByReconnect() + { + byte[] txnId = new byte[] { 0, 1, 2, 3 }; + + using (ProtonTestServer firstPeer = new ProtonTestServer(loggerFactory)) + using (ProtonTestServer secondPeer = new ProtonTestServer(loggerFactory)) + { + firstPeer.ExpectSASLAnonymousConnect(); + firstPeer.ExpectOpen().Respond(); + firstPeer.ExpectBegin().Respond(); + firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + firstPeer.ExpectCoordinatorAttach().Respond(); + firstPeer.RemoteFlow().WithLinkCredit(2).Queue(); + firstPeer.ExpectDeclare().Accept(txnId); + firstPeer.DropAfterLastHandler(5); + firstPeer.Start(); + + secondPeer.ExpectSASLAnonymousConnect(); + secondPeer.ExpectOpen().Respond(); + secondPeer.ExpectBegin().Respond(); + secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + secondPeer.RemoteFlow().WithLinkCredit(1).Queue(); + secondPeer.Start(); + + string primaryAddress = firstPeer.ServerAddress; + int primaryPort = firstPeer.ServerPort; + string backupAddress = secondPeer.ServerAddress; + int backupPort = secondPeer.ServerPort; + + logger.LogInformation("Test started, first peer listening on: {0}:{1}", primaryAddress, primaryPort); + logger.LogInformation("Test started, backup peer listening on: {0}:{1}", backupAddress, backupPort); + + ConnectionOptions options = new ConnectionOptions(); + options.ReconnectOptions.ReconnectEnabled = true; + options.ReconnectOptions.AddReconnectLocation(backupAddress, backupPort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(primaryAddress, primaryPort, options); + ISession session = connection.OpenSession().OpenTask.Result; + ISender sender = session.OpenSender("test").OpenTask.Result; + + session.BeginTransaction(); + + firstPeer.WaitForScriptToComplete(); + + secondPeer.WaitForScriptToComplete(); + secondPeer.ExpectCoordinatorAttach().Respond(); + secondPeer.RemoteFlow().WithLinkCredit(2).Queue(); + secondPeer.ExpectDeclare().Accept(txnId); + secondPeer.ExpectTransfer().WithHandle(0) + .WithNonNullPayload() + .WithState().Transactional().WithTxnId(txnId).And() + .Respond() + .WithState().Transactional().WithTxnId(txnId).WithAccepted().And() + .WithSettled(true); + secondPeer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept(); + secondPeer.ExpectEnd().Respond(); + secondPeer.ExpectClose().Respond(); + + try + { + session.CommitTransaction(); + Assert.Fail("Should have failed to declare transaction"); + } + catch (ClientTransactionRolledBackException cliEx) + { + logger.LogInformation("Caught expected error from test", cliEx); + } + + session.BeginTransaction(); + + ITracker tracker = sender.Send(IMessage<string>.Create("test-message")); + + Assert.IsNotNull(tracker); + Assert.IsNotNull(tracker.SettlementTask.Result); + Assert.AreEqual(tracker.RemoteState.Type, DeliveryStateType.Transactional); + Assert.IsNotNull(tracker.State); + Assert.AreEqual(tracker.State.Type, DeliveryStateType.Transactional, + "Delivery inside transaction should have Transactional state: " + tracker.State.Type); + Wait.AssertTrue("Delivery in transaction should be locally settled after response", () => tracker.Settled); + + try + { + session.CommitTransaction(); + } + catch (ClientException cliEx) + { + logger.LogInformation("Caught unexpected error from test", cliEx); + Assert.Fail("Should not have failed to declare transaction"); + } + + session.Close(); + connection.Close(); + + secondPeer.WaitForScriptToComplete(); + } + } + } } \ No newline at end of file diff --git a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs index bdf8749..2e2f3bd 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs @@ -1725,5 +1725,63 @@ namespace Apache.Qpid.Proton.Client.Implementation peer.WaitForScriptToComplete(); } } + + [Test] + public void TestAwaitSettlementWorksForMessageSentInTransaction() + { + byte[] txnId = new byte[] { 0, 1, 2, 3 }; + + using (ProtonTestServer peer = new ProtonTestServer(loggerFactory)) + { + peer.ExpectSASLAnonymousConnect(); + peer.ExpectOpen().Respond(); + peer.ExpectBegin().Respond(); + peer.ExpectAttach().OfSender().Respond(); + peer.RemoteFlow().WithLinkCredit(1).Queue(); + peer.ExpectCoordinatorAttach().Respond(); + peer.RemoteFlow().WithLinkCredit(2).Queue(); + peer.ExpectDeclare().Accept(txnId); + peer.ExpectTransfer().WithHandle(0) + .WithNonNullPayload() + .WithState().Transactional().WithTxnId(txnId).And() + .Respond() + .WithState().Transactional().WithTxnId(txnId).WithAccepted().And() + .WithSettled(true); + peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept(); + peer.ExpectEnd().Respond(); + peer.ExpectClose().Respond(); + peer.Start(); + + string remoteAddress = peer.ServerAddress; + int remotePort = peer.ServerPort; + + logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(remoteAddress, remotePort); + ISession session = connection.OpenSession(); + ISender sender = session.OpenSender("address").OpenTask.Result; + + session.BeginTransaction(); + + ITracker tracker = sender.Send(IMessage<string>.Create("test-message")); + + Assert.IsNotNull(tracker); + Assert.IsNotNull(tracker.AwaitAccepted()); + Assert.IsTrue(tracker.RemoteState.IsAccepted); + Assert.AreEqual(tracker.RemoteState.Type, DeliveryStateType.Transactional, + "Delivery inside transaction should have Transactional state"); + Assert.AreEqual(tracker.State.Type, DeliveryStateType.Transactional, + "Delivery inside transaction should have Transactional state: " + tracker.State.Type); + Wait.AssertTrue("Delivery in transaction should be locally settled after response", () => tracker.Settled); + + session.CommitTransaction(); + + session.CloseAsync(); + connection.Close(); + + peer.WaitForScriptToComplete(); + } + } } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org