This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f089c3  PROTON-2829 Remove timed out sends from blocked queue
3f089c3 is described below

commit 3f089c35d9a15a4ab62bc7e63f5f6aca376b2712
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri Jun 7 18:41:34 2024 -0400

    PROTON-2829 Remove timed out sends from blocked queue
    
    Ensure that sends that time out waiting for credit are removed from the
    blocked queue and also abort any partial send that was blocked waiting
    for credit to ensure no leak of deliveries from the sender link.
---
 .../Client/Implementation/ClientSender.cs          |  35 +++++-
 .../Client/Implementation/ClientStreamSender.cs    |  34 +++++-
 .../Client/Implementation/ClientSenderTest.cs      | 131 +++++++++++++++++++++
 .../Implementation/ClientStreamSenderTest.cs       |  64 ++++++++++
 4 files changed, 260 insertions(+), 4 deletions(-)

diff --git a/src/Proton.Client/Client/Implementation/ClientSender.cs 
b/src/Proton.Client/Client/Implementation/ClientSender.cs
index 2cf317f..01ab227 100644
--- a/src/Proton.Client/Client/Implementation/ClientSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSender.cs
@@ -181,24 +181,38 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private void AddToTailOfBlockedQueue(ClientOutgoingEnvelope send)
       {
+         blocked.EnqueueBack(send);
          if (Options.SendTimeout > 0)
          {
+            send.TimeoutApplied = true;
             session.Schedule(() =>
             {
                if (!send.Request.IsCompleted)
                {
+                  blocked.Remove(send);
                   send.Failed(send.CreateSendTimedOutException());
                }
             },
             TimeSpan.FromMilliseconds(Options.SendTimeout));
          }
-
-         blocked.EnqueueBack(send);
       }
 
       private void AddToHeadOfBlockedQueue(ClientOutgoingEnvelope send)
       {
          blocked.EnqueueFront(send);
+         if (Options.SendTimeout > 0 && !send.TimeoutApplied)
+         {
+            send.TimeoutApplied = true;
+            session.Schedule(() =>
+            {
+               if (!send.Request.IsCompleted)
+               {
+                  blocked.Remove(send);
+                  send.Failed(send.CreateSendTimedOutException());
+               }
+            },
+            TimeSpan.FromMilliseconds(Options.SendTimeout));
+         }
       }
 
       #endregion
@@ -501,6 +515,11 @@ namespace Apache.Qpid.Proton.Client.Implementation
          /// </summary>
          public Task Request => request.Task;
 
+         /// <summary>
+         /// Gets or sets if a timeout has been assigned to time out a blocked 
send operation.
+         /// </summary>
+         public bool TimeoutApplied { get; set; }
+
          public void Complete()
          {
             Send(delivery.State, delivery.IsSettled);
@@ -562,6 +581,18 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
          public ClientOutgoingEnvelope Failed(ClientException exception)
          {
+            if (delivery != null)
+            {
+               try
+               {
+                  delivery.Abort();
+               }
+               catch (Exception)
+               {
+                  // Can fail is offline so ignore any exceptions from abort.
+               }
+            }
+
             request.TrySetException(exception);
             return this;
          }
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs 
b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
index 73223a9..75f10f8 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
@@ -286,24 +286,37 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private void AddToTailOfBlockedQueue(ClientOutgoingEnvelope send)
       {
+         blocked.EnqueueBack(send);
          if (Options.SendTimeout > 0)
          {
             session.Schedule(() =>
             {
                if (!send.Request.IsCompleted)
                {
+                  blocked.Remove(send);
                   send.Failed(send.CreateSendTimedOutException());
                }
             },
             TimeSpan.FromMilliseconds(Options.SendTimeout));
          }
-
-         blocked.EnqueueBack(send);
       }
 
       private void AddToHeadOfBlockedQueue(ClientOutgoingEnvelope send)
       {
          blocked.EnqueueFront(send);
+         if (Options.SendTimeout > 0 && !send.TimeoutApplied)
+         {
+            send.TimeoutApplied = true;
+            session.Schedule(() =>
+            {
+               if (!send.Request.IsCompleted)
+               {
+                  blocked.Remove(send);
+                  send.Failed(send.CreateSendTimedOutException());
+               }
+            },
+            TimeSpan.FromMilliseconds(Options.SendTimeout));
+         }
       }
 
       private IStreamTracker CreateTracker(IOutgoingDelivery delivery)
@@ -681,6 +694,11 @@ namespace Apache.Qpid.Proton.Client.Implementation
          /// </summary>
          public IOutgoingDelivery Delivery => delivery;
 
+         /// <summary>
+         /// Gets or sets if a timeout has been assigned to time out a blocked 
send operation.
+         /// </summary>
+         public bool TimeoutApplied { get; set; }
+
          public void Abort()
          {
             delivery.Abort();
@@ -756,6 +774,18 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
          public ClientOutgoingEnvelope Failed(ClientException exception)
          {
+            if (delivery != null)
+            {
+               try
+               {
+                  delivery.Abort();
+               }
+               catch (Exception)
+               {
+                  // Can fail is offline so ignore any exceptions from abort.
+               }
+            }
+
             request.TrySetException(exception);
             return this;
          }
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
index 088b93a..b03d4cb 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
@@ -583,6 +583,69 @@ namespace Apache.Qpid.Proton.Client.Implementation
          }
       }
 
