This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new ab43220  PROTON-2649 Update sender sendable state on check to reflect 
state
ab43220 is described below

commit ab43220acf92d14d3a41fb21e1f668edd7c4c43a
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Mon Nov 7 15:35:39 2022 -0500

    PROTON-2649 Update sender sendable state on check to reflect state
    
    When a sender is asked if it is sendable update the state if the session
    capacity reports that it is not available while the sender link does
    still have credit.  This allows the engine to notifity all senders in
    the not sendable state that they have recovered.
---
 src/Proton/Engine/Implementation/ProtonSender.cs   |   2 +-
 .../Engine/Implementation/ProtonSessionTest.cs     | 184 +++++++++++++++++++++
 2 files changed, 185 insertions(+), 1 deletion(-)

diff --git a/src/Proton/Engine/Implementation/ProtonSender.cs 
b/src/Proton/Engine/Implementation/ProtonSender.cs
index 9e251b9..ecef356 100644
--- a/src/Proton/Engine/Implementation/ProtonSender.cs
+++ b/src/Proton/Engine/Implementation/ProtonSender.cs
@@ -54,7 +54,7 @@ namespace Apache.Qpid.Proton.Engine.Implementation
 
       public override uint Credit => CreditState.Credit;
 
-      public bool IsSendable => sendable && sessionWindow.IsSendable;
+      public bool IsSendable => sendable = sendable && 
sessionWindow.IsSendable;
 
       public override bool IsDraining => CreditState.IsDrain;
 
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs 
b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
index 805814d..edac887 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
@@ -2779,6 +2779,190 @@ namespace Apache.Qpid.Proton.Engine.Implementation
          Assert.IsNull(failure);
       }
 
+      [Test]
+      public void 
TestBothSendersNotifiedAfterSessionOutgoingWindowOpenedWhenBothRequestedSendableState()
+      {
+         IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+         engine.ErrorHandler((error) => failure = error.FailureCause);
+         Queue<Action> asyncIOCallbacks = new Queue<Action>();
+         ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
+
+         byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
+         IDeliveryTagGenerator generator = 
ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+
+         peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+         peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
+         
peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
+         peer.ExpectAttach().Respond();
+         
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+         peer.ExpectAttach().Respond();
+         
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+
+         IConnection connection = engine.Start();
+         connection.MaxFrameSize = 1024;
+         connection.Open();
+         ISession session = connection.Session();
+         session.OutgoingCapacity = 1024;
+         session.Open();
+         ISender sender1 = session.Sender("test1");
+         sender1.DeliveryTagGenerator = generator;
+         sender1.Open();
+         ISender sender2 = session.Sender("test2");
+         sender2.DeliveryTagGenerator = generator;
+         sender2.Open();
+
+         peer.WaitForScriptToComplete();
+         peer.ExpectTransfer().WithPayload(payload);
+
+         int sender1CreditStateUpdated = 0;
+         sender1.CreditStateUpdateHandler((self) =>
+         {
+            sender1CreditStateUpdated++;
+         });
+
+         int sender2CreditStateUpdated = 0;
+         sender2.CreditStateUpdateHandler((self) =>
+         {
+            sender2CreditStateUpdated++;
+         });
+
+         Assert.IsTrue(sender1.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.IsTrue(sender2.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+
+         // Open, Begin, Attach, Attach
+         Assert.AreEqual(4, asyncIOCallbacks.Count);
+         foreach (Action action in asyncIOCallbacks)
+         {
+            action.Invoke();
+         }
+         asyncIOCallbacks.Clear();
+
+         IOutgoingDelivery delivery = sender1.Next();
+         delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+
+         peer.WaitForScriptToComplete();
+
+         Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+         Assert.IsFalse(sender1.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         // Sender 2 shouldn't be able to send since sender 1 consumed the 
outgoing window
+         Assert.IsFalse(sender2.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+
+         // Free a frame's worth of window which should trigger both senders 
sendable update event
+         asyncIOCallbacks.Dequeue().Invoke();
+         Assert.AreEqual(0, asyncIOCallbacks.Count);
+
+         Assert.IsTrue(sender1.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.AreEqual(1, sender1CreditStateUpdated);
+         Assert.IsTrue(sender2.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.AreEqual(1, sender2CreditStateUpdated);
+
+         peer.WaitForScriptToComplete();
+         Assert.IsNull(failure);
+      }
+
+      [Test]
+      public void 
TestSingleSenderUpdatedWhenOutgoingWindowOpenedForTwoIfFirstConsumesSessionOutgoingWindow()
+      {
+         IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+         engine.ErrorHandler((error) => failure = error.FailureCause);
+         Queue<Action> asyncIOCallbacks = new Queue<Action>();
+         ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
+
+         byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
+         IDeliveryTagGenerator generator = 
ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+
+         peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+         peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
+         
peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
+         peer.ExpectAttach().Respond();
+         
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+         peer.ExpectAttach().Respond();
+         
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+
+         IConnection connection = engine.Start();
+         connection.MaxFrameSize = 1024;
+         connection.Open();
+         ISession session = connection.Session();
+         session.OutgoingCapacity = 1024;
+         session.Open();
+         ISender sender1 = session.Sender("test1");
+         sender1.DeliveryTagGenerator = generator;
+         sender1.Open();
+         ISender sender2 = session.Sender("test2");
+         sender2.DeliveryTagGenerator = generator;
+         sender2.Open();
+
+         peer.WaitForScriptToComplete();
+         peer.ExpectTransfer().WithPayload(payload);
+
+         int sender1CreditStateUpdated = 0;
+         sender1.CreditStateUpdateHandler((self) =>
+         {
+            sender1CreditStateUpdated++;
+            if (self.IsSendable)
+            {
+               IOutgoingDelivery delivery = self.Next();
+               
delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+            }
+         });
+
+         int sender2CreditStateUpdated = 0;
+         sender2.CreditStateUpdateHandler((self) =>
+         {
+            sender2CreditStateUpdated++;
+         });
+
+         Assert.IsTrue(sender1.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.IsTrue(sender2.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+
+         // Open, Begin, Attach, Attach
+         Assert.AreEqual(4, asyncIOCallbacks.Count);
+         foreach (Action action in asyncIOCallbacks)
+         {
+            action.Invoke();
+         }
+         asyncIOCallbacks.Clear();
+
+         IOutgoingDelivery delivery = sender1.Next();
+         delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+
+         peer.WaitForScriptToComplete();
+         peer.ExpectTransfer().WithPayload(payload);
+
+         Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+         Assert.IsFalse(sender1.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         // Sender 2 shouldn't be able to send since sender 1 consumed the 
outgoing window
+         Assert.IsFalse(sender2.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+
+         // Should trigger sender 1 to send which should exhaust the outgoing 
credit
+         asyncIOCallbacks.Dequeue().Invoke();
+         Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+         Assert.IsFalse(sender1.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         Assert.AreEqual(1, sender1CreditStateUpdated);
+         Assert.IsFalse(sender2.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         // Should not have triggered an event for sender 2 being able to send 
since
+         // sender one consumed the outgoing window already.
+         Assert.AreEqual(0, sender2CreditStateUpdated);
+
+         peer.WaitForScriptToComplete();
+         Assert.IsNull(failure);
+      }
+
       [Test]
       public void 
TestHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives()
       {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to