This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push: new ab43220 PROTON-2649 Update sender sendable state on check to reflect state ab43220 is described below commit ab43220acf92d14d3a41fb21e1f668edd7c4c43a Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Mon Nov 7 15:35:39 2022 -0500 PROTON-2649 Update sender sendable state on check to reflect state When a sender is asked if it is sendable update the state if the session capacity reports that it is not available while the sender link does still have credit. This allows the engine to notifity all senders in the not sendable state that they have recovered. --- src/Proton/Engine/Implementation/ProtonSender.cs | 2 +- .../Engine/Implementation/ProtonSessionTest.cs | 184 +++++++++++++++++++++ 2 files changed, 185 insertions(+), 1 deletion(-) diff --git a/src/Proton/Engine/Implementation/ProtonSender.cs b/src/Proton/Engine/Implementation/ProtonSender.cs index 9e251b9..ecef356 100644 --- a/src/Proton/Engine/Implementation/ProtonSender.cs +++ b/src/Proton/Engine/Implementation/ProtonSender.cs @@ -54,7 +54,7 @@ namespace Apache.Qpid.Proton.Engine.Implementation public override uint Credit => CreditState.Credit; - public bool IsSendable => sendable && sessionWindow.IsSendable; + public bool IsSendable => sendable = sendable && sessionWindow.IsSendable; public override bool IsDraining => CreditState.IsDrain; diff --git a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs index 805814d..edac887 100644 --- a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs +++ b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs @@ -2779,6 +2779,190 @@ namespace Apache.Qpid.Proton.Engine.Implementation Assert.IsNull(failure); } + [Test] + public void TestBothSendersNotifiedAfterSessionOutgoingWindowOpenedWhenBothRequestedSendableState() + { + IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine(); + engine.ErrorHandler((error) => failure = error.FailureCause); + Queue<Action> asyncIOCallbacks = new Queue<Action>(); + ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks); + + byte[] payload = new byte[] { 0, 1, 2, 3, 4 }; + IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator(); + + peer.ExpectAMQPHeader().RespondWithAMQPHeader(); + peer.ExpectOpen().WithMaxFrameSize(1024).Respond(); + peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0); + peer.ExpectAttach().Respond(); + peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue(); + peer.ExpectAttach().Respond(); + peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue(); + + IConnection connection = engine.Start(); + connection.MaxFrameSize = 1024; + connection.Open(); + ISession session = connection.Session(); + session.OutgoingCapacity = 1024; + session.Open(); + ISender sender1 = session.Sender("test1"); + sender1.DeliveryTagGenerator = generator; + sender1.Open(); + ISender sender2 = session.Sender("test2"); + sender2.DeliveryTagGenerator = generator; + sender2.Open(); + + peer.WaitForScriptToComplete(); + peer.ExpectTransfer().WithPayload(payload); + + int sender1CreditStateUpdated = 0; + sender1.CreditStateUpdateHandler((self) => + { + sender1CreditStateUpdated++; + }); + + int sender2CreditStateUpdated = 0; + sender2.CreditStateUpdateHandler((self) => + { + sender2CreditStateUpdated++; + }); + + Assert.IsTrue(sender1.IsSendable); + Assert.AreEqual(1024, session.RemainingOutgoingCapacity); + Assert.IsTrue(sender2.IsSendable); + Assert.AreEqual(1024, session.RemainingOutgoingCapacity); + + // Open, Begin, Attach, Attach + Assert.AreEqual(4, asyncIOCallbacks.Count); + foreach (Action action in asyncIOCallbacks) + { + action.Invoke(); + } + asyncIOCallbacks.Clear(); + + IOutgoingDelivery delivery = sender1.Next(); + delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload)); + + peer.WaitForScriptToComplete(); + + Assert.AreEqual(1, asyncIOCallbacks.Count); + + Assert.IsFalse(sender1.IsSendable); + Assert.AreEqual(0, session.RemainingOutgoingCapacity); + // Sender 2 shouldn't be able to send since sender 1 consumed the outgoing window + Assert.IsFalse(sender2.IsSendable); + Assert.AreEqual(0, session.RemainingOutgoingCapacity); + + // Free a frame's worth of window which should trigger both senders sendable update event + asyncIOCallbacks.Dequeue().Invoke(); + Assert.AreEqual(0, asyncIOCallbacks.Count); + + Assert.IsTrue(sender1.IsSendable); + Assert.AreEqual(1024, session.RemainingOutgoingCapacity); + Assert.AreEqual(1, sender1CreditStateUpdated); + Assert.IsTrue(sender2.IsSendable); + Assert.AreEqual(1024, session.RemainingOutgoingCapacity); + Assert.AreEqual(1, sender2CreditStateUpdated); + + peer.WaitForScriptToComplete(); + Assert.IsNull(failure); + } + + [Test] + public void TestSingleSenderUpdatedWhenOutgoingWindowOpenedForTwoIfFirstConsumesSessionOutgoingWindow() + { + IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine(); + engine.ErrorHandler((error) => failure = error.FailureCause); + Queue<Action> asyncIOCallbacks = new Queue<Action>(); + ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks); + + byte[] payload = new byte[] { 0, 1, 2, 3, 4 }; + IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator(); + + peer.ExpectAMQPHeader().RespondWithAMQPHeader(); + peer.ExpectOpen().WithMaxFrameSize(1024).Respond(); + peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0); + peer.ExpectAttach().Respond(); + peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue(); + peer.ExpectAttach().Respond(); + peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue(); + + IConnection connection = engine.Start(); + connection.MaxFrameSize = 1024; + connection.Open(); + ISession session = connection.Session(); + session.OutgoingCapacity = 1024; + session.Open(); + ISender sender1 = session.Sender("test1"); + sender1.DeliveryTagGenerator = generator; + sender1.Open(); + ISender sender2 = session.Sender("test2"); + sender2.DeliveryTagGenerator = generator; + sender2.Open(); + + peer.WaitForScriptToComplete(); + peer.ExpectTransfer().WithPayload(payload); + + int sender1CreditStateUpdated = 0; + sender1.CreditStateUpdateHandler((self) => + { + sender1CreditStateUpdated++; + if (self.IsSendable) + { + IOutgoingDelivery delivery = self.Next(); + delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload)); + } + }); + + int sender2CreditStateUpdated = 0; + sender2.CreditStateUpdateHandler((self) => + { + sender2CreditStateUpdated++; + }); + + Assert.IsTrue(sender1.IsSendable); + Assert.AreEqual(1024, session.RemainingOutgoingCapacity); + Assert.IsTrue(sender2.IsSendable); + Assert.AreEqual(1024, session.RemainingOutgoingCapacity); + + // Open, Begin, Attach, Attach + Assert.AreEqual(4, asyncIOCallbacks.Count); + foreach (Action action in asyncIOCallbacks) + { + action.Invoke(); + } + asyncIOCallbacks.Clear(); + + IOutgoingDelivery delivery = sender1.Next(); + delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload)); + + peer.WaitForScriptToComplete(); + peer.ExpectTransfer().WithPayload(payload); + + Assert.AreEqual(1, asyncIOCallbacks.Count); + + Assert.IsFalse(sender1.IsSendable); + Assert.AreEqual(0, session.RemainingOutgoingCapacity); + // Sender 2 shouldn't be able to send since sender 1 consumed the outgoing window + Assert.IsFalse(sender2.IsSendable); + Assert.AreEqual(0, session.RemainingOutgoingCapacity); + + // Should trigger sender 1 to send which should exhaust the outgoing credit + asyncIOCallbacks.Dequeue().Invoke(); + Assert.AreEqual(1, asyncIOCallbacks.Count); + + Assert.IsFalse(sender1.IsSendable); + Assert.AreEqual(0, session.RemainingOutgoingCapacity); + Assert.AreEqual(1, sender1CreditStateUpdated); + Assert.IsFalse(sender2.IsSendable); + Assert.AreEqual(0, session.RemainingOutgoingCapacity); + // Should not have triggered an event for sender 2 being able to send since + // sender one consumed the outgoing window already. + Assert.AreEqual(0, sender2CreditStateUpdated); + + peer.WaitForScriptToComplete(); + Assert.IsNull(failure); + } + [Test] public void TestHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org