This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cba20f  PROTON-2585 Add some cleanups around request timeout handling
1cba20f is described below

commit 1cba20ff8d79725ce2bf85bcf8408b495f5795a1
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Jul 27 16:05:14 2022 -0400

    PROTON-2585 Add some cleanups around request timeout handling
    
    Attempt to skip the work done by scheduled timeouts when the request
    has already completed or failed, to avoid adding work to the task pool.
---
 .../Client/Implementation/ClientConnection.cs       |  7 ++++++-
 .../Client/Implementation/ClientReceiver.cs         |  7 +++++--
 .../Client/Implementation/ClientSender.cs           | 10 +++++++++-
 .../Client/Implementation/ClientSession.cs          | 21 ++++++++++++++-------
 .../Client/Implementation/ClientStreamReceiver.cs   |  7 +++++--
 .../Client/Implementation/ClientStreamSender.cs     | 10 +++++++++-
 6 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/src/Proton.Client/Client/Implementation/ClientConnection.cs b/src/Proton.Client/Client/Implementation/ClientConnection.cs
index 34df3ba..5c79af2 100644
--- a/src/Proton.Client/Client/Implementation/ClientConnection.cs
+++ b/src/Proton.Client/Client/Implementation/ClientConnection.cs
@@ -629,10 +629,15 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       internal void Schedule(Action action, TimeSpan delay)
       {
-         // TODO: Either add scheduling to event loop or handle timeouts here 
somehow
          Task.Delay(delay).ContinueWith((t) => Execute(action));
       }
 
+      internal Task Schedule(Action action, TimeSpan delay, CancellationToken 
token)
+      {
+         return Task.Delay(delay, token).ContinueWith(
+            (t) => Execute(action), token, 
TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
+      }
+
       #endregion
 
       #region Proton Engine and Connection event handlers
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiver.cs b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
index af49833..b6a685d 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiver.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
@@ -91,8 +91,11 @@ namespace Apache.Qpid.Proton.Client.Implementation
                      {
                         session.Schedule(() =>
                         {
-                           receiveRequests.Remove(receive);
-                           receive.TrySetResult(null);
+                           if (!receive.Task.IsCompleted)
+                           {
+                              receiveRequests.Remove(receive);
+                              receive.TrySetResult(null);
+                           }
                         }, timeout);
                      }
 
diff --git a/src/Proton.Client/Client/Implementation/ClientSender.cs b/src/Proton.Client/Client/Implementation/ClientSender.cs
index e74c79f..3dc161c 100644
--- a/src/Proton.Client/Client/Implementation/ClientSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSender.cs
@@ -185,7 +185,10 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
             session.Schedule(() =>
             {
-               send.Failed(send.CreateSendTimedOutException());
+               if (!send.Request.IsCompleted)
+               {
+                  send.Failed(send.CreateSendTimedOutException());
+               }
             },
             TimeSpan.FromMilliseconds(Options.SendTimeout));
          }
@@ -492,6 +495,11 @@ namespace Apache.Qpid.Proton.Client.Implementation
          /// </summary>
          public IOutgoingDelivery Delivery => delivery;
 
+         /// <summary>
+         /// Gets the Task that backs the send operation.
+         /// </summary>
+         public Task Request => request.Task;
+
          public void Complete()
          {
             Send(delivery.State, delivery.IsSettled);
diff --git a/src/Proton.Client/Client/Implementation/ClientSession.cs b/src/Proton.Client/Client/Implementation/ClientSession.cs
index f20a291..fe42e4e 100644
--- a/src/Proton.Client/Client/Implementation/ClientSession.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSession.cs
@@ -22,6 +22,7 @@ using Apache.Qpid.Proton.Client.Exceptions;
 using Apache.Qpid.Proton.Client.Concurrent;
 using Apache.Qpid.Proton.Client.Utilities;
 using Apache.Qpid.Proton.Logging;
+using System.Threading;
 
 namespace Apache.Qpid.Proton.Client.Implementation
 {
@@ -428,8 +429,6 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       internal void Execute(Action action) => connection.Execute(action);
 
-      internal void Schedule(Action action, TimeSpan delay) => 
connection.Schedule(action, delay);
-
       internal ClientSession Open()
       {
          protonSession.LocalOpenHandler(HandleLocalOpen)
@@ -450,16 +449,24 @@ namespace Apache.Qpid.Proton.Client.Implementation
          return this;
       }
 
+      internal void Schedule(Action action, TimeSpan delay) => 
connection.Schedule(action, delay);
+
+      internal void Schedule(Action action, TimeSpan delay, CancellationToken 
token)
+      {
+         connection.Schedule(action, delay, token);
+      }
+
       internal void ScheduleRequestTimeout<T>(TaskCompletionSource<T> request, 
long timeout, Func<ClientException> errorSupplier)
       {
          if (timeout != INFINITE)
          {
             connection.Schedule(() =>
-               _ = request.TrySetException(errorSupplier.Invoke()), 
TimeSpan.FromMilliseconds(timeout));
-         }
-         else
-         {
-            // TODO return null;
+            {
+               if (!request.Task.IsCompleted)
+               {
+                  _ = request.TrySetException(errorSupplier.Invoke());
+               }
+            }, TimeSpan.FromMilliseconds(timeout));
          }
       }
 
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs b/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs
index 81bb138..24ab436 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamReceiver.cs
@@ -102,8 +102,11 @@ namespace Apache.Qpid.Proton.Client.Implementation
                      {
                         session.Schedule(() =>
                         {
-                           receiveRequests.Remove(receive);
-                           receive.TrySetResult(null);
+                           if (!receive.Task.IsCompleted)
+                           {
+                              receiveRequests.Remove(receive);
+                              receive.TrySetResult(null);
+                           }
                         }, timeout);
                      }
 
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
index e755109..6a7a925 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
@@ -290,7 +290,10 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
             session.Schedule(() =>
             {
-               send.Failed(send.CreateSendTimedOutException());
+               if (!send.Request.IsCompleted)
+               {
+                  send.Failed(send.CreateSendTimedOutException());
+               }
             },
             TimeSpan.FromMilliseconds(Options.SendTimeout));
          }
@@ -665,6 +668,11 @@ namespace Apache.Qpid.Proton.Client.Implementation
          /// </summary>
          public IProtonBuffer Payload => payload;
 
+         /// <summary>
+         /// Gets the task that tracks the completion of this send operation.
+         /// </summary>
+         public Task Request => request.Task;
+
          /// <summary>
          /// Returns the proton outgoing delivery object that is contained in 
this envelope
          /// which can be null if the payload to be sent has not yet had a 
transmit attempt


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to