+      [Test]
+      public void TestSendTimesOutWhenNoCreditIssuedAndThenIssueCredit()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               SendTimeout = 10
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, options);
+            ISession session = connection.OpenSession();
+            ISender sender = session.OpenSender("test-queue").OpenTask.Result;
+
+            IMessage<string> message = IMessage<string>.Create("Hello World");
+            try
+            {
+               sender.Send(message);
+               Assert.Fail("Should throw a send timed out exception");
+            }
+            catch (ClientSendTimedOutException)
+            {
+               // Expected error, ignore
+            }
+
+            peer.WaitForScriptToComplete();
+            peer.RemoteFlow().WithLinkCredit(1).Now();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.ExpectTransfer().WithNonNullPayload();
+            peer.ExpectDetach().Respond();
+            peer.ExpectClose().Respond();
+
+            // Ensure the send happens after the remote has sent a flow with 
credit
+            _ = session.OpenSender("test-queue-2").OpenTask.Result;
+
+            try
+            {
+               sender.Send(IMessage<string>.Create("Hello World 2"));
+            }
+            catch (ClientException ex)
+            {
+               logger.LogTrace("Error on second send: {0}", ex);
+               Assert.Fail("Should not throw an exception");
+            }
+
+            sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+            connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
       [Test]
       public void TestSendCompletesWhenCreditEventuallyOffered()
       {
@@ -3260,5 +3323,73 @@ namespace Apache.Qpid.Proton.Client.Implementation
             peer.WaitForScriptToComplete();
          }
       }
+
+      [Test]
+      public void TestSendTimesOutIfNotAllMessageFramesCanBeSent()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().WithNextOutgoingId(0).Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            
peer.RemoteFlow().WithIncomingWindow(2).WithNextIncomingId(0).WithLinkCredit(1).Queue();
+            
peer.ExpectTransfer().WithDeliveryId(0).WithNonNullPayload().WithMore(true);
+            peer.ExpectTransfer().WithNonNullPayload().WithMore(true);
+            peer.ExpectTransfer().WithNullPayload().WithAborted(true);
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               MaxFrameSize = 1024,
+               SendTimeout = 25
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, options);
+            ISender sender = 
connection.OpenSender("test-queue").OpenTask.Result;
+
+            byte[] payload = new byte[4800];
+            Array.Fill(payload, (byte)1);
+
+            try
+            {
+               sender.Send(IMessage<string>.Create(payload));
+            }
+            catch (ClientSendTimedOutException e)
+            {
+               logger.LogTrace("send failed with expected error: {0}", e);
+            }
+
+            peer.WaitForScriptToComplete();
+            
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(4).WithLinkCredit(1).Now();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.ExpectTransfer().WithDeliveryId(1).WithNonNullPayload();
+            peer.ExpectDetach().Respond();
+            peer.ExpectClose().Respond();
+
+            // Ensure the send happens after the remote has sent a flow with 
credit
+            _ = connection.OpenSender("test-queue-2").OpenTask.Result;
+
+            try
+            {
+               sender.Send(IMessage<string>.Create("Hello World 2"));
+            }
+            catch (ClientException ex)
+            {
+               logger.LogTrace("Error on second send: {0}", ex);
+               Assert.Fail("Should not throw an exception");
+            }
+
+            sender.CloseAsync();
+            connection.CloseAsync();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
    }
 }
\ No newline at end of file
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
index 54656ba..9e76808 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
@@ -3251,5 +3251,69 @@ namespace Apache.Qpid.Proton.Client.Implementation
             peer.WaitForScriptToComplete();
          }
       }
+
+      [Test]
+      public void TestSendTimesOutWhenNoCreditIssuedAndThenIssueCredit()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               SendTimeout = 10
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, options);
+            IStreamSender sender = 
connection.OpenStreamSender("test-queue").OpenTask.Result;
+
+            IMessage<string> message = IMessage<string>.Create("Hello World");
+            try
+            {
+               sender.Send(message);
+               Assert.Fail("Should throw a send timed out exception");
+            }
+            catch (ClientSendTimedOutException)
+            {
+               // Expected error, ignore
+            }
+
+            peer.WaitForScriptToComplete();
+            peer.RemoteFlow().WithLinkCredit(1).Now();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.ExpectTransfer().WithNonNullPayload();
+            peer.ExpectDetach().Respond();
+            peer.ExpectEnd().Respond();
+            peer.ExpectClose().Respond();
+
+            // Ensure the send happens after the remote has sent a flow with 
credit
+            _ = connection.OpenSender("test-queue-2").OpenTask.Result;
+
+            try
+            {
+               sender.Send(IMessage<string>.Create("Hello World 2"));
+            }
+            catch (ClientException ex)
+            {
+               logger.LogTrace("Error on second send: {0}", ex);
+               Assert.Fail("Should not throw an exception");
+            }
+
+            sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+            connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+            peer.WaitForScriptToComplete();
+         }
+      }
    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to