This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 76558f0  PROTON-2549 Add a next receiver API to connection and session
76558f0 is described below

commit 76558f01cde404ef4bc1b116375db1f9b68bb070
Author: Timothy Bish <[email protected]>
AuthorDate: Wed May 25 11:20:07 2022 -0400

    PROTON-2549 Add a next receiver API to connection and session
    
    Adds API to fetch the next receiver with a pending delivery or wait
    on any receiver to be provided with a delivery from the remote.
---
 .vscode/launch.json                                |   26 +
 .vscode/tasks.json                                 |   41 +
 Proton.sln                                         |   35 +-
 examples/Example.LargeMessageReceiver/Program.cs   |   10 +-
 examples/Example.LargeMessageSender/Program.cs     |   14 +-
 .../Example.NextReceiver.csproj                    |   38 +
 examples/Example.NextReceiver/Program.cs           |   73 ++
 .../Client/Concurrent/AtomicReference.cs           |    2 +-
 src/Proton.Client/Client/ConnectionOptions.cs      |    8 +
 src/Proton.Client/Client/IConnection.cs            |   79 ++
 src/Proton.Client/Client/ISession.cs               |   82 ++
 .../Client/Implementation/ClientConnection.cs      |   75 +-
 .../Implementation/ClientNextReceiverSelector.cs   |  235 ++++
 .../Implementation/ClientReceiverLinkType.cs       |    2 +
 .../Client/Implementation/ClientSender.cs          |    6 +-
 .../Client/Implementation/ClientSession.cs         |   74 +-
 .../Client/Implementation/ClientSessionBuilder.cs  |    3 +-
 .../Client/Implementation/ClientStreamSession.cs   |   49 +
 src/Proton.Client/Client/NextReceiverPolicy.cs     |   79 ++
 src/Proton.Client/Client/SessionOptions.cs         |    6 +
 src/Proton.Client/Client/Transport/ITransport.cs   |    2 +-
 src/Proton.Client/Client/Transport/TcpTransport.cs |    2 +-
 src/Proton/Engine/IAttachments.cs                  |   10 +
 src/Proton/Engine/ISession.cs                      |   14 +-
 .../Engine/Implementation/ProtonAttachments.cs     |    6 +
 src/Proton/Engine/Implementation/ProtonReceiver.cs |    7 +
 src/Proton/Engine/Implementation/ProtonSession.cs  |   18 +-
 .../Client/Implementation/ClientBaseTestFixture.cs |    2 +-
 .../Implementation/ClientReconnectSessionTest.cs   |  118 ++
 .../Client/Implementation/ClientSessionTest.cs     | 1164 +++++++++++++++++++-
 .../Engine/Implementation/ProtonSessionTest.cs     |   38 +
 31 files changed, 2275 insertions(+), 43 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..1820d3b
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,26 @@
+{
+   "version": "0.2.0",
+   "configurations": [
+      {
+         // Use IntelliSense to find out which attributes exist for C# 
debugging
+         // Use hover for the description of the existing attributes
+         // For further information visit 
https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md
+         "name": ".NET Core Launch (console)",
+         "type": "coreclr",
+         "request": "launch",
+         "preLaunchTask": "build",
+         // If you have changed target frameworks, make sure to update the 
program path.
+         "program": 
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/bin/Debug/net5.0/Example.LargeMessageSender.dll",
+         "args": [],
+         "cwd": 
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender",
+         // For more information about the 'console' field, see 
https://aka.ms/VSCode-CS-LaunchJson-Console
+         "console": "internalConsole",
+         "stopAtEntry": false
+      },
+      {
+         "name": ".NET Core Attach",
+         "type": "coreclr",
+         "request": "attach"
+      }
+   ]
+}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
new file mode 100644
index 0000000..8b2bfee
--- /dev/null
+++ b/.vscode/tasks.json
@@ -0,0 +1,41 @@
+{
+   "version": "2.0.0",
+   "tasks": [
+      {
+         "label": "build",
+         "command": "dotnet",
+         "type": "process",
+         "args": [
+            "build",
+            
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/Example.LargeMessageSender.csproj",
+            "/property:GenerateFullPaths=true",
+            "/consoleloggerparameters:NoSummary"
+         ],
+         "problemMatcher": "$msCompile"
+      },
+      {
+         "label": "publish",
+         "command": "dotnet",
+         "type": "process",
+         "args": [
+            "publish",
+            
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/Example.LargeMessageSender.csproj",
+            "/property:GenerateFullPaths=true",
+            "/consoleloggerparameters:NoSummary"
+         ],
+         "problemMatcher": "$msCompile"
+      },
+      {
+         "label": "watch",
+         "command": "dotnet",
+         "type": "process",
+         "args": [
+            "watch",
+            "run",
+            "--project",
+            
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/Example.LargeMessageSender.csproj"
+         ],
+         "problemMatcher": "$msCompile"
+      }
+   ]
+}
\ No newline at end of file
diff --git a/Proton.sln b/Proton.sln
index 22a0075..b782fd6 100644
--- a/Proton.sln
+++ b/Proton.sln
@@ -1,21 +1,5 @@
 Microsoft Visual Studio Solution File, Format Version 12.00
-#
 #  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-# Visual Studio Version 16
 VisualStudioVersion = 16.6.30114.105
 MinimumVisualStudioVersion = 10.0.40219.1
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proton", 
"src\Proton\Proton.csproj", "{101F0276-086A-4955-A762-75CA8527E8CA}"
@@ -67,10 +51,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = 
"Solution Items", "Solution
                .gitignore = .gitignore
                build.sh = build.sh
                README.md = README.md
-        common.props = common.props
-        versions.props = versions.props
+               common.props = common.props
+               versions.props = versions.props
        EndProjectSection
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.NextReceiver", 
"examples\Example.NextReceiver\Example.NextReceiver.csproj", 
"{0A62621D-FCC9-448B-8645-8FEA44BCD054}"
+EndProject
 Global
        GlobalSection(SolutionConfigurationPlatforms) = preSolution
                Debug|Any CPU = Debug|Any CPU
@@ -312,6 +298,18 @@ Global
                {2E5B03AB-2E32-43A5-9374-868DA143E4B3}.Release|x64.Build.0 = 
Release|Any CPU
                {2E5B03AB-2E32-43A5-9374-868DA143E4B3}.Release|x86.ActiveCfg = 
Release|Any CPU
                {2E5B03AB-2E32-43A5-9374-868DA143E4B3}.Release|x86.Build.0 = 
Release|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x64.ActiveCfg = 
Debug|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x64.Build.0 = 
Debug|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x86.ActiveCfg = 
Debug|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x86.Build.0 = 
Debug|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|Any CPU.Build.0 
= Release|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x64.ActiveCfg = 
Release|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x64.Build.0 = 
Release|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x86.ActiveCfg = 
Release|Any CPU
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x86.Build.0 = 
Release|Any CPU
        EndGlobalSection
        GlobalSection(NestedProjects) = preSolution
                {A6F37028-A5C3-4A73-B21F-CF8D88F18144} = 
{86674F9E-B027-47C7-A5B7-C7B346665600}
@@ -327,5 +325,6 @@ Global
                {F837AEE2-057D-4328-8ED6-81CEFBE4008C} = 
{86674F9E-B027-47C7-A5B7-C7B346665600}
                {A756CFD0-C28C-4995-A08D-F446DC62C991} = 
{86674F9E-B027-47C7-A5B7-C7B346665600}
                {2E5B03AB-2E32-43A5-9374-868DA143E4B3} = 
{86674F9E-B027-47C7-A5B7-C7B346665600}
+               {0A62621D-FCC9-448B-8645-8FEA44BCD054} = 
{86674F9E-B027-47C7-A5B7-C7B346665600}
        EndGlobalSection
 EndGlobal
diff --git a/examples/Example.LargeMessageReceiver/Program.cs 
b/examples/Example.LargeMessageReceiver/Program.cs
index 84ef36d..31132c2 100644
--- a/examples/Example.LargeMessageReceiver/Program.cs
+++ b/examples/Example.LargeMessageReceiver/Program.cs
@@ -19,7 +19,7 @@ using System;
 using System.IO;
 using Apache.Qpid.Proton.Client;
 
