This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 59e779f  PROTON-2830 Add an async version of BeginMessage to stream sender
59e779f is described below

commit 59e779fa66b143dbbe586720e9f94871311574be
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Tue Jun 11 15:23:34 2024 -0400

    PROTON-2830 Add an async version of BeginMessage to stream sender
    
    Adds BeginMessageAsync, an async variant of the BeginMessage API, to
    IStreamSender to match the existing async variants of send and try send.
---
 src/Proton.Client/Client/IStreamSender.cs          | 12 +++-
 .../Client/Implementation/ClientStreamSender.cs    | 68 +++++++++++++---------
 .../Implementation/ClientStreamSenderTest.cs       | 52 +++++++++++++++++
 3 files changed, 103 insertions(+), 29 deletions(-)

diff --git a/src/Proton.Client/Client/IStreamSender.cs 
b/src/Proton.Client/Client/IStreamSender.cs
index 47b4684..26661a1 100644
--- a/src/Proton.Client/Client/IStreamSender.cs
+++ b/src/Proton.Client/Client/IStreamSender.cs
@@ -82,8 +82,18 @@ namespace Apache.Qpid.Proton.Client
       /// to indicate that the previous instance has not yet been completed.
       /// </summary>
       /// <param name="deliveryAnnotations">The optional delivery annotations 
to transmit with the message</param>
-      /// <returns>This stream sender instance</returns>
+      /// <returns>A new IStreamSenderMessage if no send is currently in 
progress</returns>
       IStreamSenderMessage BeginMessage(IDictionary<string, object> 
deliveryAnnotations = null);
 
+      /// <summary>
+      /// Creates and returns a new stream capable message that can be used by 
the caller to perform
+      /// streaming sends of large message payload data.  Only one streamed 
message can be active
+      /// at a time so any successive calls to begin a new streaming message 
will throw an error
+      /// to indicate that the previous instance has not yet been completed.
+      /// </summary>
+      /// <param name="deliveryAnnotations">The optional delivery annotations 
to transmit with the message</param>
+      /// <returns>A Task that returns new IStreamSenderMessage if no send is 
currently in progress</returns>
+      Task<IStreamSenderMessage> BeginMessageAsync(IDictionary<string, object> 
deliveryAnnotations = null);
+
    }
 }
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs 
b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
index 75f10f8..76a721c 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamSender.cs
@@ -83,35 +83,13 @@ namespace Apache.Qpid.Proton.Client.Implementation
       public IStreamSenderMessage BeginMessage(IDictionary<string, object> 
deliveryAnnotations = null)
       {
          CheckClosedOrFailed();
-         DeliveryAnnotations annotations = null;
-         TaskCompletionSource<IStreamSenderMessage> request = new();
-
-         if (deliveryAnnotations != null)
-         {
-            annotations = new 
DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations));
-         }
-
-         ClientSession.Execute(() =>
-         {
-            if (ProtonSender.Current != null)
-            {
-               request.TrySetException(new ClientIllegalStateException(
-                   "Cannot initiate a new streaming send until the previous 
one is complete"));
-            }
-            else
-            {
-               // Grab the next delivery and hold for stream writes, no other 
sends
-               // can occur while we hold the delivery.
-               IOutgoingDelivery streamDelivery = ProtonSender.Next();
-               ClientStreamTracker streamTracker = new(this, streamDelivery);
-
-               streamDelivery.LinkedResource = streamTracker;
-
-               request.TrySetResult(new ClientStreamSenderMessage(this, 
streamTracker, annotations));
-            }
-         });
+         return 
DoBeginMessageAsync(deliveryAnnotations).ConfigureAwait(false).GetAwaiter().GetResult();
+      }
 
-         return request.Task.ConfigureAwait(false).GetAwaiter().GetResult();
+      public Task<IStreamSenderMessage> BeginMessageAsync(IDictionary<string, 
object> deliveryAnnotations = null)
+      {
+         CheckClosedOrFailed();
+         return DoBeginMessageAsync(deliveryAnnotations);
       }
 
       internal string SenderId => senderId;
@@ -329,6 +307,40 @@ namespace Apache.Qpid.Proton.Client.Implementation
          return new ClientNoOpStreamTracker(this);
       }
 
+      private Task<IStreamSenderMessage> 
DoBeginMessageAsync(IDictionary<string, object> deliveryAnnotations = null)
+      {
+         CheckClosedOrFailed();
+         DeliveryAnnotations annotations = null;
+         TaskCompletionSource<IStreamSenderMessage> request = new();
+
+         if (deliveryAnnotations != null)
+         {
+            annotations = new 
DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations));
+         }
+
+         ClientSession.Execute(() =>
+         {
+            if (ProtonSender.Current != null)
+            {
+               request.TrySetException(new ClientIllegalStateException(
+                   "Cannot initiate a new streaming send until the previous 
one is complete"));
+            }
+            else
+            {
+               // Grab the next delivery and hold for stream writes, no other 
sends
+               // can occur while we hold the delivery.
+               IOutgoingDelivery streamDelivery = ProtonSender.Next();
+               ClientStreamTracker streamTracker = new(this, streamDelivery);
+
+               streamDelivery.LinkedResource = streamTracker;
+
+               request.TrySetResult(new ClientStreamSenderMessage(this, 
streamTracker, annotations));
+            }
+         });
+
+         return request.Task;
+      }
+
       private Task<IStreamTracker> DoSendMessageAsync<T>(IAdvancedMessage<T> 
message, IDictionary<string, object> deliveryAnnotations, bool waitForCredit)
       {
          TaskCompletionSource<IStreamTracker> operation = new();
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
index 9e76808..08f1b50 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
@@ -542,6 +542,16 @@ namespace Apache.Qpid.Proton.Client.Implementation
                // Expected
             }
 
+            try
+            {
+               sender.BeginMessageAsync().GetAwaiter().GetResult();
+               Assert.Fail("Should not be able create a new streaming sender 
message before last one is completed.");
+            }
+            catch (ClientIllegalStateException)
+            {
+               // Expected
+            }
+
             message.Abort();
 
             Assert.Throws<ClientIllegalStateException>(() => 
message.Complete());
@@ -723,6 +733,48 @@ namespace Apache.Qpid.Proton.Client.Implementation
          }
       }
 
+      [Test]
+      public void TestCreateStreamFromAsyncBegin()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.RemoteFlow().WithLinkCredit(1).Queue();
+            peer.ExpectTransfer().WithMore(false).WithNullPayload();
+            peer.ExpectDetach().WithClosed(true).Respond();
+            peer.ExpectEnd().Respond();
+            peer.ExpectClose().Respond();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, 
remotePort);
+            IStreamSender sender = connection.OpenStreamSender("test-qos");
+            IStreamSenderMessage tracker = 
sender.BeginMessageAsync().GetAwaiter().GetResult();
+
+            OutputStreamOptions options = new OutputStreamOptions();
+            Stream stream = tracker.GetBodyStream(options);
+
+            Assert.IsNotNull(stream);
+
+            sender.OpenTask.Wait();
+
+            stream.Close();
+
+            sender.Close();
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
       [Test]
       public void TestOutputStreamOptionsEnforcesValidBodySizeValues()
       {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to