This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e090aa  PROTON-2620 Better handle connection remote close (reconnect 
enabled)
1e090aa is described below

commit 1e090aad94851007650e4d092e0995cf71b3fb5d
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Thu Oct 6 18:47:28 2022 -0400

    PROTON-2620 Better handle connection remote close (reconnect enabled)
    
    Ensure that link types check that the remote close doesn't mean they should
    break blocking operations if the connection is going to attempt reconnect.
---
 .../Client/Implementation/ClientReceiver.cs        |  7 +-
 .../Client/Implementation/ClientSender.cs          |  7 +-
 .../Client/Implementation/ClientStreamReceiver.cs  |  7 +-
 .../Client/Implementation/ClientStreamSender.cs    |  7 +-
 .../Implementation/ClientReconnectReceiverTest.cs  | 83 ++++++++++++++++++++++
 .../Implementation/ClientReconnectSenderTest.cs    | 80 +++++++++++++++++++++
 .../ClientReconnectStreamReceiverTest.cs           | 81 +++++++++++++++++++++
 7 files changed, 260 insertions(+), 12 deletions(-)

diff --git a/src/Proton.Client/Client/Implementation/ClientReceiver.cs 
b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
index b6a685d..0b840a1 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiver.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
@@ -317,9 +317,10 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private void HandleParentEndpointClosed(Engine.IReceiver receiver)
       {
-         // Don't react if engine was shutdown and parent closed as a result 
instead wait to get the
-         // shutdown notification and respond to that change.
-         if (receiver.Engine.IsRunning)
+         // This handle is only for the case that the parent session was 
remotely or locally
+         // closed. In all other cases we want to allow natural engine 
shutdown handling to
+         // trigger shutdown as we can check there if the parent is 
reconnecting or not.
+         if (receiver.Engine.IsRunning && !receiver.Connection.IsLocallyClosed)
          {
             ClientException failureCause;
 
diff --git a/src/Proton.Client/Client/Implementation/ClientSender.cs 
b/src/Proton.Client/Client/Implementation/ClientSender.cs
index 3dc161c..2cf317f 100644
--- a/src/Proton.Client/Client/Implementation/ClientSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSender.cs
@@ -353,9 +353,10 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private void HandleParentEndpointClosed(Engine.ISender sender)
       {
-         // Don't react if engine was shutdown and parent closed as a result 
instead wait to get the
-         // shutdown notification and respond to that change.
-         if (sender.Engine.IsRunning)
+         // This handle is only for the case that the parent session was 
remotely or locally
+         // closed. In all other cases we want to allow natural engine 
shutdown handling to
+         // trigger shutdown as we can check there if the parent is 
reconnecting or not.
+         if (sender.Engine.IsRunning && !sender.Connection.IsLocallyClosed)
          {
             ClientException failureCause;
 
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs 
b/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs
index 24ab436..182a915 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs
@@ -306,9 +306,10 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private void HandleParentEndpointClosed(Engine.IReceiver receiver)
       {
-         // Don't react if engine was shutdown and parent closed as a result 
instead wait to get the
-         // shutdown notification and respond to that change.
-         if (receiver.Engine.IsRunning)
+         // This handle is only for the case that the parent session was 
remotely or locally
+         // closed. In all other cases we want to allow natural engine 
shutdown handling to
+         // trigger shutdown as we can check there if the parent is 
reconnecting or not.
+         if (receiver.Engine.IsRunning && !receiver.Connection.IsLocallyClosed)
          {
             ClientException failureCause;
 
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs 
b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
index 6a7a925..73223a9 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
@@ -502,9 +502,10 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private void HandleParentEndpointClosed(Engine.ISender sender)
       {
-         // Don't react if engine was shutdown and parent closed as a result 
instead wait to get the
-         // shutdown notification and respond to that change.
-         if (sender.Engine.IsRunning)
+         // This handle is only for the case that the parent session was 
remotely or locally
+         // closed. In all other cases we want to allow natural engine 
shutdown handling to
+         // trigger shutdown as we can check there if the parent is 
reconnecting or not.
+         if (sender.Engine.IsRunning && !sender.Connection.IsLocallyClosed)
          {
             ClientException failureCause;
 
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs
index 16cc9d9..9a46d29 100644
--- 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs
+++ 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectReceiverTest.cs
@@ -20,6 +20,7 @@ using System.Threading;
 using Apache.Qpid.Proton.Client.Exceptions;
 using Apache.Qpid.Proton.Test.Driver;
 using Apache.Qpid.Proton.Types.Messaging;
+using Apache.Qpid.Proton.Types.Transport;
 using Microsoft.Extensions.Logging;
 using NUnit.Framework;
 
@@ -256,5 +257,87 @@ namespace Apache.Qpid.Proton.Client.Implementation
             Assert.IsNotNull(delivery);
          }
       }
+
+      [Test]
+      public void TestReceiverWaitsWhenConnectionForcedDisconnect()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer firstPeer = new 
ProtonTestServer(loggerFactory))
+         using (ProtonTestServer secondPeer = new 
ProtonTestServer(loggerFactory))
+         {
+            firstPeer.ExpectSASLAnonymousConnect();
+            firstPeer.ExpectOpen().Respond();
+            firstPeer.ExpectBegin().Respond();
+            firstPeer.ExpectAttach().OfReceiver().Respond();
+            firstPeer.ExpectFlow().WithLinkCredit(10);
+            firstPeer.RemoteClose()
+                     
.WithErrorCondition(ConnectionError.CONNECTION_FORCED.ToString(), "Forced 
disconnect").Queue().AfterDelay(20);
+            firstPeer.ExpectClose();
+            firstPeer.Start();
+
+            secondPeer.ExpectSASLAnonymousConnect();
+            secondPeer.ExpectOpen().Respond();
+            secondPeer.ExpectBegin().Respond();
+            secondPeer.ExpectAttach().OfReceiver().Respond();
+            secondPeer.ExpectFlow().WithLinkCredit(10);
+            secondPeer.RemoteTransfer().WithHandle(0)
+                                       .WithDeliveryId(0)
+                                       .WithDeliveryTag(new byte[] { 1 })
+                                       .WithMore(false)
+                                       .WithSettled(true)
+                                       .WithMessageFormat(0)
+                                       
.WithPayload(payload).Queue().AfterDelay(5);
+            secondPeer.Start();
+
+            string primaryAddress = firstPeer.ServerAddress;
+            int primaryPort = firstPeer.ServerPort;
+            string backupAddress = secondPeer.ServerAddress;
+            int backupPort = secondPeer.ServerPort;
+
+            logger.LogInformation("Test started, first peer listening on: 
{0}:{1}", primaryAddress, primaryPort);
+            logger.LogInformation("Test started, backup peer listening on: 
{0}:{1}", backupAddress, backupPort);
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.ReconnectOptions.ReconnectEnabled = true;
+            options.ReconnectOptions.AddReconnectLocation(backupAddress, 
backupPort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(primaryAddress, 
primaryPort, options);
+            ISession session = connection.OpenSession();
+            ReceiverOptions rcvOpts = new ReceiverOptions()
+            {
+               AutoAccept = false
+            };
+            IReceiver receiver = session.OpenReceiver("test-queue", rcvOpts);
+
+            IDelivery delivery = null;
+            try
+            {
+               delivery = receiver.Receive(TimeSpan.FromSeconds(10));
+            }
+            catch (Exception ex)
+            {
+               Assert.Fail("Should not have failed on blocking receive call." 
+ ex.Message);
+            }
+
+            Assert.IsNotNull(delivery);
+
+            firstPeer.WaitForScriptToComplete();
+            secondPeer.WaitForScriptToComplete();
+            secondPeer.ExpectDetach().Respond();
+            secondPeer.ExpectEnd().Respond();
+            secondPeer.ExpectClose().Respond();
+
+            delivery.Accept();
+
+            receiver.Close();
+            session.Close();
+            connection.Close();
+
+            Assert.IsNotNull(delivery);
+         }
+      }
+
    }
 }
\ No newline at end of file
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs
index 5e11c55..bcc7051 100644
--- 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs
+++ 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSenderTest.cs
@@ -21,6 +21,7 @@ using System.Threading.Tasks;
 using Apache.Qpid.Proton.Client.Concurrent;
 using Apache.Qpid.Proton.Client.Exceptions;
 using Apache.Qpid.Proton.Test.Driver;
+using Apache.Qpid.Proton.Types.Transport;
 using Microsoft.Extensions.Logging;
 using NUnit.Framework;
 
@@ -378,5 +379,84 @@ namespace Apache.Qpid.Proton.Client.Implementation
             finalPeer.WaitForScriptToComplete();
          }
       }
+
+      [Test]
+      public void 
TestInFlightSendFailedAfterConnectionForcedCloseAndNotResent()
+      {
+         using (ProtonTestServer firstPeer = new 
ProtonTestServer(loggerFactory))
+         using (ProtonTestServer secondPeer = new 
ProtonTestServer(loggerFactory))
+         {
+            firstPeer.ExpectSASLAnonymousConnect();
+            firstPeer.ExpectOpen().Respond();
+            firstPeer.ExpectBegin().Respond();
+            
firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            firstPeer.RemoteFlow().WithLinkCredit(1).Queue();
+            firstPeer.ExpectTransfer().WithNonNullPayload();
+            firstPeer.RemoteClose()
+                     
.WithErrorCondition(ConnectionError.CONNECTION_FORCED.ToString(), "Forced 
disconnect").Queue().AfterDelay(20);
+            firstPeer.ExpectClose();
+            firstPeer.Start();
+
+            secondPeer.ExpectSASLAnonymousConnect();
+            secondPeer.ExpectOpen().Respond();
+            secondPeer.ExpectBegin().Respond();
+            
secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+            secondPeer.Start();
+
+            string primaryAddress = firstPeer.ServerAddress;
+            int primaryPort = firstPeer.ServerPort;
+            string backupAddress = secondPeer.ServerAddress;
+            int backupPort = secondPeer.ServerPort;
+
+            logger.LogInformation("Test started, first peer listening on: 
{0}:{1}", primaryAddress, primaryPort);
+            logger.LogInformation("Test started, backup peer listening on: 
{0}:{1}", backupAddress, backupPort);
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.ReconnectOptions.ReconnectEnabled = true;
+            options.ReconnectOptions.AddReconnectLocation(backupAddress, 
backupPort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(primaryAddress, 
primaryPort, options);
+            ISession session = connection.OpenSession();
+            ISender sender = session.OpenSender("test");
+
+            AtomicReference<ITracker> tracker = new();
+            AtomicReference<ClientException> error = new();
+            CountdownEvent latch = new CountdownEvent(1);
+
+            Task.Run(() =>
+            {
+               try
+               {
+                  tracker.Set(sender.Send(IMessage<string>.Create("Hello")));
+               }
+               catch (ClientException e)
+               {
+                  error.Set(e);
+               }
+               finally
+               {
+                  latch.Signal();
+               }
+            });
+
+            firstPeer.WaitForScriptToComplete();
+            secondPeer.WaitForScriptToComplete();
+            secondPeer.ExpectDetach().WithClosed(true).Respond();
+            secondPeer.ExpectEnd().Respond();
+            secondPeer.ExpectClose().Respond();
+
+            Assert.IsTrue(latch.Wait(TimeSpan.FromSeconds(10)), "Should have 
failed previously sent message");
+            Assert.IsNull(error.Get());
+            Assert.IsNotNull(tracker.Get());
+            Assert.Throws<ClientConnectionRemotelyClosedException>(() => 
tracker.Get().AwaitSettlement());
+
+            sender.Close();
+            session.Close();
+            connection.Close();
+
+            secondPeer.WaitForScriptToComplete();
+         }
+      }
    }
 }
\ No newline at end of file
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs
 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs
index f15ccf0..c2ec0e9 100644
--- 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs
+++ 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectStreamReceiverTest.cs
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
+using System;
 using System.IO;
 using System.Threading;
 using Apache.Qpid.Proton.Test.Driver;
 using Apache.Qpid.Proton.Types.Messaging;
+using Apache.Qpid.Proton.Types.Transport;
 using Microsoft.Extensions.Logging;
 using NUnit.Framework;
 
@@ -157,5 +159,84 @@ namespace Apache.Qpid.Proton.Client.Implementation
             secondPeer.WaitForScriptToComplete();
          }
       }
+
+      [Test]
+      public void TestReceiverWaitsWhenConnectionForcedDisconnect()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer firstPeer = new 
ProtonTestServer(loggerFactory))
+         using (ProtonTestServer secondPeer = new 
ProtonTestServer(loggerFactory))
+         {
+            firstPeer.ExpectSASLAnonymousConnect();
+            firstPeer.ExpectOpen().Respond();
+            firstPeer.ExpectBegin().Respond();
+            firstPeer.ExpectAttach().OfReceiver().Respond();
+            firstPeer.ExpectFlow().WithLinkCredit(10);
+            firstPeer.RemoteClose()
+                     
.WithErrorCondition(ConnectionError.CONNECTION_FORCED.ToString(), "Forced 
disconnect").Queue().AfterDelay(20);
+            firstPeer.ExpectClose();
+            firstPeer.Start();
+
+            secondPeer.ExpectSASLAnonymousConnect();
+            secondPeer.ExpectOpen().Respond();
+            secondPeer.ExpectBegin().Respond();
+            secondPeer.ExpectAttach().OfReceiver().Respond();
+            secondPeer.ExpectFlow().WithLinkCredit(10);
+            secondPeer.RemoteTransfer().WithHandle(0)
+                                      .WithDeliveryId(0)
+                                      .WithDeliveryTag(new byte[] { 1 })
+                                      .WithMore(false)
+                                      .WithSettled(true)
+                                      .WithMessageFormat(0)
+                                      
.WithPayload(payload).Queue().AfterDelay(5);
+            secondPeer.Start();
+
+            string primaryAddress = firstPeer.ServerAddress;
+            int primaryPort = firstPeer.ServerPort;
+            string backupAddress = secondPeer.ServerAddress;
+            int backupPort = secondPeer.ServerPort;
+
+            logger.LogInformation("Test started, first peer listening on: 
{0}:{1}", primaryAddress, primaryPort);
+            logger.LogInformation("Test started, backup peer listening on: 
{0}:{1}", backupAddress, backupPort);
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.ReconnectOptions.ReconnectEnabled = true;
+            options.ReconnectOptions.AddReconnectLocation(backupAddress, 
backupPort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(primaryAddress, 
primaryPort, options);
+            StreamReceiverOptions rcvOpts = new StreamReceiverOptions()
+            {
+               AutoAccept = false
+            };
+            IStreamReceiver receiver = 
connection.OpenStreamReceiver("test-receiver", rcvOpts);
+
+            IStreamDelivery delivery = null;
+            try
+            {
+               delivery = receiver.Receive(System.TimeSpan.FromSeconds(10));
+            }
+            catch (Exception ex)
+            {
+               Assert.Fail("Should not have failed on blocking receive call." 
+ ex.Message);
+            }
+
+            Assert.IsNotNull(delivery);
+
+            firstPeer.WaitForScriptToComplete();
+            secondPeer.WaitForScriptToComplete();
+            secondPeer.ExpectDetach().Respond();
+            secondPeer.ExpectEnd().Respond();
+            secondPeer.ExpectClose().Respond();
+
+            delivery.Accept();
+
+            receiver.Close();
+            connection.Close();
+
+            Assert.IsNotNull(delivery);
+         }
+      }
    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to