This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cbcf3a  PROTON-2722 Race during receiver queue count can break next 
receiver
2cbcf3a is described below

commit 2cbcf3a72f46445d4d50059547802d2cd93fe67c
Author: Timothy Bish <[email protected]>
AuthorDate: Wed May 3 14:43:08 2023 -0400

    PROTON-2722 Race during receiver queue count can break next receiver
    
    A race on the check of the queue backlog for receivers can break the next
    receiver API and can also throw an error from the public queue count API
    due to concurrent updates. This fixes a very intermittent CI test failure
    and makes the public queued deliveries API stable against concurrent access.
---
 .../Implementation/ClientNextReceiverSelector.cs   | 14 ++++++------
 .../Implementation/ClientReceiverLinkType.cs       | 25 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 8 deletions(-)

diff --git 
a/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs 
b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
index e9dfff7..751f547 100644
--- a/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
+++ b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
@@ -142,7 +142,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       private ClientReceiver SelectRandomReceiver()
       {
          IEnumerable<Engine.IReceiver> receivers = 
session.ProtonSession.Receivers.
-            Where(r => r.LinkedResource is ClientReceiver receiver && 
receiver.QueuedDeliveries > 0);
+            Where(r => r.LinkedResource is ClientReceiver receiver && 
receiver.GetQueuedDeliveries() > 0);
 
          Engine.IReceiver receiver = 
receivers.ElementAtOrDefault(random.Next(0, receivers.Count()));
 
@@ -163,7 +163,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
                {
                   if (foundLast)
                   {
-                     if (candidate.QueuedDeliveries > 0)
+                     if (candidate.GetQueuedDeliveries() > 0)
                      {
                         result = candidate;
                      }
@@ -187,7 +187,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       {
          Engine.IReceiver receiver =
             session.ProtonSession.Receivers.Where(
-               r => r.LinkedResource is ClientReceiver receiver && 
receiver.QueuedDeliveries > 0).FirstOrDefault();
+               r => r.LinkedResource is ClientReceiver receiver && 
receiver.GetQueuedDeliveries() > 0).FirstOrDefault();
 
          return (ClientReceiver)receiver?.LinkedResource;
       }
@@ -195,7 +195,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       private ClientReceiver SelectLargestBacklog()
       {
          IEnumerable<Engine.IReceiver> receivers = 
session.ProtonSession.Receivers.
-            Where(r => r.LinkedResource is ClientReceiver receiver && 
receiver.QueuedDeliveries > 0);
+            Where(r => r.LinkedResource is ClientReceiver receiver && 
receiver.GetQueuedDeliveries() > 0);
 
          ClientReceiver result = null;
 
@@ -203,7 +203,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
             ClientReceiver candidate = (ClientReceiver)receiver.LinkedResource;
 
-            if (result == null || result.QueuedDeliveries < 
candidate.QueuedDeliveries)
+            if (result == null || result.GetQueuedDeliveries() < 
candidate.GetQueuedDeliveries())
             {
                result = candidate;
             }
@@ -215,7 +215,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       private ClientReceiver SelectSmallestBacklog()
       {
          IEnumerable<Engine.IReceiver> receivers = 
session.ProtonSession.Receivers.
-            Where(r => r.LinkedResource is ClientReceiver receiver && 
receiver.QueuedDeliveries > 0);
+            Where(r => r.LinkedResource is ClientReceiver receiver && 
receiver.GetQueuedDeliveries() > 0);
 
          ClientReceiver result = null;
 
@@ -223,7 +223,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
             ClientReceiver candidate = (ClientReceiver)receiver.LinkedResource;
 
-            if (result == null || result.QueuedDeliveries > 
candidate.QueuedDeliveries)
+            if (result == null || result.GetQueuedDeliveries() > 
candidate.GetQueuedDeliveries())
             {
                result = candidate;
             }
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs 
b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
index 72f41d0..9db0b19 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
@@ -49,7 +49,30 @@ namespace Apache.Qpid.Proton.Client.Implementation
          }
       }
 
-      public virtual int QueuedDeliveries => 
protonLink.Unsettled.Count(delivery => delivery.LinkedResource == null);
+      internal int GetQueuedDeliveries()
+      {
+         // Internal implementation operates on the current thread which 
should generally
+         // be the connection execution thread.
+         return protonLink.Unsettled.Count(delivery => delivery.LinkedResource 
== null);
+      }
+
+      public virtual int QueuedDeliveries
+      {
+         get
+         {
+            CheckClosedOrFailed();
+            TaskCompletionSource<int> queuedCount = new();
+            session.Execute(() =>
+            {
+               if (NotClosedOrFailed(queuedCount))
+               {
+                  _ = queuedCount.TrySetResult(GetQueuedDeliveries());
+               }
+            });
+
+            return 
queuedCount.Task.ConfigureAwait(false).GetAwaiter().GetResult();
+         }
+      }
 
       public LinkType AddCredit(uint credit)
       {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to