-namespace Apache.Qpid.Proton.Examples.HelloWorld
+namespace Apache.Qpid.Proton.Examples.LargeMessageReceiver
 {
    class Program
    {
@@ -31,9 +31,11 @@ namespace Apache.Qpid.Proton.Examples.HelloWorld
 
          IClient client = IClient.Create();
 
-         ConnectionOptions options = new ConnectionOptions();
-         options.User = Environment.GetEnvironmentVariable("USER");
-         options.Password = Environment.GetEnvironmentVariable("PASSWORD");
+         ConnectionOptions options = new()
+         {
+            User = Environment.GetEnvironmentVariable("USER"),
+            Password = Environment.GetEnvironmentVariable("PASSWORD")
+         };
 
          using IConnection connection = client.Connect(serverHost, serverPort, 
options);
          using IStreamReceiver receiver = 
connection.OpenStreamReceiver(address);
diff --git a/examples/Example.LargeMessageSender/Program.cs 
b/examples/Example.LargeMessageSender/Program.cs
index c0fbf3a..80e7cfc 100644
--- a/examples/Example.LargeMessageSender/Program.cs
+++ b/examples/Example.LargeMessageSender/Program.cs
@@ -19,11 +19,11 @@ using System;
 using System.IO;
 using Apache.Qpid.Proton.Client;
 
-namespace Apache.Qpid.Proton.Examples.HelloWorld
+namespace Apache.Qpid.Proton.Examples.LargeMessageSender
 {
-   class Program
+   public class Program
    {
-      static void Main(string[] args)
+      public static void Main(string[] args)
       {
          string serverHost = Environment.GetEnvironmentVariable("HOST") ?? 
"localhost";
          int serverPort = 
Convert.ToInt32(Environment.GetEnvironmentVariable("PORT") ?? "5672");
@@ -31,9 +31,11 @@ namespace Apache.Qpid.Proton.Examples.HelloWorld
 
          IClient client = IClient.Create();
 
-         ConnectionOptions options = new ConnectionOptions();
-         options.User = Environment.GetEnvironmentVariable("USER");
-         options.Password = Environment.GetEnvironmentVariable("PASSWORD");
+         ConnectionOptions options = new()
+         {
+            User = Environment.GetEnvironmentVariable("USER"),
+            Password = Environment.GetEnvironmentVariable("PASSWORD")
+         };
 
          using IConnection connection = client.Connect(serverHost, serverPort, 
options);
          using IStreamSender sender = connection.OpenStreamSender(address);
diff --git a/examples/Example.NextReceiver/Example.NextReceiver.csproj 
b/examples/Example.NextReceiver/Example.NextReceiver.csproj
new file mode 100644
index 0000000..edd731c
--- /dev/null
+++ b/examples/Example.NextReceiver/Example.NextReceiver.csproj
@@ -0,0 +1,38 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <Import Project="../../common.props" />
+
+  <PropertyGroup>
+    <TargetFramework>$(DefaultExeTargetFrameworks)</TargetFramework>
+    <OutputType>Exe</OutputType>
+    <IsPackable>false</IsPackable>
+    <GenerateAssemblyInfo>false</GenerateAssemblyInfo>
+  </PropertyGroup>
+
+  <PropertyGroup Condition="'$(Configuration)'=='Release'">
+    <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
+    <WarningsAsErrors />
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\..\src\Proton\Proton.csproj" />
+    <ProjectReference Include="..\..\src\Proton.Client\Proton.Client.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/examples/Example.NextReceiver/Program.cs 
b/examples/Example.NextReceiver/Program.cs
new file mode 100644
index 0000000..932f926
--- /dev/null
+++ b/examples/Example.NextReceiver/Program.cs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Qpid.Proton.Client;
+
+namespace Apache.Qpid.Proton.Examples.NextReceiver
+{
+   public class Program
+   {
+      public static void Main(string[] args)
+      {
+         string serverHost = Environment.GetEnvironmentVariable("HOST") ?? 
"localhost";
+         int serverPort = 
Convert.ToInt32(Environment.GetEnvironmentVariable("PORT") ?? "5672");
+         string address1 = Environment.GetEnvironmentVariable("ADDRESS1") ?? 
"next-receiver-1-address";
+         string address2 = Environment.GetEnvironmentVariable("ADDRESS2") ?? 
"next-receiver-1-address";
+
+         IClient client = IClient.Create();
+
+         ConnectionOptions options = new()
+         {
+            User = Environment.GetEnvironmentVariable("USER"),
+            Password = Environment.GetEnvironmentVariable("PASSWORD")
+         };
+
+         using IConnection connection = client.Connect(serverHost, serverPort, 
options);
+
+         _ = connection.OpenReceiver(address1);
+         _ = connection.OpenReceiver(address2);
+
+         Task.Run(() =>
+         {
+            try
+            {
+               Thread.Sleep(2000);
+               IMessage<string> message1 = IMessage<string>.Create("Hello 
World 1");
+               message1.To = address1;
+               connection.Send(message1);
+               Thread.Sleep(2000);
+               IMessage<string> message2 = IMessage<string>.Create("Hello 
World 2");
+               message2.To = address2;
+               connection.Send(message2);
+            }
+            catch (Exception e)
+            {
+               Console.WriteLine("Exception in message send task: " + 
e.Message);
+            }
+         });
+
+         IDelivery delivery1 = connection.NextReceiver().Receive();
+         IDelivery delivery2 = connection.NextReceiver().Receive();
+
+         Console.WriteLine("Received first message with body: " + 
delivery1.Message().Body);
+         Console.WriteLine("Received second message with body: " + 
delivery2.Message().Body);
+      }
+   }
+}
diff --git a/src/Proton.Client/Client/Concurrent/AtomicReference.cs 
b/src/Proton.Client/Client/Concurrent/AtomicReference.cs
index cbfee83..d52a9d0 100644
--- a/src/Proton.Client/Client/Concurrent/AtomicReference.cs
+++ b/src/Proton.Client/Client/Concurrent/AtomicReference.cs
@@ -106,7 +106,7 @@ namespace Apache.Qpid.Proton.Client.Concurrent
       }
 
       /// <summary>
-      /// Implicit conversion of an atomic reference type to the contined value
+      /// Implicit conversion of an atomic reference type to the contained 
value
       /// using a volatile read operation.
       /// </summary>
       /// <param name="reference">The atomic reference to read from</param>
diff --git a/src/Proton.Client/Client/ConnectionOptions.cs 
b/src/Proton.Client/Client/ConnectionOptions.cs
index efd2a68..0c037a0 100644
--- a/src/Proton.Client/Client/ConnectionOptions.cs
+++ b/src/Proton.Client/Client/ConnectionOptions.cs
@@ -33,6 +33,7 @@ namespace Apache.Qpid.Proton.Client
       public static readonly long DEFAULT_DRAIN_TIMEOUT = 60000;
       public static readonly ushort DEFAULT_CHANNEL_MAX = 65535;
       public static readonly uint DEFAULT_MAX_FRAME_SIZE = 65536;
+      public static readonly NextReceiverPolicy DEFAULT_NEXT_RECEIVER_POLICY = 
NextReceiverPolicy.RoundRobin;
 
       /// <summary>
       /// Creates a default Connection options instance.
@@ -74,6 +75,7 @@ namespace Apache.Qpid.Proton.Client
          other.ChannelMax = ChannelMax;
          other.MaxFrameSize = MaxFrameSize;
          other.TraceFrames = TraceFrames;
+         other.DefaultNextReceiverPolicy = DefaultNextReceiverPolicy;
          if (OfferedCapabilities != null && OfferedCapabilities.Length > 0)
          {
             string[] copyOf = new string[OfferedCapabilities.Length];
@@ -185,6 +187,12 @@ namespace Apache.Qpid.Proton.Client
       /// </summary>
       public uint MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
 
+      /// <summary>
+      /// Configures the default next receiver policy for this connection and 
any session
+      /// that is created without specifying user defined session default 
options.
+      /// </summary>
+      public NextReceiverPolicy DefaultNextReceiverPolicy { get; set; } = 
DEFAULT_NEXT_RECEIVER_POLICY;
+
       /// <summary>
       /// Configures the set of capabilities that a new connection will 
advertise to the remote.
       /// </summary>
diff --git a/src/Proton.Client/Client/IConnection.cs 
b/src/Proton.Client/Client/IConnection.cs
index 7fab12d..b42eca2 100644
--- a/src/Proton.Client/Client/IConnection.cs
+++ b/src/Proton.Client/Client/IConnection.cs
@@ -334,6 +334,85 @@ namespace Apache.Qpid.Proton.Client
       /// <returns>A Task that results in a tracker instance that can be used 
to track the send outcome</returns>
       Task<ITracker> SendAsync<T>(IMessage<T> message);
 
+      /// <summary>
+      /// Waits indefinitely for a receiver created from this connection to 
have a delivery ready for
+      /// receipt. The selection of the next receiver when more than one 
exists which has pending
+      /// deliveries is based upon the configured value of the default next 
receiver policy that
+      /// was configured in the connection options used to create this
+      /// connection.
+      /// </summary>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver();
+
+      /// <summary>
+      /// Returns a task that will complete only after a receiver created from 
this connection has
+      /// a delivery ready for receipt. The selection of the next receiver 
when more than one
+      /// exists which has pending deliveries is based upon the configured 
value of the default next
+      /// receiver policy that was configured in the connection options used 
to create this connection.
+      /// </summary>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync();
+
+      /// <summary>
+      /// Waits indefinitely for a receiver created from this connection to 
have a delivery ready for
+      /// receipt. The selection of the next receiver when more than one 
exists which has pending
+      /// deliveries is based upon the value of the next receiver policy that 
is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver(NextReceiverPolicy policy);
+
+      /// <summary>
+      /// Returns a task that will complete only after a receiver created from 
this connection has
+      /// a delivery ready for receipt. The selection of the next receiver 
when more than one
+      /// exists which has pending deliveries is based upon the value of the 
next receiver policy
+      /// that is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy);
+
+      /// <summary>
+      /// Waits up to the given timeout for a receiver created from this 
connection to have a delivery
+      /// ready for receipt. The selection of the next receiver when more than 
one exists which has
+      /// pending deliveries is based upon the configured value of the default 
next receiver policy that
+      /// was configured in the connection options used to create this 
connection.
+      /// </summary>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver(TimeSpan timeout);
+
+      /// <summary>
+      /// Returns a task that will complete once a receiver created from this 
connection has a delivery
+      /// ready for receipt or the given timeout expires. The selection of the 
next receiver when more
+      /// than one exists which has pending deliveries is based upon the 
configured value of the default
+      /// next receiver policy that was configured in the connection options 
used to create this connection.
+      /// </summary>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync(TimeSpan timeout);
+
+      /// <summary>
+      /// Waits up to the given timeout for a receiver created from this 
connection to have a delivery ready
+      /// for receipt. The selection of the next receiver when more than one 
exists which has pending
+      /// deliveries is based upon the value of the next receiver policy that 
is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout);
+
+      /// <summary>
+      /// Returns a task that will complete once a receiver created from this 
connection has a delivery
+      /// ready for receipt or the given timeout expires. The selection of the 
next receiver when more
+      /// than one exists which has pending deliveries is based upon the value 
of the next receiver policy
+      /// that is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan 
timeout);
+
       /// <summary>
       /// Returns the properties that the remote provided upon successfully 
opening the connection.
       /// If the open has not completed yet this method will block to await 
the open response which
diff --git a/src/Proton.Client/Client/ISession.cs 
b/src/Proton.Client/Client/ISession.cs
index a3caf42..ffc6fe5 100644
--- a/src/Proton.Client/Client/ISession.cs
+++ b/src/Proton.Client/Client/ISession.cs
@@ -336,5 +336,87 @@ namespace Apache.Qpid.Proton.Client
       /// <returns>A Task whose result is the Session no longer under a 
transaction</returns>
       Task<ISession> RollbackTransactionAsync();
 
+      /// <summary>
+      /// Waits indefinitely for a receiver created from this session to have 
a delivery ready for
+      /// receipt. The selection of the next receiver when more than one 
exists which has pending
+      /// deliveries is based upon the configured value of the default next 
receiver policy that
+      /// was configured in the session options used to create this session, 
or the connection
+      /// level policy if none was assigned to the session options.
+      /// </summary>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver();
+
+      /// <summary>
+      /// Returns a task that will complete only after a receiver created from 
this session has
+      /// a delivery ready for receipt. The selection of the next receiver 
when more than one
+      /// exists which has pending deliveries is based upon the configured 
value of the default next
+      /// receiver policy that was configured in the session options used to 
create this session,
+      /// or the connection level policy if none was assigned to the session 
options.
+      /// </summary>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync();
+
+      /// <summary>
+      /// Waits indefinitely for a receiver created from this session to have 
a delivery ready for
+      /// receipt. The selection of the next receiver when more than one 
exists which has pending
+      /// deliveries is based upon the value of the next receiver policy that 
is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver(NextReceiverPolicy policy);
+
+      /// <summary>
+      /// Returns a task that will complete only after a receiver created from 
this session has
+      /// a delivery ready for receipt. The selection of the next receiver 
when more than one
+      /// exists which has pending deliveries is based upon the value of the 
next receiver policy
+      /// that is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy);
+
+      /// <summary>
+      /// Waits up to the given timeout for a receiver created from this 
session to have a delivery ready
+      /// for receipt. The selection of the next receiver when more than one 
exists which has pending
+      /// deliveries is based upon the configured value of the default next 
receiver policy that was
+      /// configured in the session options used to create this session, or 
the connection level policy
+      /// if none was assigned to the session options.
+      /// </summary>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver(TimeSpan timeout);
+
+      /// <summary>
+      /// Returns a task that will complete once a receiver created from this 
session has a delivery
+      /// ready for receipt or the given timeout expires. The selection of the 
next receiver when more
+      /// than one exists which has pending deliveries is based upon the 
configured value of the default
+      /// next receiver policy that was configured in the session options used 
to create this session,
+      /// or the connection level policy if none was assigned to the session 
options.
+      /// </summary>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync(TimeSpan timeout);
+
+      /// <summary>
+      /// Waits up to the given timeout for a receiver created from this 
session to have a delivery ready
+      /// for receipt. The selection of the next receiver when more than one 
exists which has pending
+      /// deliveries is based upon the value of the next receiver policy that 
is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>The next receiver that has a pending delivery available 
based on policy.</returns>
+      IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout);
+
+      /// <summary>
+      /// Returns a task that will complete once a receiver created from this 
session has a delivery
+      /// ready for receipt or the given timeout expires. The selection of the 
next receiver when more
+      /// than one exists which has pending deliveries is based upon the value 
of the next receiver policy
+      /// that is provided by the caller.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when 
selecting a result</param>
+      /// <param name="timeout">The time to wait for a receiver to have a 
pending delivery</param>
+      /// <returns>A Task that results in the next receiver that has a pending 
delivery available based on policy.</returns>
+      Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan 
timeout);
+
    }
 }
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Implementation/ClientConnection.cs 
b/src/Proton.Client/Client/Implementation/ClientConnection.cs
index 4b6af84..34df3ba 100644
--- a/src/Proton.Client/Client/Implementation/ClientConnection.cs
+++ b/src/Proton.Client/Client/Implementation/ClientConnection.cs
@@ -482,6 +482,79 @@ namespace Apache.Qpid.Proton.Client.Implementation
          return result.Task;
       }
 
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request completes, using the
+      /// default next receiver policy from the connection options and no timeout.
+      /// </summary>
+      public IReceiver NextReceiver() =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Requests the next receiver using the default next receiver policy from the
+      /// connection options and no timeout.
+      /// </summary>
+      public Task<IReceiver> NextReceiverAsync() =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue);
+
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request completes, using the
+      /// given policy and no timeout.
+      /// </summary>
+      public IReceiver NextReceiver(NextReceiverPolicy policy) =>
+         NextReceiverAsync(policy, TimeSpan.MaxValue)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Requests the next receiver using the given policy and no timeout.
+      /// </summary>
+      public Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy) =>
+         NextReceiverAsync(policy, TimeSpan.MaxValue);
+
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request completes, using the
+      /// default next receiver policy from the connection options and the given timeout.
+      /// </summary>
+      public IReceiver NextReceiver(TimeSpan timeout) =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Requests the next receiver using the default next receiver policy from the
+      /// connection options and the given timeout.
+      /// </summary>
+      public Task<IReceiver> NextReceiverAsync(TimeSpan timeout) =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout);
+
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request made with the given
+      /// policy and timeout completes.
+      /// </summary>
+      public IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout) =>
+         NextReceiverAsync(policy, timeout)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Requests the next receiver with a pending delivery from the session produced by
+      /// DefaultSessionAsync and relays that session's outcome (result, cancellation or
+      /// failure) to the returned task.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when selecting a result</param>
+      /// <param name="timeout">The time to wait for a receiver to have a pending delivery</param>
+      /// <returns>A Task that completes with the outcome of the session level request</returns>
+      public Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan timeout)
+      {
+         CheckClosedOrFailed();
+         TaskCompletionSource<IReceiver> result = new();
+
+         DefaultSessionAsync().ContinueWith(session =>
+         {
+            if (session.IsCompletedSuccessfully)
+            {
+               try
+               {
+                  session.Result.NextReceiverAsync(policy, timeout).ContinueWith(tracker =>
+                  {
+                     if (tracker.IsCompletedSuccessfully)
+                     {
+                        result.TrySetResult(tracker.Result);
+                     }
+                     else if (tracker.IsCanceled)
+                     {
+                        result.TrySetCanceled();
+                     }
+                     else
+                     {
+                        result.TrySetException(tracker.Exception.InnerException);
+                     }
+                  });
+               }
+               catch (Exception error)
+               {
+                  // The session level call can throw synchronously (it runs its own closed
+                  // or failed check before returning a task). Without this the error would
+                  // be lost inside the continuation and the caller's task would never complete.
+                  result.TrySetException(error);
+               }
+            }
+            else if (session.IsCanceled)
+            {
+               result.TrySetCanceled();
+            }
+            else
+            {
+               result.TrySetException(session.Exception.InnerException);
+            }
+         });
+
+         return result.Task;
+      }
+
       public override string ToString()
       {
          return "ClientConnection:[" + ConnectionId + "]";
@@ -988,7 +1061,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private void DoEngineTickAndReschedule()
       {
-         // Prevent a tick that was queued on the event loop from occuring if 
the
+         // Prevent a tick that was queued on the event loop from occurring if 
the
          // connection dropped and a new engine was created for a reconnect 
attempt.
          if (engine.Connection.IsRemotelyOpen && 
engine.Connection.IsLocallyOpen)
          {
diff --git 
a/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs 
b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
new file mode 100644
index 0000000..e9dfff7
--- /dev/null
+++ b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Qpid.Proton.Client.Exceptions;
+using Apache.Qpid.Proton.Engine;
+using Apache.Qpid.Proton.Utilities;
+
+namespace Apache.Qpid.Proton.Client.Implementation
+{
+   /// <summary>
+   /// Selects the next receiver of a client session that has a delivery queued for
+   /// receipt, based on a caller supplied policy. When no receiver currently has a
+   /// queued delivery the request is parked until a complete delivery is read by the
+   /// session, the optional timeout fires, or the session is shut down.
+   /// </summary>
+   public sealed class ClientNextReceiverSelector
+   {
+      private static readonly string LAST_RETURNED_STATE_KEY = "Last_Returned_State";
+
+      // Requests that could not be satisfied immediately and await an incoming delivery.
+      private readonly ArrayDeque<TaskCompletionSource<IReceiver>> pending = new();
+      private readonly Random random = new();
+
+      private readonly ClientSession session;
+
+      internal ClientNextReceiverSelector(ClientSession session)
+      {
+         this.session = session;
+
+         HandleReconnect(); // Same processing works for initialization
+      }
+
+      /// <summary>
+      /// Completes the given request with a receiver chosen by the given policy, or parks
+      /// the request when no receiver has queued deliveries. A parked request is completed
+      /// with null once the timeout fires, provided the timeout is greater than zero and
+      /// less than TimeSpan.MaxValue; otherwise no timeout is scheduled.
+      /// </summary>
+      /// <param name="request">The completion source to resolve with the selected receiver</param>
+      /// <param name="policy">The selection policy to apply</param>
+      /// <param name="timeout">How long a parked request may wait before completing with null</param>
+      public void NextReceiver(TaskCompletionSource<IReceiver> request, NextReceiverPolicy policy, TimeSpan timeout)
+      {
+         Statics.RequireNonNull(policy, "The next receiver selection policy cannot be null");
+
+         ClientReceiver result;
+
+         switch (policy)
+         {
+            case NextReceiverPolicy.Random:
+               result = SelectRandomReceiver();
+               break;
+            case NextReceiverPolicy.RoundRobin:
+               result = SelectNextAvailable();
+               break;
+            case NextReceiverPolicy.FirstAvailable:
+               result = SelectFirstAvailable();
+               break;
+            case NextReceiverPolicy.LargestBacklog:
+               result = SelectLargestBacklog();
+               break;
+            case NextReceiverPolicy.SmallestBacklog:
+               result = SelectSmallestBacklog();
+               break;
+            default:
+               // Fail the request and stop: an already completed request must not be
+               // parked in the pending queue where it would later consume a delivery
+               // notification intended for a live request.
+               request.TrySetException(new ClientException("Next receiver called with invalid or unknown policy:" + policy));
+               return;
+         }
+
+         if (result == null)
+         {
+            pending.Enqueue(request);
+            if (timeout > TimeSpan.Zero && timeout < TimeSpan.MaxValue)
+            {
+               session.Schedule(() =>
+               {
+                  pending.Remove(request);
+                  request.TrySetResult(null);
+               }, timeout);
+            }
+         }
+         else
+         {
+            // Track last returned to update state for Round Robin next receiver dispatch
+            // this effectively ties all policies together in updating the next result from
+            // a call that requests the round robin fairness policy.
+            session.ProtonSession.Attachments[LAST_RETURNED_STATE_KEY] = result;
+
+            request.TrySetResult(result);
+         }
+      }
+
+      /// <summary>
+      /// Registers this selector's delivery read handler on the session's current proton
+      /// session; also used for first time initialization from the constructor.
+      /// </summary>
+      public void HandleReconnect()
+      {
+         session.ProtonSession.DeliveryReadHandler(DeliveryReadHandler);
+      }
+
+      /// <summary>
+      /// Fails every parked request with an error describing why the session is no
+      /// longer usable and then empties the pending queue.
+      /// </summary>
+      public void HandleShutdown()
+      {
+         ClientException cause;
+
+         if (session.IsClosed)
+         {
+            cause = new ClientIllegalStateException("The Session was explicitly closed", session.FailureCause);
+         }
+         else if (session.FailureCause != null)
+         {
+            cause = session.FailureCause;
+         }
+         else
+         {
+            cause = new ClientIllegalStateException("The session was closed without a specific error being provided");
+         }
+
+         foreach (TaskCompletionSource<IReceiver> request in pending)
+         {
+            request.TrySetException(cause);
+         }
+
+         pending.Clear();
+      }
+
+      private void DeliveryReadHandler(IIncomingDelivery delivery)
+      {
+         // When a new delivery arrives that is completed
+         if (!pending.IsEmpty && !delivery.IsPartial && !delivery.IsAborted)
+         {
+            // We only handle next receiver events for normal client receivers and
+            // not for stream receiver types etc.
+            if (delivery.Receiver.LinkedResource is ClientReceiver receiver)
+            {
+               // Track last returned to update state for Round Robin next receiver dispatch
+               delivery.Receiver.Session.Attachments[LAST_RETURNED_STATE_KEY] = receiver;
+
+               pending.Dequeue().TrySetResult(receiver);
+            }
+         }
+      }
+
+      private ClientReceiver SelectRandomReceiver()
+      {
+         // Materialize once so the count and the indexed pick see the same snapshot
+         // and the receivers are not filtered twice.
+         List<Engine.IReceiver> candidates = session.ProtonSession.Receivers.
+            Where(r => r.LinkedResource is ClientReceiver linked && linked.QueuedDeliveries > 0).
+            ToList();
+
+         if (candidates.Count == 0)
+         {
+            return null;
+         }
+
+         return candidates[random.Next(0, candidates.Count)].LinkedResource as ClientReceiver;
+      }
+
+      private ClientReceiver SelectNextAvailable()
+      {
+         ClientReceiver lastReceiver =
+            session.ProtonSession.Attachments.Get<ClientReceiver>(LAST_RETURNED_STATE_KEY, null);
+         ClientReceiver result = null;
+
+         if (lastReceiver != null && !lastReceiver.ProtonReceiver.IsLocallyClosedOrDetached)
+         {
+            bool foundLast = false;
+            foreach (Engine.IReceiver protonReceiver in session.ProtonSession.Receivers)
+            {
+               if (protonReceiver.LinkedResource is ClientReceiver candidate)
+               {
+                  if (foundLast)
+                  {
+                     if (candidate.QueuedDeliveries > 0)
+                     {
+                        // Stop at the first match following the last returned receiver,
+                        // continuing the scan would instead pick the final match.
+                        result = candidate;
+                        break;
+                     }
+                  }
+                  else
+                  {
+                     foundLast = candidate == lastReceiver;
+                  }
+               }
+            }
+         }
+         else
+         {
+            session.ProtonSession.Attachments[LAST_RETURNED_STATE_KEY] = null;
+         }
+
+         // Nothing after the last returned receiver was ready so wrap to the first available.
+         return result ?? SelectFirstAvailable();
+      }
+
+      private ClientReceiver SelectFirstAvailable()
+      {
+         Engine.IReceiver first = session.ProtonSession.Receivers.FirstOrDefault(
+            r => r.LinkedResource is ClientReceiver linked && linked.QueuedDeliveries > 0);
+
+         return (ClientReceiver)first?.LinkedResource;
+      }
+
+      private ClientReceiver SelectLargestBacklog()
+      {
+         ClientReceiver result = null;
+
+         foreach (Engine.IReceiver protonReceiver in session.ProtonSession.Receivers)
+         {
+            if (protonReceiver.LinkedResource is ClientReceiver candidate && candidate.QueuedDeliveries > 0)
+            {
+               // Strict comparison keeps the earliest receiver when backlogs are equal
+               if (result == null || result.QueuedDeliveries < candidate.QueuedDeliveries)
+               {
+                  result = candidate;
+               }
+            }
+         }
+
+         return result;
+      }
+
+      private ClientReceiver SelectSmallestBacklog()
+      {
+         ClientReceiver result = null;
+
+         foreach (Engine.IReceiver protonReceiver in session.ProtonSession.Receivers)
+         {
+            if (protonReceiver.LinkedResource is ClientReceiver candidate && candidate.QueuedDeliveries > 0)
+            {
+               // Strict comparison keeps the earliest receiver when backlogs are equal
+               if (result == null || result.QueuedDeliveries > candidate.QueuedDeliveries)
+               {
+                  result = candidate;
+               }
+            }
+         }
+
+         return result;
+      }
+   }
+}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs 
b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
index 837ed14..f619258 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
@@ -138,6 +138,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       internal string ReceiverId => receiverId;
 
+      /// <summary>
+      /// Exposes the protonLink field of this receiver as an Engine.IReceiver.
+      /// </summary>
+      internal Engine.IReceiver ProtonReceiver => protonLink;
+
       protected void AsyncApplyDisposition(IIncomingDelivery delivery, 
Types.Transport.IDeliveryState state, bool settle)
       {
          session.Execute(() =>
diff --git a/src/Proton.Client/Client/Implementation/ClientSender.cs 
b/src/Proton.Client/Client/Implementation/ClientSender.cs
index a86d109..e74c79f 100644
--- a/src/Proton.Client/Client/Implementation/ClientSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSender.cs
@@ -134,7 +134,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private Task<ITracker> DoSendMessageAsync<T>(IAdvancedMessage<T> 
message, IDictionary<string, object> deliveryAnnotations, bool waitForCredit)
       {
-         TaskCompletionSource<ITracker> operation = new 
TaskCompletionSource<ITracker>();
+         TaskCompletionSource<ITracker> operation = new();
 
          IProtonBuffer buffer = message.Encode(deliveryAnnotations);
 
@@ -144,7 +144,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
             {
                try
                {
-                  ClientOutgoingEnvelope envelope = new 
ClientOutgoingEnvelope(this, message.MessageFormat, buffer, operation);
+                  ClientOutgoingEnvelope envelope = new(this, 
message.MessageFormat, buffer, operation);
 
                   if (ProtonSender.IsSendable && ProtonSender.Current == null)
                   {
@@ -274,7 +274,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       #endregion
 
-      #region Proton Sender lifecycle envent handlers
+      #region Proton Sender lifecycle event handlers
 
       private void HandleLocalOpen(Engine.ISender sender)
       {
diff --git a/src/Proton.Client/Client/Implementation/ClientSession.cs 
b/src/Proton.Client/Client/Implementation/ClientSession.cs
index ce46884..f20a291 100644
--- a/src/Proton.Client/Client/Implementation/ClientSession.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSession.cs
@@ -40,15 +40,16 @@ namespace Apache.Qpid.Proton.Client.Implementation
       private readonly SessionOptions options;
       private readonly ClientConnection connection;
       private readonly string sessionId;
-      private readonly AtomicBoolean closed = new AtomicBoolean();
+      private readonly AtomicBoolean closed = new();
       private readonly ClientSenderBuilder senderBuilder;
       private readonly ClientReceiverBuilder receiverBuilder;
-      private readonly TaskCompletionSource<ISession> openFuture = new 
TaskCompletionSource<ISession>();
-      private readonly TaskCompletionSource<ISession> closeFuture = new 
TaskCompletionSource<ISession>();
+      private readonly TaskCompletionSource<ISession> openFuture = new();
+      private readonly TaskCompletionSource<ISession> closeFuture = new();
 
       private IClientTransactionContext txnContext = NoOpTransactionContext;
       private Engine.ISession protonSession;
       private ClientException failureCause;
+      private ClientNextReceiverSelector nextReceiverSelector;
 
       public ClientSession(ClientConnection connection, SessionOptions 
options, string sessionId, Engine.ISession session)
       {
@@ -66,6 +67,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       public Task<ISession> OpenTask => openFuture.Task;
 
+      /// <summary>
+      /// Exposes the failureCause field of this session; null when no failure has been recorded.
+      /// </summary>
+      internal ClientException FailureCause => failureCause;
+
       public IReadOnlyDictionary<string, object> Properties
       {
          get
@@ -338,6 +341,67 @@ namespace Apache.Qpid.Proton.Client.Implementation
          return rollbackFuture.Task;
       }
 
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request completes, using the
+      /// default next receiver policy from the session options and no timeout.
+      /// </summary>
+      public virtual IReceiver NextReceiver() =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Requests the next receiver using the default next receiver policy from the
+      /// session options and no timeout.
+      /// </summary>
+      public virtual Task<IReceiver> NextReceiverAsync() =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue);
+
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request completes, using the
+      /// given policy and no timeout.
+      /// </summary>
+      public virtual IReceiver NextReceiver(NextReceiverPolicy policy) =>
+         NextReceiverAsync(policy, TimeSpan.MaxValue)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Requests the next receiver using the given policy and no timeout.
+      /// </summary>
+      public virtual Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy) =>
+         NextReceiverAsync(policy, TimeSpan.MaxValue);
+
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request completes, using the
+      /// default next receiver policy from the session options and the given timeout.
+      /// </summary>
+      public virtual IReceiver NextReceiver(TimeSpan timeout) =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Requests the next receiver using the default next receiver policy from the
+      /// session options and the given timeout.
+      /// </summary>
+      public virtual Task<IReceiver> NextReceiverAsync(TimeSpan timeout) =>
+         NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout);
+
+      /// <summary>
+      /// Blocks the caller until the asynchronous next receiver request made with the given
+      /// policy and timeout completes.
+      /// </summary>
+      public virtual IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout) =>
+         NextReceiverAsync(policy, timeout)
+            .ConfigureAwait(false)
+            .GetAwaiter()
+            .GetResult();
+
+      /// <summary>
+      /// Hands a next receiver request to the connection for execution, creating the
+      /// session's receiver selector on first use. Any error raised while the request
+      /// is being processed is reported through the returned task.
+      /// </summary>
+      /// <param name="policy">The next receiver policy to apply when selecting a result</param>
+      /// <param name="timeout">The time to wait for a receiver to have a pending delivery</param>
+      /// <returns>A Task that is completed by the receiver selector</returns>
+      public virtual Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan timeout)
+      {
+         CheckClosedOrFailed();
+
+         TaskCompletionSource<IReceiver> request = new();
+
+         connection.Execute(() =>
+         {
+            try
+            {
+               // State is checked again since it may have changed before this work runs
+               CheckClosedOrFailed();
+
+               nextReceiverSelector ??= new ClientNextReceiverSelector(this);
+               nextReceiverSelector.NextReceiver(request, policy, timeout);
+            }
+            catch (Exception failure)
+            {
+               request.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(failure));
+            }
+         });
+
+         return request.Task;
+      }
+
       #region Internal client session API
 
       internal void CheckClosedOrFailed()
@@ -516,6 +580,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
          }
 
+         nextReceiverSelector?.HandleShutdown();
+
          if (failureCause != null)
          {
             _ = openFuture.TrySetException(failureCause);
@@ -628,6 +694,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
             protonSession.Close();
             protonSession = 
ConfigureSession(ClientSessionBuilder.RecreateSession(connection, options));
 
+            nextReceiverSelector?.HandleReconnect();
+
             Open();
          }
          else
diff --git a/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs 
b/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs
index a47c872..28dbbe2 100644
--- a/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs
@@ -98,7 +98,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
                      CloseTimeout = connectionOptions.CloseTimeout,
                      RequestTimeout = connectionOptions.RequestTimeout,
                      SendTimeout = connectionOptions.SendTimeout,
-                     DrainTimeout = connectionOptions.DrainTimeout
+                     DrainTimeout = connectionOptions.DrainTimeout,
+                     DefaultNextReceiverPolicy = 
connectionOptions.DefaultNextReceiverPolicy
                   };
                }
 
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSession.cs 
b/src/Proton.Client/Client/Implementation/ClientStreamSession.cs
index 0aee53e..f3f6c06 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamSession.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamSession.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using System.Collections.Generic;
 using System.Threading.Tasks;
 using Apache.Qpid.Proton.Client.Exceptions;
@@ -87,5 +88,53 @@ namespace Apache.Qpid.Proton.Client.Implementation
          CheckClosedOrFailed();
          throw new ClientUnsupportedOperationException("Cannot create a sender 
from a streaming resource session");
       }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException.
+      /// </summary>
+      public override IReceiver NextReceiver()
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException. The exception is thrown directly
+      /// from the call and is not delivered through a returned Task.
+      /// </summary>
+      public override Task<IReceiver> NextReceiverAsync()
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException; the policy is never used.
+      /// </summary>
+      public override IReceiver NextReceiver(NextReceiverPolicy policy)
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException; the policy is never used. The
+      /// exception is thrown directly from the call and is not delivered through a returned Task.
+      /// </summary>
+      public override Task<IReceiver> NextReceiverAsync(NextReceiverPolicy 
policy)
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException; the timeout is never used.
+      /// </summary>
+      public override IReceiver NextReceiver(TimeSpan timeout)
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException; the timeout is never used. The
+      /// exception is thrown directly from the call and is not delivered through a returned Task.
+      /// </summary>
+      public override Task<IReceiver> NextReceiverAsync(TimeSpan timeout)
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException; policy and timeout are never used.
+      /// </summary>
+      public override IReceiver NextReceiver(NextReceiverPolicy policy, 
TimeSpan timeout)
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
+
+      /// <summary>
+      /// Not supported here: once CheckClosedOrFailed returns this override always
+      /// throws a ClientUnsupportedOperationException; policy and timeout are never used.
+      /// The exception is thrown directly from the call and is not delivered through a returned Task.
+      /// </summary>
+      public override Task<IReceiver> NextReceiverAsync(NextReceiverPolicy 
policy, TimeSpan timeout)
+      {
+         CheckClosedOrFailed();
+         throw new ClientUnsupportedOperationException("Cannot request next 
receiver from a streaming resource session");
+      }
    }
 }
\ No newline at end of file
diff --git a/src/Proton.Client/Client/NextReceiverPolicy.cs 
b/src/Proton.Client/Client/NextReceiverPolicy.cs
new file mode 100644
index 0000000..fe84f4b
--- /dev/null
+++ b/src/Proton.Client/Client/NextReceiverPolicy.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Qpid.Proton.Client
+{
+   /// <summary>
+   /// Determines the behavior of a Session when the next receiver method is called
+   /// on that session. Each policy provides a contract on the ordering of returned
+   /// receivers from the next receiver API when there are receivers with locally
+   /// queued deliveries. When there are no Receiver instances that have locally
+   /// queued deliveries the next receiver API will return the next receiver to
+   /// receive a complete incoming delivery unless a timeout was given and that
+   /// time period expires in which case it will return null.
+   ///
+   /// Should the user perform receive calls on a Receiver directly in multiple
+   /// threads the behavior of the next receiver API is undefined and it becomes possible
+   /// that the resulting receiver returned from that API will have no actual pending
+   /// deliveries due to a race. In most cases the caller can mitigate some risk by using
+   /// the Receiver tryReceive API and accounting for a null result.
+   /// </summary>
+   public enum NextReceiverPolicy
+   {
+      /// <summary>
+      /// Examines the list of currently open receivers in the session and returns
+      /// the next receiver that has a pending delivery that follows the previously
+      /// returned receiver (if any) otherwise the first receiver in the session with
+      /// a pending delivery is returned. The order of receivers returned will likely
+      /// be creation order however the implementation is not required to follow this
+      /// pattern so the caller should not be coded to rely on that ordering.
+      /// </summary>
+      RoundRobin,
+
+      /// <summary>
+      /// Examines the list of currently open receivers in the session and returns a
+      /// random selection from the set of receivers that have a pending delivery
+      /// immediately available. This provides a means of selecting receivers which
+      /// is not prone to sticking to a highly active receiver which can starve out
+      /// other receivers which receive only limited traffic.
+      /// </summary>
+      Random,
+
+      /// <summary>
+      /// Examines the list of currently open receivers in the session and returns the
+      /// first receiver found with an available delivery. This can result in starvation
+      /// if that receiver has a continuous feed of new deliveries from the remote as it
+      /// will be repeatedly selected by the next receiver API.
+      /// </summary>
+      FirstAvailable,
+
+      /// <summary>
+      /// Examines the list of currently open receivers in the session and returns the
+      /// receiver with the largest backlog of available deliveries. This can result in
+      /// starvation if that receiver has a continuous feed of new deliveries from the
+      /// remote as it will likely be repeatedly selected by the next receiver API.
+      /// </summary>
+      LargestBacklog,
+
+      /// <summary>
+      /// Examines the list of currently open receivers in the session and returns the
+      /// receiver with the smallest backlog of available deliveries.
+      /// </summary>
+      SmallestBacklog
+
+   }
+}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/SessionOptions.cs 
b/src/Proton.Client/Client/SessionOptions.cs
index 6660b26..713b533 100644
--- a/src/Proton.Client/Client/SessionOptions.cs
+++ b/src/Proton.Client/Client/SessionOptions.cs
@@ -65,6 +65,7 @@ namespace Apache.Qpid.Proton.Client
          other.OpenTimeout = OpenTimeout;
          other.CloseTimeout = CloseTimeout;
          other.DrainTimeout = DrainTimeout;
+         other.DefaultNextReceiverPolicy = DefaultNextReceiverPolicy;
          if (OfferedCapabilities != null && OfferedCapabilities.Length > 0)
          {
             string[] copyOf = new string[OfferedCapabilities.Length];
@@ -137,6 +138,11 @@ namespace Apache.Qpid.Proton.Client
       /// </summary>
       public string[] DesiredCapabilities { get; set; }
 
+      /// <summary>
+      /// Configures the default next receiver policy for this session, which is the policy
+      /// applied when a next receiver call is made without an explicit policy. Initialized
+      /// from ConnectionOptions.DEFAULT_NEXT_RECEIVER_POLICY.
+      /// </summary>
+      public NextReceiverPolicy DefaultNextReceiverPolicy { get; set; } = 
ConnectionOptions.DEFAULT_NEXT_RECEIVER_POLICY;
+
       /// <summary>
       /// Configures a collection of property values that are sent to the 
remote upon opening
       /// a new session.
diff --git a/src/Proton.Client/Client/Transport/ITransport.cs 
b/src/Proton.Client/Client/Transport/ITransport.cs
index cd0d9c8..26bf87b 100644
--- a/src/Proton.Client/Client/Transport/ITransport.cs
+++ b/src/Proton.Client/Client/Transport/ITransport.cs
@@ -30,7 +30,7 @@ namespace Apache.Qpid.Proton.Client.Transport
    public interface ITransport
    {
       /// <summary>
-      /// Returns the event loop that this transport is registed against,
+      /// Returns the event loop that this transport is registered against,
       /// the event loop should never have its lifetime linked to a transport
       /// as the client connection will use a single event loop for the
       /// duration of its lifetime.
diff --git a/src/Proton.Client/Client/Transport/TcpTransport.cs 
b/src/Proton.Client/Client/Transport/TcpTransport.cs
index d6cb339..b9d73e6 100644
--- a/src/Proton.Client/Client/Transport/TcpTransport.cs
+++ b/src/Proton.Client/Client/Transport/TcpTransport.cs
@@ -110,7 +110,7 @@ namespace Apache.Qpid.Proton.Client.Transport
             // since we are shutting down anyway.
             try
             {
-               ChannelTermination termination = new ChannelTermination();
+               ChannelTermination termination = new();
                if ((!channelOutputSource?.TryWrite(termination) ?? true) || 
!connected)
                {
                   termination.Execute();
diff --git a/src/Proton/Engine/IAttachments.cs 
b/src/Proton/Engine/IAttachments.cs
index c057b09..9ac40c3 100644
--- a/src/Proton/Engine/IAttachments.cs
+++ b/src/Proton/Engine/IAttachments.cs
@@ -25,6 +25,16 @@ namespace Apache.Qpid.Proton.Engine
    /// </summary>
    public interface IAttachments
    {
+      /// <summary>
+      /// Gets or sets the attachment with the given key. If the element being
+      /// retrieved is not in the collection an exception is thrown.
+      /// </summary>
+      /// <param name="key">The key of the attachment to get or set</param>
+      /// <returns>The value that is mapped to the given key</returns>
+      /// <exception cref="KeyNotFoundException">If the given key is not in the attachments</exception>
+      /// <exception cref="ArgumentNullException">If the given key is null</exception>
+      object this[string key] { get; set; }
+
       /// <summary>
       /// Gets the user attached value that is associated with the given key, 
or null
       /// if no data is mapped to the key.
diff --git a/src/Proton/Engine/ISession.cs b/src/Proton/Engine/ISession.cs
index 0db86d4..8cea13f 100644
--- a/src/Proton/Engine/ISession.cs
+++ b/src/Proton/Engine/ISession.cs
@@ -148,7 +148,7 @@ namespace Apache.Qpid.Proton.Engine
       /// Sets a delegate for when an AMQP Attach frame is received from the 
remote peer
       /// for a transaction manager link attach.
       /// <para/>
-      /// Used to process remotely initiated transcation mangaers. Locally 
initiated links have
+      /// Used to process remotely initiated transaction managers. Locally 
initiated links have
       /// their own handlers invoked instead. This method is Typically used by 
servers to listen
       /// for remote resource creation. If an event handler for remote sender 
open is registered on
       /// this Session for a link scoped to it then this handler will be 
invoked instead of the
@@ -158,5 +158,17 @@ namespace Apache.Qpid.Proton.Engine
       /// <returns>This session instance</returns>
       ISession TransactionManagerOpenedHandler(Action<ITransactionManager> 
handler);
 
+      /// <summary>
+      /// Allows monitoring of incoming deliveries to receivers attached to this <see cref="ISession"/>.
+      /// The Receiver that is the target of the incoming delivery will be notified first of the
+      /// incoming delivery and any processing should be done using the Receiver DeliveryReadHandler
+      /// API. This event point will be triggered only after the Receiver level handler and should be
+      /// used to monitor deliveries passing through a session for logging or other state related
+      /// actions performed by the service managing this session.
+      /// </summary>
+      /// <param name="handler">Handler that is signalled that a receiver read a new delivery</param>
+      /// <returns>This session instance</returns>
+      ISession DeliveryReadHandler(Action<IIncomingDelivery> handler);
+
    }
 }
\ No newline at end of file
diff --git a/src/Proton/Engine/Implementation/ProtonAttachments.cs 
b/src/Proton/Engine/Implementation/ProtonAttachments.cs
index 63b1eba..a3f6a02 100644
--- a/src/Proton/Engine/Implementation/ProtonAttachments.cs
+++ b/src/Proton/Engine/Implementation/ProtonAttachments.cs
@@ -23,6 +23,12 @@ namespace Apache.Qpid.Proton.Engine.Implementation
    {
       private readonly IDictionary<string, object> entries = new 
Dictionary<string, object>();
 
+      public object this[string key]
+      {
+         get => entries[key];
+         set => entries[key] = value;
+      }
+
       public IAttachments Clear()
       {
          entries.Clear();
diff --git a/src/Proton/Engine/Implementation/ProtonReceiver.cs 
b/src/Proton/Engine/Implementation/ProtonReceiver.cs
index b582184..e641172 100644
--- a/src/Proton/Engine/Implementation/ProtonReceiver.cs
+++ b/src/Proton/Engine/Implementation/ProtonReceiver.cs
@@ -392,6 +392,13 @@ namespace Apache.Qpid.Proton.Engine.Implementation
          {
             deliveryReadEventHandler?.Invoke(delivery);
          }
+
+         // Allow session owner to monitor deliveries passing through the 
session
+         // but only after the receiver handlers have had a chance to handle 
it.
+         if (session.HasDeliveryReadHandler)
+         {
+            session.FireDeliveryRead(delivery);
+         }
       }
 
       internal void FireDeliveryUpdated(ProtonIncomingDelivery delivery)
diff --git a/src/Proton/Engine/Implementation/ProtonSession.cs 
b/src/Proton/Engine/Implementation/ProtonSession.cs
index e622892..fcc8792 100644
--- a/src/Proton/Engine/Implementation/ProtonSession.cs
+++ b/src/Proton/Engine/Implementation/ProtonSession.cs
@@ -32,7 +32,7 @@ namespace Apache.Qpid.Proton.Engine.Implementation
    /// </summary>
    public sealed class ProtonSession : ProtonEndpoint<ISession>, ISession
    {
-      private readonly Begin localBegin = new Begin();
+      private readonly Begin localBegin = new();
       private Begin remoteBegin;
 
       private readonly ushort localChannel;
@@ -61,6 +61,9 @@ namespace Apache.Qpid.Proton.Engine.Implementation
       private Action<IReceiver> remoteReceiverOpenEventHandler;
       private Action<ITransactionManager> remoteTxnManagerOpenEventHandler;
 
+      // Spy API for session resources
+      private Action<IIncomingDelivery> deliveryReadHandler;
+
       public ProtonSession(ProtonConnection connection, ushort channel) : 
base(connection.ProtonEngine)
       {
          this.connection = connection;
@@ -307,6 +310,12 @@ namespace Apache.Qpid.Proton.Engine.Implementation
          return this;
       }
 
+      public ISession DeliveryReadHandler(Action<IIncomingDelivery> handler)
+      {
+         deliveryReadHandler = handler;
+         return this;
+      }
+
       #endregion
 
       #region Handlers for remote AMQP Performatives
@@ -461,6 +470,8 @@ namespace Apache.Qpid.Proton.Engine.Implementation
 
       internal bool HasTransactionManagerOpenHandler => 
remoteTxnManagerOpenEventHandler != null;
 
+      internal bool HasDeliveryReadHandler => deliveryReadHandler != null;
+
       internal void FireRemoteReceiverOpened(IReceiver receiver)
       {
          remoteReceiverOpenEventHandler?.Invoke(receiver);
@@ -471,6 +482,11 @@ namespace Apache.Qpid.Proton.Engine.Implementation
          remoteSenderOpenEventHandler?.Invoke(sender);
       }
 
+      internal void FireDeliveryRead(IIncomingDelivery delivery)
+      {
+         deliveryReadHandler?.Invoke(delivery);
+      }
+
       internal void FireRemoteTransactionManagerOpened(ITransactionManager 
manager)
       {
          remoteTxnManagerOpenEventHandler?.Invoke(manager);
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs
index 14d2bca..92eb8c0 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs
@@ -46,7 +46,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
          NLog.Targets.Target logconsole = new 
NLog.Targets.ConsoleTarget("logconsole");
 
          // Rules for mapping loggers to targets
-         // config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal, 
logconsole);
+         //config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal, 
logconsole);
          config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal, logfile);
 
          loggerFactory = LoggerFactory.Create(builder =>
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs
index 3642c77..7504fef 100644
--- 
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs
+++ 
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs
@@ -17,7 +17,9 @@
 
 using System;
 using System.Threading;
+using System.Threading.Tasks;
 using Apache.Qpid.Proton.Test.Driver;
+using Apache.Qpid.Proton.Types.Messaging;
 using Microsoft.Extensions.Logging;
 using NUnit.Framework;
 
@@ -275,5 +277,121 @@ namespace Apache.Qpid.Proton.Client.Implementation
          }
       }
 
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesRoundRobin()
+      {
+         
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.RoundRobin);
+      }
+
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesRandom()
+      {
+         
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.Random);
+      }
+
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesLargestBacklog()
+      {
+         
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.LargestBacklog);
+      }
+
+      [Test]
+      public void 
TestNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog()
+      {
+         
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.SmallestBacklog);
+      }
+
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesFirstAvailable()
+      {
+         
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.FirstAvailable);
+      }
+
+      public void 
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy)
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer firstPeer = new 
ProtonTestServer(loggerFactory))
+         using (ProtonTestServer finalPeer = new 
ProtonTestServer(loggerFactory))
+         {
+            firstPeer.ExpectSASLAnonymousConnect();
+            firstPeer.ExpectOpen().Respond();
+            firstPeer.ExpectBegin().Respond();
+            firstPeer.ExpectAttach().OfReceiver().Respond();
+            firstPeer.ExpectFlow().WithLinkCredit(10);
+            firstPeer.DropAfterLastHandler();
+            firstPeer.Start();
+
+            finalPeer.ExpectSASLAnonymousConnect();
+            finalPeer.ExpectOpen().Respond();
+            finalPeer.ExpectBegin().Respond();
+            finalPeer.ExpectAttach().OfReceiver().Respond();
+            finalPeer.ExpectFlow().WithLinkCredit(10);
+            finalPeer.Start();
+
+            string primaryAddress = firstPeer.ServerAddress;
+            int primaryPort = firstPeer.ServerPort;
+            string finalAddress = finalPeer.ServerAddress;
+            int finalPort = finalPeer.ServerPort;
+
+            logger.LogInformation("Test started, first peer listening on: 
{0}:{1}", primaryAddress, primaryPort);
+            logger.LogInformation("Test started, final peer listening on: 
{0}:{1}", finalAddress, finalPort);
+
+            CountdownEvent done = new CountdownEvent(1);
+
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = policy,
+            };
+            options.ReconnectOptions.ReconnectEnabled = true;
+            options.ReconnectOptions.AddReconnectLocation(finalAddress, 
finalPort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(primaryAddress, 
primaryPort, options);
+
+            Task.Run(() =>
+            {
+               try
+               {
+                  IReceiver receiver = connection.NextReceiver();
+                  IDelivery delivery = receiver.Receive();
+                  logger.LogInformation("Next receiver returned delivery with 
body: {0}", delivery.Message().Body);
+                  done.Signal();
+               }
+               catch (Exception e)
+               {
+                  logger.LogDebug(e, "Exception in next receiver task"); // exception first: as a trailing arg it was silently dropped
+               }
+            });
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+
+            _ = connection.OpenReceiver("test-receiver1", 
receiverOptions).OpenTask.Result;
+
+            firstPeer.WaitForScriptToComplete();
+            finalPeer.WaitForScriptToComplete();
+
+            finalPeer.RemoteTransfer().WithHandle(0)
+                                      .WithDeliveryId(0)
+                                      .WithMore(false)
+                                      .WithMessageFormat(0)
+                                      .WithPayload(payload).Later(15);
+
+            finalPeer.WaitForScriptToComplete();
+
+            Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(10)));
+
+            finalPeer.WaitForScriptToComplete();
+            finalPeer.ExpectClose().Respond();
+
+            connection.Close();
+
+            finalPeer.WaitForScriptToComplete();
+         }
+      }
    }
 }
