This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push: new 1e090aa PROTON-2620 Better handle connection remote close (reconnect enabled) 1e090aa is described below commit 1e090aad94851007650e4d092e0995cf71b3fb5d Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Thu Oct 6 18:47:28 2022 -0400 PROTON-2620 Better handle connection remote close (reconnect enabled) Ensure that link types check that the remote close doesn't mean they should break blocking operations if the connection is going to attempt reconnect. --- .../Client/Implementation/ClientReceiver.cs | 7 +- .../Client/Implementation/ClientSender.cs | 7 +- .../Client/Implementation/ClientStreamReceiver.cs | 7 +- .../Client/Implementation/ClientStreamSender.cs | 7 +- .../Implementation/ClientReconnectReceiverTest.cs | 83 ++++++++++++++++++++++ .../Implementation/ClientReconnectSenderTest.cs | 80 +++++++++++++++++++++ .../ClientReconnectStreamReceiverTest.cs | 81 +++++++++++++++++++++ 7 files changed, 260 insertions(+), 12 deletions(-) diff --git a/src/Proton.Client/Client/Implementation/ClientReceiver.cs b/src/Proton.Client/Client/Implementation/ClientReceiver.cs index b6a685d..0b840a1 100644 --- a/src/Proton.Client/Client/Implementation/ClientReceiver.cs +++ b/src/Proton.Client/Client/Implementation/ClientReceiver.cs @@ -317,9 +317,10 @@ namespace Apache.Qpid.Proton.Client.Implementation private void HandleParentEndpointClosed(Engine.IReceiver receiver) { - // Don't react if engine was shutdown and parent closed as a result instead wait to get the - // shutdown notification and respond to that change. - if (receiver.Engine.IsRunning) + // This handle is only for the case that the parent session was remotely or locally + // closed. In all other cases we want to allow natural engine shutdown handling to + // trigger shutdown as we can check there if the parent is reconnecting or not. + if (receiver.Engine.IsRunning && !receiver.Connection.IsLocallyClosed) { ClientException failureCause; diff --git a/src/Proton.Client/Client/Implementation/ClientSender.cs b/src/Proton.Client/Client/Implementation/ClientSender.cs index 3dc161c..2cf317f 100644 --- a/src/Proton.Client/Client/Implementation/ClientSender.cs +++ b/src/Proton.Client/Client/Implementation/ClientSender.cs @@ -353,9 +353,10 @@ namespace Apache.Qpid.Proton.Client.Implementation private void HandleParentEndpointClosed(Engine.ISender sender) { - // Don't react if engine was shutdown and parent closed as a result instead wait to get the - // shutdown notification and respond to that change. - if (sender.Engine.IsRunning) + // This handle is only for the case that the parent session was remotely or locally + // closed. In all other cases we want to allow natural engine shutdown handling to + // trigger shutdown as we can check there if the parent is reconnecting or not. + if (sender.Engine.IsRunning && !sender.Connection.IsLocallyClosed) { ClientException failureCause; diff --git a/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs b/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs index 24ab436..182a915 100644 --- a/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs +++ b/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs @@ -306,9 +306,10 @@ namespace Apache.Qpid.Proton.Client.Implementation private void HandleParentEndpointClosed(Engine.IReceiver receiver) { - // Don't react if engine was shutdown and parent closed as a result instead wait to get the - // shutdown notification and respond to that change. - if (receiver.Engine.IsRunning) + // This handle is only for the case that the parent session was remotely or locally + // closed. In all other cases we want to allow natural engine shutdown handling to + // trigger shutdown as we can check there if the parent is reconnecting or not. + if (receiver.Engine.IsRunning && !receiver.Connection.IsLocallyClosed) { ClientException failureCause; diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs index 6a7a925..73223a9 100644 --- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs +++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs @@ -502,9 +502,10 @@ namespace Apache.Qpid.Proton.Client.Implementation private void HandleParentEndpointClosed(Engine.ISender sender) { - // Don't react if engine was shutdown and parent closed as a result instead wait to get the - // shutdown notification and respond to that change. - if (sender.Engine.IsRunning) + // This handle is only for the case that the parent session was remotely or locally + // closed. In all other cases we want to allow natural engine shutdown handling to + // trigger shutdown as we can check there if the parent is reconnecting or not. + if (sender.Engine.IsRunning && !sender.Connection.IsLocallyClosed) { ClientException failureCause; diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs index 16cc9d9..9a46d29 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs @@ -20,6 +20,7 @@ using System.Threading; using Apache.Qpid.Proton.Client.Exceptions; using Apache.Qpid.Proton.Test.Driver; using Apache.Qpid.Proton.Types.Messaging; +using Apache.Qpid.Proton.Types.Transport; using Microsoft.Extensions.Logging; using NUnit.Framework; @@ -256,5 +257,87 @@ namespace Apache.Qpid.Proton.Client.Implementation Assert.IsNotNull(delivery); } } + + [Test] + public void TestReceiverWaitsWhenConnectionForcedDisconnect() + { + byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World")); + + using (ProtonTestServer firstPeer = new ProtonTestServer(loggerFactory)) + using (ProtonTestServer secondPeer = new ProtonTestServer(loggerFactory)) + { + firstPeer.ExpectSASLAnonymousConnect(); + firstPeer.ExpectOpen().Respond(); + firstPeer.ExpectBegin().Respond(); + firstPeer.ExpectAttach().OfReceiver().Respond(); + firstPeer.ExpectFlow().WithLinkCredit(10); + firstPeer.RemoteClose() + .WithErrorCondition(ConnectionError.CONNECTION_FORCED.ToString(), "Forced disconnect").Queue().AfterDelay(20); + firstPeer.ExpectClose(); + firstPeer.Start(); + + secondPeer.ExpectSASLAnonymousConnect(); + secondPeer.ExpectOpen().Respond(); + secondPeer.ExpectBegin().Respond(); + secondPeer.ExpectAttach().OfReceiver().Respond(); + secondPeer.ExpectFlow().WithLinkCredit(10); + secondPeer.RemoteTransfer().WithHandle(0) + .WithDeliveryId(0) + .WithDeliveryTag(new byte[] { 1 }) + .WithMore(false) + .WithSettled(true) + .WithMessageFormat(0) + .WithPayload(payload).Queue().AfterDelay(5); + secondPeer.Start(); + + string primaryAddress = firstPeer.ServerAddress; + int primaryPort = firstPeer.ServerPort; + string backupAddress = secondPeer.ServerAddress; + int backupPort = secondPeer.ServerPort; + + logger.LogInformation("Test started, first peer listening on: {0}:{1}", primaryAddress, primaryPort); + logger.LogInformation("Test started, backup peer listening on: {0}:{1}", backupAddress, backupPort); + + ConnectionOptions options = new ConnectionOptions(); + options.ReconnectOptions.ReconnectEnabled = true; + options.ReconnectOptions.AddReconnectLocation(backupAddress, backupPort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(primaryAddress, primaryPort, options); + ISession session = connection.OpenSession(); + ReceiverOptions rcvOpts = new ReceiverOptions() + { + AutoAccept = false + }; + IReceiver receiver = session.OpenReceiver("test-queue", rcvOpts); + + IDelivery delivery = null; + try + { + delivery = receiver.Receive(TimeSpan.FromSeconds(10)); + } + catch (Exception ex) + { + Assert.Fail("Should not have failed on blocking receive call." + ex.Message); + } + + Assert.IsNotNull(delivery); + + firstPeer.WaitForScriptToComplete(); + secondPeer.WaitForScriptToComplete(); + secondPeer.ExpectDetach().Respond(); + secondPeer.ExpectEnd().Respond(); + secondPeer.ExpectClose().Respond(); + + delivery.Accept(); + + receiver.Close(); + session.Close(); + connection.Close(); + + Assert.IsNotNull(delivery); + } + } + } } \ No newline at end of file diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs index 5e11c55..bcc7051 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs @@ -21,6 +21,7 @@ using System.Threading.Tasks; using Apache.Qpid.Proton.Client.Concurrent; using Apache.Qpid.Proton.Client.Exceptions; using Apache.Qpid.Proton.Test.Driver; +using Apache.Qpid.Proton.Types.Transport; using Microsoft.Extensions.Logging; using NUnit.Framework; @@ -378,5 +379,84 @@ namespace Apache.Qpid.Proton.Client.Implementation finalPeer.WaitForScriptToComplete(); } } + + [Test] + public void TestInFlightSendFailedAfterConnectionForcedCloseAndNotResent() + { + using (ProtonTestServer firstPeer = new ProtonTestServer(loggerFactory)) + using (ProtonTestServer secondPeer = new ProtonTestServer(loggerFactory)) + { + firstPeer.ExpectSASLAnonymousConnect(); + firstPeer.ExpectOpen().Respond(); + firstPeer.ExpectBegin().Respond(); + firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + firstPeer.RemoteFlow().WithLinkCredit(1).Queue(); + firstPeer.ExpectTransfer().WithNonNullPayload(); + firstPeer.RemoteClose() + .WithErrorCondition(ConnectionError.CONNECTION_FORCED.ToString(), "Forced disconnect").Queue().AfterDelay(20); + firstPeer.ExpectClose(); + firstPeer.Start(); + + secondPeer.ExpectSASLAnonymousConnect(); + secondPeer.ExpectOpen().Respond(); + secondPeer.ExpectBegin().Respond(); + secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond(); + secondPeer.Start(); + + string primaryAddress = firstPeer.ServerAddress; + int primaryPort = firstPeer.ServerPort; + string backupAddress = secondPeer.ServerAddress; + int backupPort = secondPeer.ServerPort; + + logger.LogInformation("Test started, first peer listening on: {0}:{1}", primaryAddress, primaryPort); + logger.LogInformation("Test started, backup peer listening on: {0}:{1}", backupAddress, backupPort); + + ConnectionOptions options = new ConnectionOptions(); + options.ReconnectOptions.ReconnectEnabled = true; + options.ReconnectOptions.AddReconnectLocation(backupAddress, backupPort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(primaryAddress, primaryPort, options); + ISession session = connection.OpenSession(); + ISender sender = session.OpenSender("test"); + + AtomicReference<ITracker> tracker = new(); + AtomicReference<ClientException> error = new(); + CountdownEvent latch = new CountdownEvent(1); + + Task.Run(() => + { + try + { + tracker.Set(sender.Send(IMessage<string>.Create("Hello"))); + } + catch (ClientException e) + { + error.Set(e); + } + finally + { + latch.Signal(); + } + }); + + firstPeer.WaitForScriptToComplete(); + secondPeer.WaitForScriptToComplete(); + secondPeer.ExpectDetach().WithClosed(true).Respond(); + secondPeer.ExpectEnd().Respond(); + secondPeer.ExpectClose().Respond(); + + Assert.IsTrue(latch.Wait(TimeSpan.FromSeconds(10)), "Should have failed previously sent message"); + Assert.IsNull(error.Get()); + Assert.IsNotNull(tracker.Get()); + Assert.Throws<ClientConnectionRemotelyClosedException>(() => tracker.Get().AwaitSettlement()); + + sender.Close(); + session.Close(); + connection.Close(); + + secondPeer.WaitForScriptToComplete(); + } + } } } \ No newline at end of file diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs index f15ccf0..c2ec0e9 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs @@ -15,10 +15,12 @@ * limitations under the License. */ +using System; using System.IO; using System.Threading; using Apache.Qpid.Proton.Test.Driver; using Apache.Qpid.Proton.Types.Messaging; +using Apache.Qpid.Proton.Types.Transport; using Microsoft.Extensions.Logging; using NUnit.Framework; @@ -157,5 +159,84 @@ namespace Apache.Qpid.Proton.Client.Implementation secondPeer.WaitForScriptToComplete(); } } + + [Test] + public void TestReceiverWaitsWhenConnectionForcedDisconnect() + { + byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World")); + + using (ProtonTestServer firstPeer = new ProtonTestServer(loggerFactory)) + using (ProtonTestServer secondPeer = new ProtonTestServer(loggerFactory)) + { + firstPeer.ExpectSASLAnonymousConnect(); + firstPeer.ExpectOpen().Respond(); + firstPeer.ExpectBegin().Respond(); + firstPeer.ExpectAttach().OfReceiver().Respond(); + firstPeer.ExpectFlow().WithLinkCredit(10); + firstPeer.RemoteClose() + .WithErrorCondition(ConnectionError.CONNECTION_FORCED.ToString(), "Forced disconnect").Queue().AfterDelay(20); + firstPeer.ExpectClose(); + firstPeer.Start(); + + secondPeer.ExpectSASLAnonymousConnect(); + secondPeer.ExpectOpen().Respond(); + secondPeer.ExpectBegin().Respond(); + secondPeer.ExpectAttach().OfReceiver().Respond(); + secondPeer.ExpectFlow().WithLinkCredit(10); + secondPeer.RemoteTransfer().WithHandle(0) + .WithDeliveryId(0) + .WithDeliveryTag(new byte[] { 1 }) + .WithMore(false) + .WithSettled(true) + .WithMessageFormat(0) + .WithPayload(payload).Queue().AfterDelay(5); + secondPeer.Start(); + + string primaryAddress = firstPeer.ServerAddress; + int primaryPort = firstPeer.ServerPort; + string backupAddress = secondPeer.ServerAddress; + int backupPort = secondPeer.ServerPort; + + logger.LogInformation("Test started, first peer listening on: {0}:{1}", primaryAddress, primaryPort); + logger.LogInformation("Test started, backup peer listening on: {0}:{1}", backupAddress, backupPort); + + ConnectionOptions options = new ConnectionOptions(); + options.ReconnectOptions.ReconnectEnabled = true; + options.ReconnectOptions.AddReconnectLocation(backupAddress, backupPort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(primaryAddress, primaryPort, options); + StreamReceiverOptions rcvOpts = new StreamReceiverOptions() + { + AutoAccept = false + }; + IStreamReceiver receiver = connection.OpenStreamReceiver("test-receiver", rcvOpts); + + IStreamDelivery delivery = null; + try + { + delivery = receiver.Receive(System.TimeSpan.FromSeconds(10)); + } + catch (Exception ex) + { + Assert.Fail("Should not have failed on blocking receive call." + ex.Message); + } + + Assert.IsNotNull(delivery); + + firstPeer.WaitForScriptToComplete(); + secondPeer.WaitForScriptToComplete(); + secondPeer.ExpectDetach().Respond(); + secondPeer.ExpectEnd().Respond(); + secondPeer.ExpectClose().Respond(); + + delivery.Accept(); + + receiver.Close(); + connection.Close(); + + Assert.IsNotNull(delivery); + } + } } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org