This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push: new 3f089c3 PROTON-2829 Remove timed out sends from blocked queue 3f089c3 is described below commit 3f089c35d9a15a4ab62bc7e63f5f6aca376b2712 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri Jun 7 18:41:34 2024 -0400 PROTON-2829 Remove timed out sends from blocked queue Ensure that sends that time out waiting for credit are removed from the blocked queue and also abort any partial send that was blocked waiting for credit to ensure no leak of deliveries from the sender link. --- .../Client/Implementation/ClientSender.cs | 35 +++++- .../Client/Implementation/ClientStreamSender.cs | 34 +++++- .../Client/Implementation/ClientSenderTest.cs | 131 +++++++++++++++++++++ .../Implementation/ClientStreamSenderTest.cs | 64 ++++++++++ 4 files changed, 260 insertions(+), 4 deletions(-) diff --git a/src/Proton.Client/Client/Implementation/ClientSender.cs b/src/Proton.Client/Client/Implementation/ClientSender.cs index 2cf317f..01ab227 100644 --- a/src/Proton.Client/Client/Implementation/ClientSender.cs +++ b/src/Proton.Client/Client/Implementation/ClientSender.cs @@ -181,24 +181,38 @@ namespace Apache.Qpid.Proton.Client.Implementation private void AddToTailOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.EnqueueBack(send); if (Options.SendTimeout > 0) { + send.TimeoutApplied = true; session.Schedule(() => { if (!send.Request.IsCompleted) { + blocked.Remove(send); send.Failed(send.CreateSendTimedOutException()); } }, TimeSpan.FromMilliseconds(Options.SendTimeout)); } - - blocked.EnqueueBack(send); } private void AddToHeadOfBlockedQueue(ClientOutgoingEnvelope send) { blocked.EnqueueFront(send); + if (Options.SendTimeout > 0 && !send.TimeoutApplied) + { + send.TimeoutApplied = true; + session.Schedule(() => + { + if (!send.Request.IsCompleted) + { + blocked.Remove(send); + send.Failed(send.CreateSendTimedOutException()); + } + }, + TimeSpan.FromMilliseconds(Options.SendTimeout)); + } } #endregion @@ -501,6 +515,11 @@ namespace Apache.Qpid.Proton.Client.Implementation /// </summary> public Task Request => request.Task; + /// <summary> + /// Gets or sets if a timeout has been assigned to time out a blocked send operation. + /// </summary> + public bool TimeoutApplied { get; set; } + public void Complete() { Send(delivery.State, delivery.IsSettled); @@ -562,6 +581,18 @@ namespace Apache.Qpid.Proton.Client.Implementation public ClientOutgoingEnvelope Failed(ClientException exception) { + if (delivery != null) + { + try + { + delivery.Abort(); + } + catch (Exception) + { + // Can fail is offline so ignore any exceptions from abort. + } + } + request.TrySetException(exception); return this; } diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs index 73223a9..75f10f8 100644 --- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs +++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs @@ -286,24 +286,37 @@ namespace Apache.Qpid.Proton.Client.Implementation private void AddToTailOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.EnqueueBack(send); if (Options.SendTimeout > 0) { session.Schedule(() => { if (!send.Request.IsCompleted) { + blocked.Remove(send); send.Failed(send.CreateSendTimedOutException()); } }, TimeSpan.FromMilliseconds(Options.SendTimeout)); } - - blocked.EnqueueBack(send); } private void AddToHeadOfBlockedQueue(ClientOutgoingEnvelope send) { blocked.EnqueueFront(send); + if (Options.SendTimeout > 0 && !send.TimeoutApplied) + { + send.TimeoutApplied = true; + session.Schedule(() => + { + if (!send.Request.IsCompleted) + { + blocked.Remove(send); + send.Failed(send.CreateSendTimedOutException()); + } + }, + TimeSpan.FromMilliseconds(Options.SendTimeout)); + } } private IStreamTracker CreateTracker(IOutgoingDelivery delivery) @@ -681,6 +694,11 @@ namespace Apache.Qpid.Proton.Client.Implementation /// </summary> public IOutgoingDelivery Delivery => delivery; + /// <summary> + /// Gets or sets if a timeout has been assigned to time out a blocked send operation. + /// </summary> + public bool TimeoutApplied { get; set; } + public void Abort() { delivery.Abort(); @@ -756,6 +774,18 @@ namespace Apache.Qpid.Proton.Client.Implementation public ClientOutgoingEnvelope Failed(ClientException exception) { + if (delivery != null) + { + try + { + delivery.Abort(); + } + catch (Exception) + { + // Can fail is offline so ignore any exceptions from abort. + } + } + request.TrySetException(exception); return this; } diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs index 088b93a..b03d4cb 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs @@ -583,6 +583,69 @@ namespace Apache.Qpid.Proton.Client.Implementation } } + [Test] + public void TestSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() + { + using (ProtonTestServer peer = new ProtonTestServer(loggerFactory)) + { + peer.ExpectSASLAnonymousConnect(); + peer.ExpectOpen().Respond(); + peer.ExpectBegin().Respond(); + peer.ExpectAttach().OfSender().Respond(); + peer.Start(); + + string remoteAddress = peer.ServerAddress; + int remotePort = peer.ServerPort; + + logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort); + + IClient container = IClient.Create(); + ConnectionOptions options = new ConnectionOptions() + { + SendTimeout = 10 + }; + IConnection connection = container.Connect(remoteAddress, remotePort, options); + ISession session = connection.OpenSession(); + ISender sender = session.OpenSender("test-queue").OpenTask.Result; + + IMessage<string> message = IMessage<string>.Create("Hello World"); + try + { + sender.Send(message); + Assert.Fail("Should throw a send timed out exception"); + } + catch (ClientSendTimedOutException) + { + // Expected error, ignore + } + + peer.WaitForScriptToComplete(); + peer.RemoteFlow().WithLinkCredit(1).Now(); + peer.ExpectAttach().OfSender().Respond(); + peer.ExpectTransfer().WithNonNullPayload(); + peer.ExpectDetach().Respond(); + peer.ExpectClose().Respond(); + + // Ensure the send happens after the remote has sent a flow with credit + _ = session.OpenSender("test-queue-2").OpenTask.Result; + + try + { + sender.Send(IMessage<string>.Create("Hello World 2")); + } + catch (ClientException ex) + { + logger.LogTrace("Error on second send: {0}", ex); + Assert.Fail("Should not throw an exception"); + } + + sender.CloseAsync().Wait(TimeSpan.FromSeconds(10)); + connection.CloseAsync().Wait(TimeSpan.FromSeconds(10)); + + peer.WaitForScriptToComplete(); + } + } + [Test] public void TestSendCompletesWhenCreditEventuallyOffered() { @@ -3260,5 +3323,73 @@ namespace Apache.Qpid.Proton.Client.Implementation peer.WaitForScriptToComplete(); } } + + [Test] + public void TestSendTimesOutIfNotAllMessageFramesCanBeSent() + { + using (ProtonTestServer peer = new ProtonTestServer(loggerFactory)) + { + peer.ExpectSASLAnonymousConnect(); + peer.ExpectOpen().Respond(); + peer.ExpectBegin().WithNextOutgoingId(0).Respond(); + peer.ExpectAttach().OfSender().Respond(); + peer.RemoteFlow().WithIncomingWindow(2).WithNextIncomingId(0).WithLinkCredit(1).Queue(); + peer.ExpectTransfer().WithDeliveryId(0).WithNonNullPayload().WithMore(true); + peer.ExpectTransfer().WithNonNullPayload().WithMore(true); + peer.ExpectTransfer().WithNullPayload().WithAborted(true); + peer.Start(); + + string remoteAddress = peer.ServerAddress; + int remotePort = peer.ServerPort; + + logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort); + + IClient container = IClient.Create(); + ConnectionOptions options = new ConnectionOptions() + { + MaxFrameSize = 1024, + SendTimeout = 25 + }; + IConnection connection = container.Connect(remoteAddress, remotePort, options); + ISender sender = connection.OpenSender("test-queue").OpenTask.Result; + + byte[] payload = new byte[4800]; + Array.Fill(payload, (byte)1); + + try + { + sender.Send(IMessage<string>.Create(payload)); + } + catch (ClientSendTimedOutException e) + { + logger.LogTrace("send failed with expected error: {0}", e); + } + + peer.WaitForScriptToComplete(); + peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(4).WithLinkCredit(1).Now(); + peer.ExpectAttach().OfSender().Respond(); + peer.ExpectTransfer().WithDeliveryId(1).WithNonNullPayload(); + peer.ExpectDetach().Respond(); + peer.ExpectClose().Respond(); + + // Ensure the send happens after the remote has sent a flow with credit + _ = connection.OpenSender("test-queue-2").OpenTask.Result; + + try + { + sender.Send(IMessage<string>.Create("Hello World 2")); + } + catch (ClientException ex) + { + logger.LogTrace("Error on second send: {0}", ex); + Assert.Fail("Should not throw an exception"); + } + + sender.CloseAsync(); + connection.CloseAsync(); + + peer.WaitForScriptToComplete(); + } + } } } \ No newline at end of file diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs index 54656ba..9e76808 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs @@ -3251,5 +3251,69 @@ namespace Apache.Qpid.Proton.Client.Implementation peer.WaitForScriptToComplete(); } } + + [Test] + public void TestSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() + { + using (ProtonTestServer peer = new ProtonTestServer(loggerFactory)) + { + peer.ExpectSASLAnonymousConnect(); + peer.ExpectOpen().Respond(); + peer.ExpectBegin().Respond(); + peer.ExpectAttach().OfSender().Respond(); + peer.Start(); + + string remoteAddress = peer.ServerAddress; + int remotePort = peer.ServerPort; + + logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort); + + IClient container = IClient.Create(); + ConnectionOptions options = new ConnectionOptions() + { + SendTimeout = 10 + }; + IConnection connection = container.Connect(remoteAddress, remotePort, options); + IStreamSender sender = connection.OpenStreamSender("test-queue").OpenTask.Result; + + IMessage<string> message = IMessage<string>.Create("Hello World"); + try + { + sender.Send(message); + Assert.Fail("Should throw a send timed out exception"); + } + catch (ClientSendTimedOutException) + { + // Expected error, ignore + } + + peer.WaitForScriptToComplete(); + peer.RemoteFlow().WithLinkCredit(1).Now(); + peer.ExpectBegin().Respond(); + peer.ExpectAttach().OfSender().Respond(); + peer.ExpectTransfer().WithNonNullPayload(); + peer.ExpectDetach().Respond(); + peer.ExpectEnd().Respond(); + peer.ExpectClose().Respond(); + + // Ensure the send happens after the remote has sent a flow with credit + _ = connection.OpenSender("test-queue-2").OpenTask.Result; + + try + { + sender.Send(IMessage<string>.Create("Hello World 2")); + } + catch (ClientException ex) + { + logger.LogTrace("Error on second send: {0}", ex); + Assert.Fail("Should not throw an exception"); + } + + sender.CloseAsync().Wait(TimeSpan.FromSeconds(10)); + connection.CloseAsync().Wait(TimeSpan.FromSeconds(10)); + + peer.WaitForScriptToComplete(); + } + } } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org