\ No newline at end of file
diff --git 
a/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs 
b/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs
index 04dadcb..078f1d2 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs
@@ -24,6 +24,9 @@ using Apache.Qpid.Proton.Client.Exceptions;
 using Apache.Qpid.Proton.Types.Transport;
 using System.Collections.Generic;
 using System.Linq;
+using Apache.Qpid.Proton.Types.Messaging;
+using Apache.Qpid.Proton.Client.TestSupport;
+using System.Threading.Tasks;
 
 namespace Apache.Qpid.Proton.Client.Implementation
 {
@@ -266,7 +269,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       }
 
       [Test]
-      public void 
TestConnectionCloseGetsResponseWithErrorDoesNotThrowUntimedGet()
+      public void 
TestConnectionCloseGetsResponseWithErrorDoesNotThrowInfiniteGet()
       {
          DoTestSessionCloseGetsResponseWithErrorThrows(false);
       }
@@ -686,5 +689,1164 @@ namespace Apache.Qpid.Proton.Client.Implementation
             peer.WaitForScriptToComplete();
          }
       }
+
+      [Test]
+      public void 
TestNextReceiverFromDefaultSessionReturnsSameReceiverForQueuedDeliveries()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            for (uint i = 0; i < 10; ++i)
+            {
+               peer.RemoteTransfer().WithDeliveryId(i)
+                                    .WithMore(false)
+                                    .WithMessageFormat(0)
+                                    .WithPayload(payload).Queue();
+            }
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions connOptions = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.FirstAvailable
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, connOptions);
+
+            ReceiverOptions options = new ReceiverOptions()
+            {
+               CreditWindow = 0,
+               AutoAccept = false
+            };
+            IReceiver receiver = connection.OpenReceiver("test-receiver", 
options);
+            receiver.AddCredit(10);
+
+            Wait.WaitFor(() => receiver.QueuedDeliveries == 10);
+
+            peer.WaitForScriptToComplete();
+
+            for (int i = 0; i < 10; ++i)
+            {
+               IReceiver nextReceiver = connection.NextReceiver();
+               Assert.AreSame(receiver, nextReceiver);
+               IDelivery delivery = nextReceiver.Receive();
+               Assert.IsNotNull(delivery);
+            }
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void TestNextReceiverTimesOut()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, 
remotePort);
+
+            _ = connection.OpenReceiver("test-receiver").OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            
Assert.IsNull(connection.NextReceiver(TimeSpan.FromMilliseconds(10)));
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void TestNextReceiverReturnsAllReceiversEventually()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, 
remotePort);
+
+            ReceiverOptions options = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+
+            _ = connection.OpenReceiver("test-receiver1", 
options).OpenTask.Result;
+            _ = connection.OpenReceiver("test-receiver2", 
options).OpenTask.Result;
+            _ = connection.OpenReceiver("test-receiver3", 
options).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            IReceiver receiver1 = 
connection.NextReceiver(NextReceiverPolicy.FirstAvailable);
+            Assert.IsNotNull(receiver1.Receive());
+            IReceiver receiver2 = 
connection.NextReceiver(NextReceiverPolicy.FirstAvailable);
+            Assert.IsNotNull(receiver2.Receive());
+            IReceiver receiver3 = 
connection.NextReceiver(NextReceiverPolicy.FirstAvailable);
+            Assert.IsNotNull(receiver3.Receive());
+
+            Assert.AreNotSame(receiver1, receiver2);
+            Assert.AreNotSame(receiver1, receiver3);
+            Assert.AreNotSame(receiver2, receiver3);
+
+            peer.WaitForScriptToComplete();
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void 
TestConnectionOptionsConfiguresLargestBacklogNextReceiverPolicy()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(3)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.LargestBacklog
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, options);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = connection.OpenReceiver("test-receiver1", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = connection.OpenReceiver("test-receiver2", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = connection.OpenReceiver("test-receiver3", 
receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+            Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+            Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+            IReceiver next = connection.NextReceiver();
+            Assert.AreSame(next, receiver2);
+
+            peer.WaitForScriptToComplete();
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void 
TestSessionOptionsConfiguresLargestBacklogNextReceiverPolicy()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(3)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, options);
+            SessionOptions sessionOptions = new SessionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.LargestBacklog
+            };
+            ISession session = connection.OpenSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = session.OpenReceiver("test-receiver1", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = session.OpenReceiver("test-receiver2", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = session.OpenReceiver("test-receiver3", 
receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+            Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+            Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+            IReceiver next = session.NextReceiver();
+            Assert.AreSame(next, receiver2);
+
+            peer.WaitForScriptToComplete();
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void TestUserSpecifiedNextReceiverPolicyOverridesConfiguration()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(3)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, options);
+            SessionOptions sessionOptions = new SessionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.SmallestBacklog
+            };
+            ISession session = connection.OpenSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = session.OpenReceiver("test-receiver1", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = session.OpenReceiver("test-receiver2", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = session.OpenReceiver("test-receiver3", 
receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+            Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+            Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+            IReceiver next = 
session.NextReceiver(NextReceiverPolicy.LargestBacklog);
+            Assert.AreSame(next, receiver2);
+
+            peer.WaitForScriptToComplete();
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void 
TestSessionOptionsConfiguresSmallestBacklogNextReceiverPolicy()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(3)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(4)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(5)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", 
remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+            };
+            IConnection connection = container.Connect(remoteAddress, 
remotePort, options);
+            SessionOptions sessionOptions = new SessionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.SmallestBacklog
+            };
+            ISession session = connection.OpenSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = session.OpenReceiver("test-receiver1", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = session.OpenReceiver("test-receiver2", 
receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = session.OpenReceiver("test-receiver3", 
receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Wait.WaitFor(() => receiver1.QueuedDeliveries == 3);
+            Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+            Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+            IReceiver next = session.NextReceiver();
+            Assert.AreSame(next, receiver3);
+
+            peer.WaitForScriptToComplete();
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      /// <summary>
+      /// Runs the delayed delivery NextReceiver scenario with the RoundRobin policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesRoundRobin() =>
+         DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.RoundRobin);
+
+      /// <summary>
+      /// Runs the delayed delivery NextReceiver scenario with the Random policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesRandom() =>
+         DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.Random);
+
+      /// <summary>
+      /// Runs the delayed delivery NextReceiver scenario with the LargestBacklog policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesLargestBacklog() =>
+         DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.LargestBacklog);
+
+      /// <summary>
+      /// Runs the delayed delivery NextReceiver scenario with the SmallestBacklog policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog() =>
+         DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.SmallestBacklog);
+
+      /// <summary>
+      /// Runs the delayed delivery NextReceiver scenario with the FirstAvailable policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesAfterDeliveryArrivesFirstAvailable() =>
+         DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.FirstAvailable);
+
+      /// <summary>
+      /// Opens one receiver from the connection, starts a background task that calls
+      /// NextReceiver followed by Receive, then has the peer send a single transfer
+      /// and checks that the background task finishes without error.
+      /// </summary>
+      /// <param name="policy">Next receiver policy applied as the connection default.</param>
+      public void DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy)
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.Start();
+
+            CountdownEvent done = new CountdownEvent(1);
+            Exception failure = null;
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = policy
+            };
+            IConnection connection = container.Connect(remoteAddress, remotePort, options);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            _ = connection.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Task.Run(() =>
+            {
+               try
+               {
+                  IReceiver receiver = connection.NextReceiver();
+                  IDelivery delivery = receiver.Receive();
+                  logger.LogInformation("Next receiver returned delivery with body: {0}", delivery.Message().Body);
+               }
+               catch (Exception e)
+               {
+                  // Keep the cause so the test thread reports it instead of only timing out.
+                  failure = e;
+                  logger.LogDebug("Failed in next receiver task: {0}", e);
+               }
+               finally
+               {
+                  // Always release the waiting test thread, on success or failure.
+                  done.Signal();
+               }
+            });
+
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Later(15);
+
+            peer.WaitForScriptToComplete();
+
+            Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(10)), "NextReceiver task did not finish in time");
+            Assert.IsNull(failure, "NextReceiver task failed: {0}", failure);
+
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      /// <summary>
+      /// Runs the session closed NextReceiver scenario with the RoundRobin policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverThrowsAfterSessionClosedRoundRobin() =>
+         DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.RoundRobin);
+
+      /// <summary>
+      /// Runs the session closed NextReceiver scenario with the Random policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverThrowsAfterSessionClosedRandom() =>
+         DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.Random);
+
+      /// <summary>
+      /// Runs the session closed NextReceiver scenario with the LargestBacklog policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverThrowsAfterSessionClosedLargestBacklog() =>
+         DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.LargestBacklog);
+
+      /// <summary>
+      /// Runs the session closed NextReceiver scenario with the SmallestBacklog policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverThrowsAfterSessionClosedSmallestBacklog() =>
+         DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.SmallestBacklog);
+
+      /// <summary>
+      /// Runs the session closed NextReceiver scenario with the FirstAvailable policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverThrowsAfterSessionClosedFirstAvailable() =>
+         DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.FirstAvailable);
+
+      /// <summary>
+      /// Calls NextReceiver on a session from a background task, closes the session
+      /// from the test thread and checks that the background call ended with a
+      /// ClientIllegalStateException.
+      /// </summary>
+      /// <param name="policy">Next receiver policy applied as the connection default.</param>
+      public void DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy policy)
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.Start();
+
+            CountdownEvent started = new CountdownEvent(1);
+            CountdownEvent done = new CountdownEvent(1);
+            Exception error = null;
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = policy
+            };
+            IConnection connection = container.Connect(remoteAddress, remotePort, options);
+            ISession session = connection.OpenSession().OpenTask.Result;
+
+            Task.Run(() =>
+            {
+               try
+               {
+                  started.Signal();
+                  session.NextReceiver();
+               }
+               catch (Exception e)
+               {
+                  // Record whatever was thrown so an unexpected exception type is
+                  // visible in the assertion output rather than lost on the task.
+                  error = e;
+               }
+               finally
+               {
+                  done.Signal();
+               }
+            });
+
+            peer.WaitForScriptToComplete();
+
+            Assert.IsTrue(started.Wait(TimeSpan.FromSeconds(10)));
+
+            peer.ExpectEnd().Respond();
+
+            // The close result is not awaited here; the background task completing is the signal.
+            _ = session.CloseAsync();
+
+            Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(10)));
+            Assert.IsInstanceOf<ClientIllegalStateException>(error);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      /// <summary>
+      /// Runs the NextReceiver before receiver create scenario with the RoundRobin policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateRoundRobin() =>
+         DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.RoundRobin);
+
+      /// <summary>
+      /// Runs the NextReceiver before receiver create scenario with the Random policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateRandom() =>
+         DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.Random);
+
+      /// <summary>
+      /// Runs the NextReceiver before receiver create scenario with the LargestBacklog policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateLargestBacklog() =>
+         DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.LargestBacklog);
+
+      /// <summary>
+      /// Runs the NextReceiver before receiver create scenario with the SmallestBacklog policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateSmallestBacklog() =>
+         DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.SmallestBacklog);
+
+      /// <summary>
+      /// Runs the NextReceiver before receiver create scenario with the FirstAvailable policy.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateFirstAvailable() =>
+         DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.FirstAvailable);
+
+      /// <summary>
+      /// Starts a background task that calls NextReceiver on the connection before any
+      /// receiver has been opened, then opens a receiver, lets the peer send one
+      /// transfer and checks that the background task finishes without error.
+      /// </summary>
+      /// <param name="policy">Next receiver policy applied as the connection default.</param>
+      public void DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy policy)
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.Start();
+
+            CountdownEvent started = new CountdownEvent(1);
+            CountdownEvent done = new CountdownEvent(1);
+            Exception failure = null;
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = policy
+            };
+            IConnection connection = container.Connect(remoteAddress, remotePort, options);
+
+            Task.Run(() =>
+            {
+               try
+               {
+                  started.Signal();
+                  IReceiver receiver = connection.NextReceiver();
+                  IDelivery delivery = receiver.Receive();
+                  logger.LogInformation("Next receiver returned delivery with body: {0}", delivery.Message().Body);
+               }
+               catch (Exception e)
+               {
+                  // Keep the cause so the test thread reports it instead of only timing out.
+                  failure = e;
+                  logger.LogDebug("Failed in next receiver task: {0}", e);
+               }
+               finally
+               {
+                  // Always release the waiting test thread, on success or failure.
+                  done.Signal();
+               }
+            });
+
+            Assert.IsTrue(started.Wait(TimeSpan.FromSeconds(10)));
+
+            _ = connection.OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue().AfterDelay(10);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            _ = connection.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(10)), "NextReceiver task did not finish in time");
+            Assert.IsNull(failure, "NextReceiver task failed: {0}", failure);
+
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      /// <summary>
+      /// Opens three receivers on a session configured for RoundRobin where only the
+      /// second and third receive deliveries, and checks that successive NextReceiver
+      /// calls return the second receiver and then the third.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverRoundRobinReturnsNextReceiverAfterLast()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            // Two transfers on handle 1 and one on handle 2; nothing is sent on handle 0.
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            // The connection default differs from the session setting used below.
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+            };
+            IConnection connection = container.Connect(remoteAddress, remotePort, options);
+            SessionOptions sessionOptions = new SessionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+            };
+            ISession session = connection.OpenSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+            Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+            Assert.AreEqual(0, receiver1.QueuedDeliveries);
+
+            // Expected instance goes first so failure messages read correctly.
+            IReceiver next = session.NextReceiver();
+            Assert.AreSame(receiver2, next);
+            next = session.NextReceiver();
+            Assert.AreSame(receiver3, next);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      /// <summary>
+      /// Opens three receivers on a session configured for RoundRobin, checks that
+      /// NextReceiver returns the second and then the third receiver, and that after
+      /// a delivery later arrives on the first receiver the next call returns it.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverRoundRobinPolicyWrapsAround()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            // Two transfers on handle 1 and one on handle 2; handle 0 gets its transfer later.
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            // The connection default differs from the session setting used below.
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+            };
+            IConnection connection = container.Connect(remoteAddress, remotePort, options);
+            SessionOptions sessionOptions = new SessionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+            };
+            ISession session = connection.OpenSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+            Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+            Assert.AreEqual(0, receiver1.QueuedDeliveries);
+
+            // Expected instance goes first so failure messages read correctly.
+            IReceiver next = session.NextReceiver();
+            Assert.AreSame(receiver2, next);
+            next = session.NextReceiver();
+            Assert.AreSame(receiver3, next);
+
+            peer.WaitForScriptToComplete();
+
+            // Deliver to the first receiver only after the third one has been returned.
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(3)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Now();
+
+            Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+
+            next = session.NextReceiver();
+            Assert.AreSame(receiver1, next);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      /// <summary>
+      /// Opens three receivers on a session configured for RoundRobin, closes the
+      /// receiver returned by the first NextReceiver call (the second receiver), then
+      /// delivers to the first receiver and checks that the next call returns the
+      /// first receiver.
+      /// </summary>
+      [Test]
+      public void TestNextReceiverRoundRobinPolicyRestartsWhenLastReceiverClosed()
+      {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            // Two transfers on handle 1 and one on handle 2; handle 0 gets its transfer later.
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2)
+                                 .WithDeliveryId(2)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            // The connection default differs from the session setting used below.
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+            };
+            IConnection connection = container.Connect(remoteAddress, remotePort, options);
+            SessionOptions sessionOptions = new SessionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+            };
+            ISession session = connection.OpenSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectDetach().Respond();
+
+            Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+            Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+            Assert.AreEqual(0, receiver1.QueuedDeliveries);
+
+            // Expected instance goes first so failure messages read correctly.
+            IReceiver next = session.NextReceiver();
+            Assert.AreSame(receiver2, next);
+            next.Close();
+
+            peer.WaitForScriptToComplete();
+
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(3)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Now();
+
+            Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+
+            // receiver3 still has its queued delivery here, yet the first receiver is expected.
+            next = session.NextReceiver();
+            Assert.AreSame(receiver1, next);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+    /// <summary>
+    /// Opens four receivers on a session configured for RoundRobin where only the
+    /// first and fourth receive a delivery, and checks that successive NextReceiver
+    /// calls return the first receiver and then the fourth.
+    /// </summary>
+    [Test]
+    public void TestNextReceiverRoundRobinPolicySkipsEmptyReceivers()
+    {
+         byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            // One transfer each on handles 0 and 3; handles 1 and 2 receive nothing.
+            peer.RemoteTransfer().WithHandle(0)
+                                 .WithDeliveryId(0)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(3)
+                                 .WithDeliveryId(1)
+                                 .WithMore(false)
+                                 .WithMessageFormat(0)
+                                 .WithPayload(payload).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            // The connection default differs from the session setting used below.
+            ConnectionOptions options = new ConnectionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+            };
+            IConnection connection = container.Connect(remoteAddress, remotePort, options);
+            SessionOptions sessionOptions = new SessionOptions()
+            {
+               DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+            };
+            ISession session = connection.OpenSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions()
+            {
+               CreditWindow = 10,
+               AutoAccept = false
+            };
+            IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+            IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+            IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+            IReceiver receiver4 = session.OpenReceiver("test-receiver4", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+            Wait.WaitFor(() => receiver4.QueuedDeliveries == 1);
+
+            Assert.AreEqual(0, receiver2.QueuedDeliveries);
+            Assert.AreEqual(0, receiver3.QueuedDeliveries);
+
+            // Expected instance goes first so failure messages read correctly.
+            IReceiver next = session.NextReceiver();
+            Assert.AreSame(receiver1, next);
+            next = session.NextReceiver();
+            Assert.AreSame(receiver4, next);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+         }
+    }
+
    }
 }
