This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push: new 59e779f PROTON-2830 Add an async version of BeginMessage to stream sender 59e779f is described below commit 59e779fa66b143dbbe586720e9f94871311574be Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Tue Jun 11 15:23:34 2024 -0400 PROTON-2830 Add an async version of BeginMessage to stream sender Adds an async variant of the BeginMessage API to the IStreamSender to match the other async variations of send and try send. --- src/Proton.Client/Client/IStreamSender.cs | 12 +++- .../Client/Implementation/ClientStreamSender.cs | 68 +++++++++++++--------- .../Implementation/ClientStreamSenderTest.cs | 52 +++++++++++++++++ 3 files changed, 103 insertions(+), 29 deletions(-) diff --git a/src/Proton.Client/Client/IStreamSender.cs b/src/Proton.Client/Client/IStreamSender.cs index 47b4684..26661a1 100644 --- a/src/Proton.Client/Client/IStreamSender.cs +++ b/src/Proton.Client/Client/IStreamSender.cs @@ -82,8 +82,18 @@ namespace Apache.Qpid.Proton.Client /// to indicate that the previous instance has not yet been completed. /// </summary> /// <param name="deliveryAnnotations">The optional delivery annotations to transmit with the message</param> - /// <returns>This stream sender instance</returns> + /// <returns>A new IStreamSenderMessage if no send is currently in progress</returns> IStreamSenderMessage BeginMessage(IDictionary<string, object> deliveryAnnotations = null); + /// <summary> + /// Creates and returns a new stream capable message that can be used by the caller to perform + /// streaming sends of large message payload data. Only one streamed message can be active + /// at a time so any successive calls to begin a new streaming message will throw an error + /// to indicate that the previous instance has not yet been completed. + /// </summary> + /// <param name="deliveryAnnotations">The optional delivery annotations to transmit with the message</param> + /// <returns>A Task that returns new IStreamSenderMessage if no send is currently in progress</returns> + Task<IStreamSenderMessage> BeginMessageAsync(IDictionary<string, object> deliveryAnnotations = null); + } } \ No newline at end of file diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs index 75f10f8..76a721c 100644 --- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs +++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs @@ -83,35 +83,13 @@ namespace Apache.Qpid.Proton.Client.Implementation public IStreamSenderMessage BeginMessage(IDictionary<string, object> deliveryAnnotations = null) { CheckClosedOrFailed(); - DeliveryAnnotations annotations = null; - TaskCompletionSource<IStreamSenderMessage> request = new(); - - if (deliveryAnnotations != null) - { - annotations = new DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations)); - } - - ClientSession.Execute(() => - { - if (ProtonSender.Current != null) - { - request.TrySetException(new ClientIllegalStateException( - "Cannot initiate a new streaming send until the previous one is complete")); - } - else - { - // Grab the next delivery and hold for stream writes, no other sends - // can occur while we hold the delivery. - IOutgoingDelivery streamDelivery = ProtonSender.Next(); - ClientStreamTracker streamTracker = new(this, streamDelivery); - - streamDelivery.LinkedResource = streamTracker; - - request.TrySetResult(new ClientStreamSenderMessage(this, streamTracker, annotations)); - } - }); + return DoBeginMessageAsync(deliveryAnnotations).ConfigureAwait(false).GetAwaiter().GetResult(); + } - return request.Task.ConfigureAwait(false).GetAwaiter().GetResult(); + public Task<IStreamSenderMessage> BeginMessageAsync(IDictionary<string, object> deliveryAnnotations = null) + { + CheckClosedOrFailed(); + return DoBeginMessageAsync(deliveryAnnotations); } internal string SenderId => senderId; @@ -329,6 +307,40 @@ namespace Apache.Qpid.Proton.Client.Implementation return new ClientNoOpStreamTracker(this); } + private Task<IStreamSenderMessage> DoBeginMessageAsync(IDictionary<string, object> deliveryAnnotations = null) + { + CheckClosedOrFailed(); + DeliveryAnnotations annotations = null; + TaskCompletionSource<IStreamSenderMessage> request = new(); + + if (deliveryAnnotations != null) + { + annotations = new DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations)); + } + + ClientSession.Execute(() => + { + if (ProtonSender.Current != null) + { + request.TrySetException(new ClientIllegalStateException( + "Cannot initiate a new streaming send until the previous one is complete")); + } + else + { + // Grab the next delivery and hold for stream writes, no other sends + // can occur while we hold the delivery. + IOutgoingDelivery streamDelivery = ProtonSender.Next(); + ClientStreamTracker streamTracker = new(this, streamDelivery); + + streamDelivery.LinkedResource = streamTracker; + + request.TrySetResult(new ClientStreamSenderMessage(this, streamTracker, annotations)); + } + }); + + return request.Task; + } + private Task<IStreamTracker> DoSendMessageAsync<T>(IAdvancedMessage<T> message, IDictionary<string, object> deliveryAnnotations, bool waitForCredit) { TaskCompletionSource<IStreamTracker> operation = new(); diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs index 9e76808..08f1b50 100644 --- a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs +++ b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs @@ -542,6 +542,16 @@ namespace Apache.Qpid.Proton.Client.Implementation // Expected } + try + { + sender.BeginMessageAsync().GetAwaiter().GetResult(); + Assert.Fail("Should not be able create a new streaming sender message before last one is completed."); + } + catch (ClientIllegalStateException) + { + // Expected + } + message.Abort(); Assert.Throws<ClientIllegalStateException>(() => message.Complete()); @@ -723,6 +733,48 @@ namespace Apache.Qpid.Proton.Client.Implementation } } + [Test] + public void TestCreateStreamFromAsyncBegin() + { + using (ProtonTestServer peer = new ProtonTestServer(loggerFactory)) + { + peer.ExpectSASLAnonymousConnect(); + peer.ExpectOpen().Respond(); + peer.ExpectBegin().Respond(); + peer.ExpectAttach().OfSender().Respond(); + peer.RemoteFlow().WithLinkCredit(1).Queue(); + peer.ExpectTransfer().WithMore(false).WithNullPayload(); + peer.ExpectDetach().WithClosed(true).Respond(); + peer.ExpectEnd().Respond(); + peer.ExpectClose().Respond(); + peer.Start(); + + string remoteAddress = peer.ServerAddress; + int remotePort = peer.ServerPort; + + logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort); + + IClient container = IClient.Create(); + IConnection connection = container.Connect(remoteAddress, remotePort); + IStreamSender sender = connection.OpenStreamSender("test-qos"); + IStreamSenderMessage tracker = sender.BeginMessageAsync().GetAwaiter().GetResult(); + + OutputStreamOptions options = new OutputStreamOptions(); + Stream stream = tracker.GetBodyStream(options); + + Assert.IsNotNull(stream); + + sender.OpenTask.Wait(); + + stream.Close(); + + sender.Close(); + connection.Close(); + + peer.WaitForScriptToComplete(); + } + } + [Test] public void TestOutputStreamOptionsEnforcesValidBodySizeValues() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org