\ No newline at end of file
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs 
b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
index ae95839..805814d 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
@@ -2856,5 +2856,43 @@ namespace Apache.Qpid.Proton.Engine.Implementation
          Assert.IsNotNull(failure);
          Assert.IsTrue(failure is ProtocolViolationException);
       }
+
+      /// <summary>
+      /// Verifies that one incoming transfer invokes the delivery read handler set on
+      /// the session as well as the one set on the receiver, with no engine failure.
+      /// </summary>
+      [Test]
+      public void TestSessionWideDeliveryMonitoringHandler()
+      {
+         bool sessionHandlerCalled = false;
+         bool receiverHandlerCalled = false;
+
+         IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+         engine.ErrorHandler((error) => failure = error.FailureCause);
+         ProtonTestConnector peer = CreateTestPeer(engine);
+
+         // Remote script: header, open, begin, receiver attach, flow, then one transfer.
+         peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+         peer.ExpectOpen().Respond().WithContainerId("driver");
+         peer.ExpectBegin().Respond();
+         peer.ExpectAttach().OfReceiver().Respond();
+         peer.ExpectFlow().WithLinkCredit(1);
+         peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0)
+             .WithDeliveryTag(new byte[] { 1 }).OnChannel(0).Queue();
+
+         IConnection openedConnection = engine.Start().Open();
+         ISession openedSession = openedConnection.Session().Open();
+         openedSession.DeliveryReadHandler((incoming) => sessionHandlerCalled = true);
+
+         IReceiver link = openedSession.Receiver("test");
+         link.DeliveryReadHandler((incoming) => receiverHandlerCalled = true);
+         link.Open().AddCredit(1);
+
+         peer.WaitForScriptToComplete();
+
+         Assert.IsTrue(receiverHandlerCalled);
+         Assert.IsTrue(sessionHandlerCalled);
+         Assert.IsNull(failure);
+      }
    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to