This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new 76558f0 PROTON-2549 Add a next receiver API to connection and session
76558f0 is described below
commit 76558f01cde404ef4bc1b116375db1f9b68bb070
Author: Timothy Bish <[email protected]>
AuthorDate: Wed May 25 11:20:07 2022 -0400
PROTON-2549 Add a next receiver API to connection and session
Adds API to fetch the next receiver with a pending delivery or wait
on any receiver to be provided with a delivery from the remote.
---
.vscode/launch.json | 26 +
.vscode/tasks.json | 41 +
Proton.sln | 35 +-
examples/Example.LargeMessageReceiver/Program.cs | 10 +-
examples/Example.LargeMessageSender/Program.cs | 14 +-
.../Example.NextReceiver.csproj | 38 +
examples/Example.NextReceiver/Program.cs | 73 ++
.../Client/Concurrent/AtomicReference.cs | 2 +-
src/Proton.Client/Client/ConnectionOptions.cs | 8 +
src/Proton.Client/Client/IConnection.cs | 79 ++
src/Proton.Client/Client/ISession.cs | 82 ++
.../Client/Implementation/ClientConnection.cs | 75 +-
.../Implementation/ClientNextReceiverSelector.cs | 235 ++++
.../Implementation/ClientReceiverLinkType.cs | 2 +
.../Client/Implementation/ClientSender.cs | 6 +-
.../Client/Implementation/ClientSession.cs | 74 +-
.../Client/Implementation/ClientSessionBuilder.cs | 3 +-
.../Client/Implementation/ClientStreamSession.cs | 49 +
src/Proton.Client/Client/NextReceiverPolicy.cs | 79 ++
src/Proton.Client/Client/SessionOptions.cs | 6 +
src/Proton.Client/Client/Transport/ITransport.cs | 2 +-
src/Proton.Client/Client/Transport/TcpTransport.cs | 2 +-
src/Proton/Engine/IAttachments.cs | 10 +
src/Proton/Engine/ISession.cs | 14 +-
.../Engine/Implementation/ProtonAttachments.cs | 6 +
src/Proton/Engine/Implementation/ProtonReceiver.cs | 7 +
src/Proton/Engine/Implementation/ProtonSession.cs | 18 +-
.../Client/Implementation/ClientBaseTestFixture.cs | 2 +-
.../Implementation/ClientReconnectSessionTest.cs | 118 ++
.../Client/Implementation/ClientSessionTest.cs | 1164 +++++++++++++++++++-
.../Engine/Implementation/ProtonSessionTest.cs | 38 +
31 files changed, 2275 insertions(+), 43 deletions(-)
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..1820d3b
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,26 @@
+{
+ "version": "0.2.0",
+ "configurations": [
+ {
+ // Use IntelliSense to find out which attributes exist for C#
debugging
+ // Use hover for the description of the existing attributes
+ // For further information visit
https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md
+ "name": ".NET Core Launch (console)",
+ "type": "coreclr",
+ "request": "launch",
+ "preLaunchTask": "build",
+ // If you have changed target frameworks, make sure to update the
program path.
+ "program":
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/bin/Debug/net5.0/Example.LargeMessageSender.dll",
+ "args": [],
+ "cwd":
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender",
+ // For more information about the 'console' field, see
https://aka.ms/VSCode-CS-LaunchJson-Console
+ "console": "internalConsole",
+ "stopAtEntry": false
+ },
+ {
+ "name": ".NET Core Attach",
+ "type": "coreclr",
+ "request": "attach"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
new file mode 100644
index 0000000..8b2bfee
--- /dev/null
+++ b/.vscode/tasks.json
@@ -0,0 +1,41 @@
+{
+ "version": "2.0.0",
+ "tasks": [
+ {
+ "label": "build",
+ "command": "dotnet",
+ "type": "process",
+ "args": [
+ "build",
+
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/Example.LargeMessageSender.csproj",
+ "/property:GenerateFullPaths=true",
+ "/consoleloggerparameters:NoSummary"
+ ],
+ "problemMatcher": "$msCompile"
+ },
+ {
+ "label": "publish",
+ "command": "dotnet",
+ "type": "process",
+ "args": [
+ "publish",
+
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/Example.LargeMessageSender.csproj",
+ "/property:GenerateFullPaths=true",
+ "/consoleloggerparameters:NoSummary"
+ ],
+ "problemMatcher": "$msCompile"
+ },
+ {
+ "label": "watch",
+ "command": "dotnet",
+ "type": "process",
+ "args": [
+ "watch",
+ "run",
+ "--project",
+
"${workspaceFolder}/dist/1.0.0-SNAPSHOT/qpid-proton-dotnet-src-1.0.0-SNAPSHOT/examples/Example.LargeMessageSender/Example.LargeMessageSender.csproj"
+ ],
+ "problemMatcher": "$msCompile"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/Proton.sln b/Proton.sln
index 22a0075..b782fd6 100644
--- a/Proton.sln
+++ b/Proton.sln
@@ -1,21 +1,5 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Visual Studio Version 16
VisualStudioVersion = 16.6.30114.105
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proton",
"src\Proton\Proton.csproj", "{101F0276-086A-4955-A762-75CA8527E8CA}"
@@ -67,10 +51,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") =
"Solution Items", "Solution
.gitignore = .gitignore
build.sh = build.sh
README.md = README.md
- common.props = common.props
- versions.props = versions.props
+ common.props = common.props
+ versions.props = versions.props
EndProjectSection
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.NextReceiver",
"examples\Example.NextReceiver\Example.NextReceiver.csproj",
"{0A62621D-FCC9-448B-8645-8FEA44BCD054}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -312,6 +298,18 @@ Global
{2E5B03AB-2E32-43A5-9374-868DA143E4B3}.Release|x64.Build.0 =
Release|Any CPU
{2E5B03AB-2E32-43A5-9374-868DA143E4B3}.Release|x86.ActiveCfg =
Release|Any CPU
{2E5B03AB-2E32-43A5-9374-868DA143E4B3}.Release|x86.Build.0 =
Release|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x64.ActiveCfg =
Debug|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x64.Build.0 =
Debug|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x86.ActiveCfg =
Debug|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Debug|x86.Build.0 =
Debug|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|Any CPU.Build.0
= Release|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x64.ActiveCfg =
Release|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x64.Build.0 =
Release|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x86.ActiveCfg =
Release|Any CPU
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054}.Release|x86.Build.0 =
Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{A6F37028-A5C3-4A73-B21F-CF8D88F18144} =
{86674F9E-B027-47C7-A5B7-C7B346665600}
@@ -327,5 +325,6 @@ Global
{F837AEE2-057D-4328-8ED6-81CEFBE4008C} =
{86674F9E-B027-47C7-A5B7-C7B346665600}
{A756CFD0-C28C-4995-A08D-F446DC62C991} =
{86674F9E-B027-47C7-A5B7-C7B346665600}
{2E5B03AB-2E32-43A5-9374-868DA143E4B3} =
{86674F9E-B027-47C7-A5B7-C7B346665600}
+ {0A62621D-FCC9-448B-8645-8FEA44BCD054} =
{86674F9E-B027-47C7-A5B7-C7B346665600}
EndGlobalSection
EndGlobal
diff --git a/examples/Example.LargeMessageReceiver/Program.cs
b/examples/Example.LargeMessageReceiver/Program.cs
index 84ef36d..31132c2 100644
--- a/examples/Example.LargeMessageReceiver/Program.cs
+++ b/examples/Example.LargeMessageReceiver/Program.cs
@@ -19,7 +19,7 @@ using System;
using System.IO;
using Apache.Qpid.Proton.Client;
-namespace Apache.Qpid.Proton.Examples.HelloWorld
+namespace Apache.Qpid.Proton.Examples.LargeMessageReceiver
{
class Program
{
@@ -31,9 +31,11 @@ namespace Apache.Qpid.Proton.Examples.HelloWorld
IClient client = IClient.Create();
- ConnectionOptions options = new ConnectionOptions();
- options.User = Environment.GetEnvironmentVariable("USER");
- options.Password = Environment.GetEnvironmentVariable("PASSWORD");
+ ConnectionOptions options = new()
+ {
+ User = Environment.GetEnvironmentVariable("USER"),
+ Password = Environment.GetEnvironmentVariable("PASSWORD")
+ };
using IConnection connection = client.Connect(serverHost, serverPort,
options);
using IStreamReceiver receiver =
connection.OpenStreamReceiver(address);
diff --git a/examples/Example.LargeMessageSender/Program.cs
b/examples/Example.LargeMessageSender/Program.cs
index c0fbf3a..80e7cfc 100644
--- a/examples/Example.LargeMessageSender/Program.cs
+++ b/examples/Example.LargeMessageSender/Program.cs
@@ -19,11 +19,11 @@ using System;
using System.IO;
using Apache.Qpid.Proton.Client;
-namespace Apache.Qpid.Proton.Examples.HelloWorld
+namespace Apache.Qpid.Proton.Examples.LargeMessageSender
{
- class Program
+ public class Program
{
- static void Main(string[] args)
+ public static void Main(string[] args)
{
string serverHost = Environment.GetEnvironmentVariable("HOST") ??
"localhost";
int serverPort =
Convert.ToInt32(Environment.GetEnvironmentVariable("PORT") ?? "5672");
@@ -31,9 +31,11 @@ namespace Apache.Qpid.Proton.Examples.HelloWorld
IClient client = IClient.Create();
- ConnectionOptions options = new ConnectionOptions();
- options.User = Environment.GetEnvironmentVariable("USER");
- options.Password = Environment.GetEnvironmentVariable("PASSWORD");
+ ConnectionOptions options = new()
+ {
+ User = Environment.GetEnvironmentVariable("USER"),
+ Password = Environment.GetEnvironmentVariable("PASSWORD")
+ };
using IConnection connection = client.Connect(serverHost, serverPort,
options);
using IStreamSender sender = connection.OpenStreamSender(address);
diff --git a/examples/Example.NextReceiver/Example.NextReceiver.csproj
b/examples/Example.NextReceiver/Example.NextReceiver.csproj
new file mode 100644
index 0000000..edd731c
--- /dev/null
+++ b/examples/Example.NextReceiver/Example.NextReceiver.csproj
@@ -0,0 +1,38 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <Import Project="../../common.props" />
+
+ <PropertyGroup>
+ <TargetFramework>$(DefaultExeTargetFrameworks)</TargetFramework>
+ <OutputType>Exe</OutputType>
+ <IsPackable>false</IsPackable>
+ <GenerateAssemblyInfo>false</GenerateAssemblyInfo>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
+ <WarningsAsErrors />
+ </PropertyGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="..\..\src\Proton\Proton.csproj" />
+ <ProjectReference Include="..\..\src\Proton.Client\Proton.Client.csproj" />
+ </ItemGroup>
+
+</Project>
diff --git a/examples/Example.NextReceiver/Program.cs
b/examples/Example.NextReceiver/Program.cs
new file mode 100644
index 0000000..932f926
--- /dev/null
+++ b/examples/Example.NextReceiver/Program.cs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Qpid.Proton.Client;
+
+namespace Apache.Qpid.Proton.Examples.NextReceiver
+{
+ /// <summary>
+ /// Example showing how the connection level NextReceiver API hands back
+ /// a receiver that has a delivery ready when several receivers are open.
+ /// </summary>
+ public class Program
+ {
+     /// <summary>
+     /// Opens two receivers on a single connection, sends one message to each
+     /// address from a background task and then reads both deliveries through
+     /// <c>connection.NextReceiver()</c>.
+     /// </summary>
+     /// <param name="args">
+     /// Command line arguments (unused). Configuration is read from the HOST, PORT,
+     /// ADDRESS1, ADDRESS2, USER and PASSWORD environment variables.
+     /// </param>
+     public static void Main(string[] args)
+     {
+         string serverHost = Environment.GetEnvironmentVariable("HOST") ?? "localhost";
+         int serverPort = Convert.ToInt32(Environment.GetEnvironmentVariable("PORT") ?? "5672");
+         string address1 = Environment.GetEnvironmentVariable("ADDRESS1") ?? "next-receiver-1-address";
+         // The second default must differ from the first, otherwise both receivers
+         // attach to the same address and the example no longer shows selection
+         // across distinct receivers.
+         string address2 = Environment.GetEnvironmentVariable("ADDRESS2") ?? "next-receiver-2-address";
+
+         IClient client = IClient.Create();
+
+         ConnectionOptions options = new()
+         {
+             User = Environment.GetEnvironmentVariable("USER"),
+             Password = Environment.GetEnvironmentVariable("PASSWORD")
+         };
+
+         using IConnection connection = client.Connect(serverHost, serverPort, options);
+
+         // The receiver handles are intentionally discarded; deliveries are
+         // consumed through connection.NextReceiver() below.
+         _ = connection.OpenReceiver(address1);
+         _ = connection.OpenReceiver(address2);
+
+         // Send the two messages from a background task, two seconds apart, while
+         // the main thread waits in NextReceiver().
+         Task.Run(() =>
+         {
+             try
+             {
+                 Thread.Sleep(2000);
+                 IMessage<string> message1 = IMessage<string>.Create("Hello World 1");
+                 message1.To = address1;
+                 connection.Send(message1);
+                 Thread.Sleep(2000);
+                 IMessage<string> message2 = IMessage<string>.Create("Hello World 2");
+                 message2.To = address2;
+                 connection.Send(message2);
+             }
+             catch (Exception e)
+             {
+                 Console.WriteLine("Exception in message send task: " + e.Message);
+             }
+         });
+
+         IDelivery delivery1 = connection.NextReceiver().Receive();
+         IDelivery delivery2 = connection.NextReceiver().Receive();
+
+         Console.WriteLine("Received first message with body: " + delivery1.Message().Body);
+         Console.WriteLine("Received second message with body: " + delivery2.Message().Body);
+     }
+ }
+}
diff --git a/src/Proton.Client/Client/Concurrent/AtomicReference.cs
b/src/Proton.Client/Client/Concurrent/AtomicReference.cs
index cbfee83..d52a9d0 100644
--- a/src/Proton.Client/Client/Concurrent/AtomicReference.cs
+++ b/src/Proton.Client/Client/Concurrent/AtomicReference.cs
@@ -106,7 +106,7 @@ namespace Apache.Qpid.Proton.Client.Concurrent
}
/// <summary>
- /// Implicit conversion of an atomic reference type to the contined value
+ /// Implicit conversion of an atomic reference type to the contained
value
/// using a volatile read operation.
/// </summary>
/// <param name="reference">The atomic reference to read from</param>
diff --git a/src/Proton.Client/Client/ConnectionOptions.cs
b/src/Proton.Client/Client/ConnectionOptions.cs
index efd2a68..0c037a0 100644
--- a/src/Proton.Client/Client/ConnectionOptions.cs
+++ b/src/Proton.Client/Client/ConnectionOptions.cs
@@ -33,6 +33,7 @@ namespace Apache.Qpid.Proton.Client
public static readonly long DEFAULT_DRAIN_TIMEOUT = 60000;
public static readonly ushort DEFAULT_CHANNEL_MAX = 65535;
public static readonly uint DEFAULT_MAX_FRAME_SIZE = 65536;
+ public static readonly NextReceiverPolicy DEFAULT_NEXT_RECEIVER_POLICY =
NextReceiverPolicy.RoundRobin;
/// <summary>
/// Creates a default Connection options instance.
@@ -74,6 +75,7 @@ namespace Apache.Qpid.Proton.Client
other.ChannelMax = ChannelMax;
other.MaxFrameSize = MaxFrameSize;
other.TraceFrames = TraceFrames;
+ other.DefaultNextReceiverPolicy = DefaultNextReceiverPolicy;
if (OfferedCapabilities != null && OfferedCapabilities.Length > 0)
{
string[] copyOf = new string[OfferedCapabilities.Length];
@@ -185,6 +187,12 @@ namespace Apache.Qpid.Proton.Client
/// </summary>
public uint MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
+ /// <summary>
+ /// Configures the default next receiver policy for this connection and
any session
+ /// that is created without specifying user defined session default
options.
+ /// </summary>
+ public NextReceiverPolicy DefaultNextReceiverPolicy { get; set; } =
DEFAULT_NEXT_RECEIVER_POLICY;
+
/// <summary>
/// Configures the set of capabilities that a new connection will
advertise to the remote.
/// </summary>
diff --git a/src/Proton.Client/Client/IConnection.cs
b/src/Proton.Client/Client/IConnection.cs
index 7fab12d..b42eca2 100644
--- a/src/Proton.Client/Client/IConnection.cs
+++ b/src/Proton.Client/Client/IConnection.cs
@@ -334,6 +334,85 @@ namespace Apache.Qpid.Proton.Client
/// <returns>A Task that results in a tracker instance that can be used
to track the send outcome</returns>
Task<ITracker> SendAsync<T>(IMessage<T> message);
+ /// <summary>
+ /// Waits indefinitely for a receiver created from this connection to have
+ /// a delivery ready for receipt. The selection of the next receiver when
+ /// more than one exists which has pending deliveries is based upon the
+ /// configured value of the default next receiver policy that was configured
+ /// in the connection options used to create this connection.
+ /// </summary>
+ /// <returns>The next receiver that has a pending delivery available
based on policy.</returns>
+ IReceiver NextReceiver();
+
+ /// <summary>
+ /// Returns a task that will complete only after a receiver created from
this connection has
+ /// a delivery ready for receipt. The selection of the next receiver
when more than one
+ /// exists which has pending deliveries is based upon the configured
value of the default next
+ /// receiver policy that was configured in the connection options used
to create this connection.
+ /// </summary>
+ /// <returns>A Task that results in the next receiver that has a pending
delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync();
+
+ /// <summary>
+ /// Waits indefinitely for a receiver created from this connection to
have a delivery ready for
+ /// receipt. The selection of the next receiver when more than one
exists which has pending
+ /// deliveries is based upon the value of the next receiver policy that
is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <returns>The next receiver that has a pending delivery available based on policy.</returns>
+ IReceiver NextReceiver(NextReceiverPolicy policy);
+
+ /// <summary>
+ /// Returns a task that will complete only after a receiver created from
this connection has
+ /// a delivery ready for receipt. The selection of the next receiver
when more than one
+ /// exists which has pending deliveries is based upon the value of the
next receiver policy
+ /// that is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <returns>A Task that results in the next receiver that has a pending
delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy);
+
+ /// <summary>
+ /// Waits up to the given timeout for a receiver created from this
connection to have a delivery
+ /// ready for receipt. The selection of the next receiver when more than
one exists which has
+ /// pending deliveries is based upon the configured value of the default
next receiver policy that
+ /// was configured in the connection options used to create this
connection.
+ /// </summary>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>The next receiver that has a pending delivery available based on policy.</returns>
+ IReceiver NextReceiver(TimeSpan timeout);
+
+ /// <summary>
+ /// Returns a task that will complete once a receiver created from this
connection has a delivery
+ /// ready for receipt or the given timeout expires. The selection of the
next receiver when more
+ /// than one exists which has pending deliveries is based upon the
configured value of the default
+ /// next receiver policy that was configured in the connection options
used to create this connection.
+ /// </summary>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>A Task that results in the next receiver that has a pending delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync(TimeSpan timeout);
+
+ /// <summary>
+ /// Waits up to the given timeout for a receiver created from this
connection to have a delivery ready
+ /// for receipt. The selection of the next receiver when more than one
exists which has pending
+ /// deliveries is based upon the value of the next receiver policy that
is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>The next receiver that has a pending delivery available based on policy.</returns>
+ IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout);
+
+ /// <summary>
+ /// Returns a task that will complete once a receiver created from this
connection has a delivery
+ /// ready for receipt or the given timeout expires. The selection of the
next receiver when more
+ /// than one exists which has pending deliveries is based upon the value
of the next receiver policy
+ /// that is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>A Task that results in the next receiver that has a pending
delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan
timeout);
+
/// <summary>
/// Returns the properties that the remote provided upon successfully
opening the connection.
/// If the open has not completed yet this method will block to await
the open response which
diff --git a/src/Proton.Client/Client/ISession.cs
b/src/Proton.Client/Client/ISession.cs
index a3caf42..ffc6fe5 100644
--- a/src/Proton.Client/Client/ISession.cs
+++ b/src/Proton.Client/Client/ISession.cs
@@ -336,5 +336,87 @@ namespace Apache.Qpid.Proton.Client
/// <returns>A Task whose result is the Session no longer under a
transaction</returns>
Task<ISession> RollbackTransactionAsync();
+ /// <summary>
+ /// Waits indefinitely for a receiver created from this session to have
a delivery ready for
+ /// receipt. The selection of the next receiver when more than one
exists which has pending
+ /// deliveries is based upon the configured value of the default next
receiver policy that
+ /// was configured in the session options used to create this session,
or the connection
+ /// level policy if none was assigned to the session options.
+ /// </summary>
+ /// <returns>The next receiver that has a pending delivery available
based on policy.</returns>
+ IReceiver NextReceiver();
+
+ /// <summary>
+ /// Returns a task that will complete only after a receiver created from
this session has
+ /// a delivery ready for receipt. The selection of the next receiver
when more than one
+ /// exists which has pending deliveries is based upon the configured
value of the default next
+ /// receiver policy that was configured in the session options used to
create this session,
+ /// or the connection level policy if none was assigned to the session
options.
+ /// </summary>
+ /// <returns>A Task that results in the next receiver that has a pending
delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync();
+
+ /// <summary>
+ /// Waits indefinitely for a receiver created from this session to have
a delivery ready for
+ /// receipt. The selection of the next receiver when more than one
exists which has pending
+ /// deliveries is based upon the value of the next receiver policy that
is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <returns>The next receiver that has a pending delivery available based on policy.</returns>
+ IReceiver NextReceiver(NextReceiverPolicy policy);
+
+ /// <summary>
+ /// Returns a task that will complete only after a receiver created from
this session has
+ /// a delivery ready for receipt. The selection of the next receiver
when more than one
+ /// exists which has pending deliveries is based upon the value of the
next receiver policy
+ /// that is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <returns>A Task that results in the next receiver that has a pending
delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy);
+
+ /// <summary>
+ /// Waits up to the given timeout for a receiver created from this
session to have a delivery ready
+ /// for receipt. The selection of the next receiver when more than one
exists which has pending
+ /// deliveries is based upon the configured value of the default next
receiver policy that was
+ /// configured in the session options used to create this session, or
the connection level policy
+ /// if none was assigned to the session options.
+ /// </summary>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>The next receiver that has a pending delivery available based on policy.</returns>
+ IReceiver NextReceiver(TimeSpan timeout);
+
+ /// <summary>
+ /// Returns a task that will complete once a receiver created from this
session has a delivery
+ /// ready for receipt or the given timeout expires. The selection of the
next receiver when more
+ /// than one exists which has pending deliveries is based upon the
configured value of the default
+ /// next receiver policy that was configured in the session options used
to create this session,
+ /// or the connection level policy if none was assigned to the session
options.
+ /// </summary>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>A Task that results in the next receiver that has a pending delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync(TimeSpan timeout);
+
+ /// <summary>
+ /// Waits up to the given timeout for a receiver created from this
session to have a delivery ready
+ /// for receipt. The selection of the next receiver when more than one
exists which has pending
+ /// deliveries is based upon the value of the next receiver policy that
is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>The next receiver that has a pending delivery available based on policy.</returns>
+ IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout);
+
+ /// <summary>
+ /// Returns a task that will complete once a receiver created from this
session has a delivery
+ /// ready for receipt or the given timeout expires. The selection of the
next receiver when more
+ /// than one exists which has pending deliveries is based upon the value
of the next receiver policy
+ /// that is provided by the caller.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when
selecting a result</param>
+ /// <param name="timeout">The time to wait for a receiver to have a
pending delivery</param>
+ /// <returns>A Task that results in the next receiver that has a pending
delivery available based on policy.</returns>
+ Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan
timeout);
+
}
}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Implementation/ClientConnection.cs
b/src/Proton.Client/Client/Implementation/ClientConnection.cs
index 4b6af84..34df3ba 100644
--- a/src/Proton.Client/Client/Implementation/ClientConnection.cs
+++ b/src/Proton.Client/Client/Implementation/ClientConnection.cs
@@ -482,6 +482,79 @@ namespace Apache.Qpid.Proton.Client.Implementation
return result.Task;
}
+ /// <summary>
+ /// Blocks the caller until the asynchronous lookup completes, using the
+ /// connection's default next receiver policy and TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ public IReceiver NextReceiver() =>
+     NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue)
+         .ConfigureAwait(false)
+         .GetAwaiter()
+         .GetResult();
+
+ /// <summary>
+ /// Forwards to the policy and timeout overload with the connection's default
+ /// next receiver policy and TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ public Task<IReceiver> NextReceiverAsync() =>
+     NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue);
+
+ /// <summary>
+ /// Blocks the caller until the asynchronous lookup completes, using the
+ /// supplied policy and TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when selecting a result</param>
+ public IReceiver NextReceiver(NextReceiverPolicy policy) =>
+     NextReceiverAsync(policy, TimeSpan.MaxValue)
+         .ConfigureAwait(false)
+         .GetAwaiter()
+         .GetResult();
+
+ /// <summary>
+ /// Forwards to the policy and timeout overload with the supplied policy
+ /// and TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when selecting a result</param>
+ public Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy) =>
+     NextReceiverAsync(policy, TimeSpan.MaxValue);
+
+ /// <summary>
+ /// Blocks the caller until the asynchronous lookup completes, using the
+ /// connection's default next receiver policy and the supplied timeout.
+ /// </summary>
+ /// <param name="timeout">The time to wait for a receiver to have a pending delivery</param>
+ public IReceiver NextReceiver(TimeSpan timeout) =>
+     NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout)
+         .ConfigureAwait(false)
+         .GetAwaiter()
+         .GetResult();
+
+ /// <summary>
+ /// Forwards to the policy and timeout overload with the connection's default
+ /// next receiver policy and the supplied timeout.
+ /// </summary>
+ /// <param name="timeout">The time to wait for a receiver to have a pending delivery</param>
+ public Task<IReceiver> NextReceiverAsync(TimeSpan timeout) =>
+     NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout);
+
+ /// <summary>
+ /// Blocks the caller until the asynchronous lookup performed with the
+ /// supplied policy and timeout completes, then returns its result.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when selecting a result</param>
+ /// <param name="timeout">The time to wait for a receiver to have a pending delivery</param>
+ public IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout)
+ {
+     Task<IReceiver> lookup = NextReceiverAsync(policy, timeout);
+
+     return lookup.ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ /// <summary>
+ /// Resolves the connection's default session and asks it for the next receiver
+ /// using the supplied policy and timeout. The outcome of either step (result,
+ /// cancellation or failure) is relayed onto the returned task.
+ /// </summary>
+ /// <param name="policy">The next receiver policy to apply when selecting a result</param>
+ /// <param name="timeout">The time to wait for a receiver to have a pending delivery</param>
+ /// <returns>A Task completed from the default session's NextReceiverAsync outcome.</returns>
+ public Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan timeout)
+ {
+     CheckClosedOrFailed();
+     TaskCompletionSource<IReceiver> result = new();
+
+     DefaultSessionAsync().ContinueWith(sessionTask =>
+     {
+         // The default session lookup did not succeed: relay how it ended.
+         if (sessionTask.IsCanceled)
+         {
+             result.TrySetCanceled();
+             return;
+         }
+
+         if (!sessionTask.IsCompletedSuccessfully)
+         {
+             result.TrySetException(sessionTask.Exception.InnerException);
+             return;
+         }
+
+         sessionTask.Result.NextReceiverAsync(policy, timeout).ContinueWith(receiverTask =>
+         {
+             if (receiverTask.IsCanceled)
+             {
+                 result.TrySetCanceled();
+                 return;
+             }
+
+             if (!receiverTask.IsCompletedSuccessfully)
+             {
+                 result.TrySetException(receiverTask.Exception.InnerException);
+                 return;
+             }
+
+             result.TrySetResult(receiverTask.Result);
+         });
+     });
+
+     return result.Task;
+ }
+
public override string ToString()
{
return "ClientConnection:[" + ConnectionId + "]";
@@ -988,7 +1061,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
private void DoEngineTickAndReschedule()
{
- // Prevent a tick that was queued on the event loop from occuring if
the
+ // Prevent a tick that was queued on the event loop from occurring if
the
// connection dropped and a new engine was created for a reconnect
attempt.
if (engine.Connection.IsRemotelyOpen &&
engine.Connection.IsLocallyOpen)
{
diff --git
a/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
new file mode 100644
index 0000000..e9dfff7
--- /dev/null
+++ b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Qpid.Proton.Client.Exceptions;
+using Apache.Qpid.Proton.Engine;
+using Apache.Qpid.Proton.Utilities;
+
+namespace Apache.Qpid.Proton.Client.Implementation
+{
+ /// <summary>
+ /// Services "next receiver" requests for a single client session. A request is
+ /// either completed immediately with a receiver that already has queued
+ /// deliveries (chosen according to the requested policy) or parked until the
+ /// session reports a newly read, complete delivery, the optional timeout
+ /// elapses, or the session shuts down.
+ /// </summary>
+ public sealed class ClientNextReceiverSelector
+ {
+     // Session attachment key under which the most recently returned receiver is stored.
+     private static readonly string LAST_RETURNED_STATE_KEY = "Last_Returned_State";
+
+     // Requests that could not be satisfied immediately, in arrival order.
+     private readonly ArrayDeque<TaskCompletionSource<IReceiver>> pending = new();
+     private readonly Random random = new();
+
+     private readonly ClientSession session;
+
+     internal ClientNextReceiverSelector(ClientSession session)
+     {
+         this.session = session;
+
+         HandleReconnect(); // Same processing works for initialization
+     }
+
+     /// <summary>
+     /// Attempts to complete the request with a receiver selected by the given
+     /// policy. When no receiver currently has queued deliveries the request is
+     /// parked and, if the timeout is greater than zero and less than
+     /// TimeSpan.MaxValue, a task is scheduled on the session that removes the
+     /// request and completes it with null once the timeout elapses.
+     /// </summary>
+     /// <param name="request">The completion source to resolve</param>
+     /// <param name="policy">The policy used to pick among receivers with queued deliveries</param>
+     /// <param name="timeout">How long a parked request may wait before completing with null</param>
+     public void NextReceiver(TaskCompletionSource<IReceiver> request, NextReceiverPolicy policy, TimeSpan timeout)
+     {
+         // No null check on the policy: it is an enum value and can never be null.
+         ClientReceiver result;
+
+         switch (policy)
+         {
+             case NextReceiverPolicy.Random:
+                 result = SelectRandomReceiver();
+                 break;
+             case NextReceiverPolicy.RoundRobin:
+                 result = SelectNextAvailable();
+                 break;
+             case NextReceiverPolicy.FirstAvailable:
+                 result = SelectFirstAvailable();
+                 break;
+             case NextReceiverPolicy.LargestBacklog:
+                 result = SelectLargestBacklog();
+                 break;
+             case NextReceiverPolicy.SmallestBacklog:
+                 result = SelectSmallestBacklog();
+                 break;
+             default:
+                 request.TrySetException(new ClientException("Next receiver called with invalid or unknown policy:" + policy));
+                 // The request is already completed; it must not be parked or it
+                 // would later swallow a delivery event meant for a live waiter.
+                 return;
+         }
+
+         if (result == null)
+         {
+             pending.Enqueue(request);
+             if (timeout > TimeSpan.Zero && timeout < TimeSpan.MaxValue)
+             {
+                 session.Schedule(() =>
+                 {
+                     pending.Remove(request);
+                     request.TrySetResult(null);
+                 }, timeout);
+             }
+         }
+         else
+         {
+             // Track last returned to update state for Round Robin next receiver dispatch
+             // this effectively ties all policies together in updating the next result from
+             // a call that requests the round robin fairness policy.
+             session.ProtonSession.Attachments[LAST_RETURNED_STATE_KEY] = result;
+
+             request.TrySetResult(result);
+         }
+     }
+
+     /// <summary>
+     /// Registers this selector's delivery read handler on the session's current
+     /// proton session. Called from the constructor and again by the owning
+     /// session after it has recreated its proton session.
+     /// </summary>
+     public void HandleReconnect()
+     {
+         session.ProtonSession.DeliveryReadHandler(DeliveryReadHandler);
+     }
+
+     /// <summary>
+     /// Fails every parked request and clears the pending queue. The error used
+     /// reflects whether the session was explicitly closed, failed with a known
+     /// cause, or closed without any error being recorded.
+     /// </summary>
+     public void HandleShutdown()
+     {
+         ClientException cause;
+
+         if (session.IsClosed)
+         {
+             cause = new ClientIllegalStateException("The Session was explicitly closed", session.FailureCause);
+         }
+         else if (session.FailureCause != null)
+         {
+             cause = session.FailureCause;
+         }
+         else
+         {
+             cause = new ClientIllegalStateException("The session was closed without a specific error being provided");
+         }
+
+         foreach (TaskCompletionSource<IReceiver> request in pending)
+         {
+             request.TrySetException(cause);
+         }
+
+         pending.Clear();
+     }
+
+     // Completes the oldest parked request when a complete (not partial, not
+     // aborted) delivery is read by a receiver of the session.
+     private void DeliveryReadHandler(IIncomingDelivery delivery)
+     {
+         // When a new delivery arrives that is completed
+         if (!pending.IsEmpty && !delivery.IsPartial && !delivery.IsAborted)
+         {
+             // We only handle next receiver events for normal client receivers and
+             // not for stream receiver types etc.
+             if (delivery.Receiver.LinkedResource is ClientReceiver receiver)
+             {
+                 // Track last returned to update state for Round Robin next receiver dispatch
+                 delivery.Receiver.Session.Attachments[LAST_RETURNED_STATE_KEY] = receiver;
+
+                 pending.Dequeue().TrySetResult(receiver);
+             }
+         }
+     }
+
+     // Lazily enumerates the client receivers of the session, in session order,
+     // that currently report at least one queued delivery.
+     private IEnumerable<ClientReceiver> ReceiversWithQueuedDeliveries()
+     {
+         return session.ProtonSession.Receivers
+                       .Select(protonReceiver => protonReceiver.LinkedResource)
+                       .OfType<ClientReceiver>()
+                       .Where(candidate => candidate.QueuedDeliveries > 0);
+     }
+
+     // Picks a uniformly random receiver among those with queued deliveries, or
+     // null when there are none.
+     private ClientReceiver SelectRandomReceiver()
+     {
+         // Materialize once so the count and the picked element come from the
+         // same snapshot rather than from two separate enumerations.
+         List<ClientReceiver> candidates = ReceiversWithQueuedDeliveries().ToList();
+
+         return candidates.Count == 0 ? null : candidates[random.Next(candidates.Count)];
+     }
+
+     // Round robin: picks the first receiver with queued deliveries that follows
+     // the previously returned receiver, falling back to the first available
+     // receiver when none follows it or no usable previous receiver is recorded.
+     private ClientReceiver SelectNextAvailable()
+     {
+         ClientReceiver lastReceiver = session.ProtonSession.Attachments.Get<ClientReceiver>(LAST_RETURNED_STATE_KEY, null);
+         ClientReceiver result = null;
+
+         if (lastReceiver != null && !lastReceiver.ProtonReceiver.IsLocallyClosedOrDetached)
+         {
+             bool foundLast = false;
+             foreach (Engine.IReceiver protonReceiver in session.ProtonSession.Receivers)
+             {
+                 if (protonReceiver.LinkedResource is ClientReceiver candidate)
+                 {
+                     if (foundLast)
+                     {
+                         if (candidate.QueuedDeliveries > 0)
+                         {
+                             result = candidate;
+                             // Stop at the first match, otherwise a later receiver
+                             // would overwrite the one that is actually next.
+                             break;
+                         }
+                     }
+                     else
+                     {
+                         foundLast = candidate == lastReceiver;
+                     }
+                 }
+             }
+         }
+         else
+         {
+             session.ProtonSession.Attachments[LAST_RETURNED_STATE_KEY] = null;
+         }
+
+         return result ?? SelectFirstAvailable();
+     }
+
+     // Picks the first receiver, in session order, that has queued deliveries.
+     private ClientReceiver SelectFirstAvailable()
+     {
+         return ReceiversWithQueuedDeliveries().FirstOrDefault();
+     }
+
+     // Picks the receiver with the most queued deliveries; on a tie the one
+     // encountered first wins. Returns null when no receiver has any.
+     private ClientReceiver SelectLargestBacklog()
+     {
+         ClientReceiver result = null;
+
+         foreach (ClientReceiver candidate in ReceiversWithQueuedDeliveries())
+         {
+             if (result == null || result.QueuedDeliveries < candidate.QueuedDeliveries)
+             {
+                 result = candidate;
+             }
+         }
+
+         return result;
+     }
+
+     // Picks the receiver with the fewest (but at least one) queued deliveries;
+     // on a tie the one encountered first wins. Returns null when no receiver has any.
+     private ClientReceiver SelectSmallestBacklog()
+     {
+         ClientReceiver result = null;
+
+         foreach (ClientReceiver candidate in ReceiversWithQueuedDeliveries())
+         {
+             if (result == null || result.QueuedDeliveries > candidate.QueuedDeliveries)
+             {
+                 result = candidate;
+             }
+         }
+
+         return result;
+     }
+ }
+}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
index 837ed14..f619258 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
@@ -138,6 +138,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
internal string ReceiverId => receiverId;
+ internal Engine.IReceiver ProtonReceiver => protonLink;
+
protected void AsyncApplyDisposition(IIncomingDelivery delivery,
Types.Transport.IDeliveryState state, bool settle)
{
session.Execute(() =>
diff --git a/src/Proton.Client/Client/Implementation/ClientSender.cs
b/src/Proton.Client/Client/Implementation/ClientSender.cs
index a86d109..e74c79f 100644
--- a/src/Proton.Client/Client/Implementation/ClientSender.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSender.cs
@@ -134,7 +134,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
private Task<ITracker> DoSendMessageAsync<T>(IAdvancedMessage<T>
message, IDictionary<string, object> deliveryAnnotations, bool waitForCredit)
{
- TaskCompletionSource<ITracker> operation = new
TaskCompletionSource<ITracker>();
+ TaskCompletionSource<ITracker> operation = new();
IProtonBuffer buffer = message.Encode(deliveryAnnotations);
@@ -144,7 +144,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
{
try
{
- ClientOutgoingEnvelope envelope = new
ClientOutgoingEnvelope(this, message.MessageFormat, buffer, operation);
+ ClientOutgoingEnvelope envelope = new(this,
message.MessageFormat, buffer, operation);
if (ProtonSender.IsSendable && ProtonSender.Current == null)
{
@@ -274,7 +274,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
#endregion
- #region Proton Sender lifecycle envent handlers
+ #region Proton Sender lifecycle event handlers
private void HandleLocalOpen(Engine.ISender sender)
{
diff --git a/src/Proton.Client/Client/Implementation/ClientSession.cs
b/src/Proton.Client/Client/Implementation/ClientSession.cs
index ce46884..f20a291 100644
--- a/src/Proton.Client/Client/Implementation/ClientSession.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSession.cs
@@ -40,15 +40,16 @@ namespace Apache.Qpid.Proton.Client.Implementation
private readonly SessionOptions options;
private readonly ClientConnection connection;
private readonly string sessionId;
- private readonly AtomicBoolean closed = new AtomicBoolean();
+ private readonly AtomicBoolean closed = new();
private readonly ClientSenderBuilder senderBuilder;
private readonly ClientReceiverBuilder receiverBuilder;
- private readonly TaskCompletionSource<ISession> openFuture = new
TaskCompletionSource<ISession>();
- private readonly TaskCompletionSource<ISession> closeFuture = new
TaskCompletionSource<ISession>();
+ private readonly TaskCompletionSource<ISession> openFuture = new();
+ private readonly TaskCompletionSource<ISession> closeFuture = new();
private IClientTransactionContext txnContext = NoOpTransactionContext;
private Engine.ISession protonSession;
private ClientException failureCause;
+ private ClientNextReceiverSelector nextReceiverSelector;
public ClientSession(ClientConnection connection, SessionOptions
options, string sessionId, Engine.ISession session)
{
@@ -66,6 +67,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
public Task<ISession> OpenTask => openFuture.Task;
+ internal ClientException FailureCause => failureCause;
+
public IReadOnlyDictionary<string, object> Properties
{
get
@@ -338,6 +341,67 @@ namespace Apache.Qpid.Proton.Client.Implementation
return rollbackFuture.Task;
}
+ /// <summary>
+ /// Blocking request for the next receiver using the session's default next
+ /// receiver policy and TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ /// <returns>The result produced by the asynchronous request</returns>
+ public virtual IReceiver NextReceiver()
+ {
+     Task<IReceiver> request = NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue);
+
+     return request.ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ /// <summary>
+ /// Asynchronous request for the next receiver using the session's default next
+ /// receiver policy and TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ /// <returns>A Task that is completed by the policy and timeout based overload</returns>
+ public virtual Task<IReceiver> NextReceiverAsync() =>
+     NextReceiverAsync(options.DefaultNextReceiverPolicy, TimeSpan.MaxValue);
+
+ /// <summary>
+ /// Blocking request for the next receiver using the given policy and
+ /// TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ /// <param name="policy">The selection policy forwarded to the asynchronous overload</param>
+ /// <returns>The result produced by the asynchronous request</returns>
+ public virtual IReceiver NextReceiver(NextReceiverPolicy policy)
+ {
+     Task<IReceiver> request = NextReceiverAsync(policy, TimeSpan.MaxValue);
+
+     return request.ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ /// <summary>
+ /// Asynchronous request for the next receiver using the given policy and
+ /// TimeSpan.MaxValue as the timeout.
+ /// </summary>
+ /// <param name="policy">The selection policy forwarded to the timeout based overload</param>
+ /// <returns>A Task that is completed by the policy and timeout based overload</returns>
+ public virtual Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy) =>
+     NextReceiverAsync(policy, TimeSpan.MaxValue);
+
+ /// <summary>
+ /// Blocking request for the next receiver using the session's default next
+ /// receiver policy and the given timeout.
+ /// </summary>
+ /// <param name="timeout">The timeout value forwarded to the asynchronous overload</param>
+ /// <returns>The result produced by the asynchronous request</returns>
+ public virtual IReceiver NextReceiver(TimeSpan timeout)
+ {
+     Task<IReceiver> request = NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout);
+
+     return request.ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ /// <summary>
+ /// Asynchronous request for the next receiver using the session's default next
+ /// receiver policy and the given timeout.
+ /// </summary>
+ /// <param name="timeout">The timeout value forwarded to the policy based overload</param>
+ /// <returns>A Task that is completed by the policy and timeout based overload</returns>
+ public virtual Task<IReceiver> NextReceiverAsync(TimeSpan timeout) =>
+     NextReceiverAsync(options.DefaultNextReceiverPolicy, timeout);
+
+ /// <summary>
+ /// Blocking request for the next receiver using the given policy and timeout.
+ /// </summary>
+ /// <param name="policy">The selection policy forwarded to the asynchronous overload</param>
+ /// <param name="timeout">The timeout value forwarded to the asynchronous overload</param>
+ /// <returns>The result produced by the asynchronous request</returns>
+ public virtual IReceiver NextReceiver(NextReceiverPolicy policy, TimeSpan timeout)
+ {
+     Task<IReceiver> request = NextReceiverAsync(policy, timeout);
+
+     return request.ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ /// <summary>
+ /// Asynchronous request for the next receiver using the given policy and
+ /// timeout. After an up-front closed or failed check the request is handed to
+ /// the connection's executor, where the session's receiver selector is created
+ /// on first use and asked to resolve the request. Any error raised while
+ /// doing so is reported through the returned Task.
+ /// </summary>
+ /// <param name="policy">The selection policy handed to the receiver selector</param>
+ /// <param name="timeout">The timeout value handed to the receiver selector</param>
+ /// <returns>A Task that is completed by the receiver selector or failed with an error</returns>
+ public virtual Task<IReceiver> NextReceiverAsync(NextReceiverPolicy policy, TimeSpan timeout)
+ {
+     CheckClosedOrFailed();
+     TaskCompletionSource<IReceiver> nextReceiver = new();
+
+     connection.Execute(() =>
+     {
+         try
+         {
+             // State may have changed between the caller's thread and this task running.
+             CheckClosedOrFailed();
+
+             nextReceiverSelector ??= new ClientNextReceiverSelector(this);
+             nextReceiverSelector.NextReceiver(nextReceiver, policy, timeout);
+         }
+         catch (Exception error)
+         {
+             nextReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
+         }
+     });
+
+     return nextReceiver.Task;
+ }
+
#region Internal client session API
internal void CheckClosedOrFailed()
@@ -516,6 +580,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
{
}
+ nextReceiverSelector?.HandleShutdown();
+
if (failureCause != null)
{
_ = openFuture.TrySetException(failureCause);
@@ -628,6 +694,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
protonSession.Close();
protonSession =
ConfigureSession(ClientSessionBuilder.RecreateSession(connection, options));
+ nextReceiverSelector?.HandleReconnect();
+
Open();
}
else
diff --git a/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs
b/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs
index a47c872..28dbbe2 100644
--- a/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSessionBuilder.cs
@@ -98,7 +98,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
CloseTimeout = connectionOptions.CloseTimeout,
RequestTimeout = connectionOptions.RequestTimeout,
SendTimeout = connectionOptions.SendTimeout,
- DrainTimeout = connectionOptions.DrainTimeout
+ DrainTimeout = connectionOptions.DrainTimeout,
+ DefaultNextReceiverPolicy =
connectionOptions.DefaultNextReceiverPolicy
};
}
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamSession.cs
b/src/Proton.Client/Client/Implementation/ClientStreamSession.cs
index 0aee53e..f3f6c06 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamSession.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamSession.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Client.Exceptions;
@@ -87,5 +88,53 @@ namespace Apache.Qpid.Proton.Client.Implementation
CheckClosedOrFailed();
throw new ClientUnsupportedOperationException("Cannot create a sender
from a streaming resource session");
}
+
+ public override IReceiver NextReceiver()
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
+
+ public override Task<IReceiver> NextReceiverAsync()
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
+
+ public override IReceiver NextReceiver(NextReceiverPolicy policy)
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
+
+ public override Task<IReceiver> NextReceiverAsync(NextReceiverPolicy
policy)
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
+
+ public override IReceiver NextReceiver(TimeSpan timeout)
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
+
+ public override Task<IReceiver> NextReceiverAsync(TimeSpan timeout)
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
+
+ public override IReceiver NextReceiver(NextReceiverPolicy policy,
TimeSpan timeout)
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
+
+ public override Task<IReceiver> NextReceiverAsync(NextReceiverPolicy
policy, TimeSpan timeout)
+ {
+ CheckClosedOrFailed();
+ throw new ClientUnsupportedOperationException("Cannot request next
receiver from a streaming resource session");
+ }
}
}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/NextReceiverPolicy.cs
b/src/Proton.Client/Client/NextReceiverPolicy.cs
new file mode 100644
index 0000000..fe84f4b
--- /dev/null
+++ b/src/Proton.Client/Client/NextReceiverPolicy.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Qpid.Proton.Client
+{
+ /// <summary>
+ /// Determines the behavior of a Session when the next receiver method is
called
+ /// on that session. Each policy provides a contract on the ordering of
returned
+ /// receivers from the next receiver API when there are receivers with
locally
+ /// queued deliveries. When there are no Receiver instances that have
locally
+ /// queued deliveries the next receiver API will return the next receiver to
+ /// receive a complete incoming delivery unless a timeout was given and that
+ /// time period expires in which case it will return null.
+ ///
+ /// Should the user perform receive calls on a Receiver directly in multiple
+ /// threads the behavior of the next receiver API is undefined and it
becomes possible
+ /// that the resulting receiver returned from that API will have no actual
pending
+ /// deliveries due to a race. In most cases the caller can mitigate some
risk by using
+ /// the Receiver TryReceive API and accounting for a null result.
+ /// </summary>
+ public enum NextReceiverPolicy
+ {
+ /// <summary>
+ /// Examines the list of currently open receivers in the session and
returns
+ /// the next receiver that has a pending delivery that follows the
previously
+ /// returned receiver (if any) otherwise the first receiver in the
session with
+ /// a pending delivery is returned. The order of receivers returned will
likely
+ /// be creation order however the implementation is not required to
follow this
+ /// pattern so the caller should not be coded to rely on that ordering.
+ /// </summary>
+ RoundRobin,
+
+ /// <summary>
+ /// Examines the list of currently open receivers in the session and
returns a
+ /// random selection from the set of receivers that have a pending
delivery
+ /// immediately available. This provides a means of selecting receivers
which
+ /// is not prone to sticking to a highly active receiver which can
starve out
+ /// other receivers which receive only limited traffic.
+ /// </summary>
+ Random,
+
+ /// <summary>
+ /// Examines the list of currently open receivers in the session and
returns the
+ /// first receiver found with an available delivery. This can result in
starvation
+ /// if that receiver has a continuous feed of new deliveries from the
remote as it
+ /// will be repeatedly selected by the next receiver API.
+ /// </summary>
+ FirstAvailable,
+
+ /// <summary>
+ /// Examines the list of currently open receivers in the session and
returns the
+ /// receiver with the largest backlog of available deliveries. This can
result in
+ /// starvation if that receiver has a continuous feed of new deliveries
from the
+ /// remote as it will likely be repeatedly selected by the next receiver
API.
+ /// </summary>
+ LargestBacklog,
+
+ /// <summary>
+ /// Examines the list of currently open receivers in the session and
returns the
+ /// receiver with the smallest backlog of available deliveries.
+ /// </summary>
+ SmallestBacklog
+
+ }
+}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/SessionOptions.cs
b/src/Proton.Client/Client/SessionOptions.cs
index 6660b26..713b533 100644
--- a/src/Proton.Client/Client/SessionOptions.cs
+++ b/src/Proton.Client/Client/SessionOptions.cs
@@ -65,6 +65,7 @@ namespace Apache.Qpid.Proton.Client
other.OpenTimeout = OpenTimeout;
other.CloseTimeout = CloseTimeout;
other.DrainTimeout = DrainTimeout;
+ other.DefaultNextReceiverPolicy = DefaultNextReceiverPolicy;
if (OfferedCapabilities != null && OfferedCapabilities.Length > 0)
{
string[] copyOf = new string[OfferedCapabilities.Length];
@@ -137,6 +138,11 @@ namespace Apache.Qpid.Proton.Client
/// </summary>
public string[] DesiredCapabilities { get; set; }
+ /// <summary>
+ /// Configures the default next receiver policy for this session.
+ /// </summary>
+ public NextReceiverPolicy DefaultNextReceiverPolicy { get; set; } =
ConnectionOptions.DEFAULT_NEXT_RECEIVER_POLICY;
+
/// <summary>
/// Configures a collection of property values that are sent to the
remote upon opening
/// a new session.
diff --git a/src/Proton.Client/Client/Transport/ITransport.cs
b/src/Proton.Client/Client/Transport/ITransport.cs
index cd0d9c8..26bf87b 100644
--- a/src/Proton.Client/Client/Transport/ITransport.cs
+++ b/src/Proton.Client/Client/Transport/ITransport.cs
@@ -30,7 +30,7 @@ namespace Apache.Qpid.Proton.Client.Transport
public interface ITransport
{
/// <summary>
- /// Returns the event loop that this transport is registed against,
+ /// Returns the event loop that this transport is registered against,
/// the event loop should never have its lifetime linked to a transport
/// as the client connection will use a single event loop for the
/// duration of its lifetime.
diff --git a/src/Proton.Client/Client/Transport/TcpTransport.cs
b/src/Proton.Client/Client/Transport/TcpTransport.cs
index d6cb339..b9d73e6 100644
--- a/src/Proton.Client/Client/Transport/TcpTransport.cs
+++ b/src/Proton.Client/Client/Transport/TcpTransport.cs
@@ -110,7 +110,7 @@ namespace Apache.Qpid.Proton.Client.Transport
// since we are shutting down anyway.
try
{
- ChannelTermination termination = new ChannelTermination();
+ ChannelTermination termination = new();
if ((!channelOutputSource?.TryWrite(termination) ?? true) ||
!connected)
{
termination.Execute();
diff --git a/src/Proton/Engine/IAttachments.cs
b/src/Proton/Engine/IAttachments.cs
index c057b09..9ac40c3 100644
--- a/src/Proton/Engine/IAttachments.cs
+++ b/src/Proton/Engine/IAttachments.cs
@@ -25,6 +25,16 @@ namespace Apache.Qpid.Proton.Engine
/// </summary>
public interface IAttachments
{
+ /// <summary>
+ /// Gets or sets the attachment with the given key. If the element being
+ /// retrieved is not in the collection an exception is thrown.
+ /// </summary>
+ /// <param name="key">The key of the attachment to get or set</param>
+ /// <returns>The attachment value that is mapped to the given key</returns>
+ /// <exception cref="KeyNotFoundException">If the given key is not in
the attachments</exception>
+ /// <exception cref="ArgumentNullException">If the given key is
null</exception>
+ object this[string key] { get; set; }
+
/// <summary>
/// Gets the user attached value that is associated with the given key,
or null
/// if no data is mapped to the key.
diff --git a/src/Proton/Engine/ISession.cs b/src/Proton/Engine/ISession.cs
index 0db86d4..8cea13f 100644
--- a/src/Proton/Engine/ISession.cs
+++ b/src/Proton/Engine/ISession.cs
@@ -148,7 +148,7 @@ namespace Apache.Qpid.Proton.Engine
/// Sets a delegate for when an AMQP Attach frame is received from the
remote peer
/// for a transaction manager link attach.
/// <para/>
- /// Used to process remotely initiated transcation mangaers. Locally
initiated links have
+ /// Used to process remotely initiated transaction managers. Locally
initiated links have
/// their own handlers invoked instead. This method is Typically used by
servers to listen
/// for remote resource creation. If an event handler for remote sender
open is registered on
/// this Session for a link scoped to it then this handler will be
invoked instead of the
@@ -158,5 +158,17 @@ namespace Apache.Qpid.Proton.Engine
/// <returns>This session instance</returns>
ISession TransactionManagerOpenedHandler(Action<ITransactionManager>
handler);
+ /// <summary>
+ /// Allows monitoring of incoming deliveries to receivers attached to this <see cref="ISession"/>.
+ /// The Receiver that is the target of the incoming delivery will be
notified first of the
+ /// incoming delivery and any processing should be done using the
Receiver DeliveryReadHandler
+ /// API. This event point will be triggered only after the Receiver level
handler and should be
+ /// used to monitor deliveries passing through a session for logging or
other state related
+ /// actions performed by the service managing this session.
+ /// </summary>
+ /// <param name="handler">Handler that is signalled that a receiver read
a new delivery</param>
+ /// <returns>This session instance</returns>
+ ISession DeliveryReadHandler(Action<IIncomingDelivery> handler);
+
}
}
\ No newline at end of file
diff --git a/src/Proton/Engine/Implementation/ProtonAttachments.cs
b/src/Proton/Engine/Implementation/ProtonAttachments.cs
index 63b1eba..a3f6a02 100644
--- a/src/Proton/Engine/Implementation/ProtonAttachments.cs
+++ b/src/Proton/Engine/Implementation/ProtonAttachments.cs
@@ -23,6 +23,12 @@ namespace Apache.Qpid.Proton.Engine.Implementation
{
private readonly IDictionary<string, object> entries = new
Dictionary<string, object>();
+ public object this[string key]
+ {
+ get => entries[key];
+ set => entries[key] = value;
+ }
+
public IAttachments Clear()
{
entries.Clear();
diff --git a/src/Proton/Engine/Implementation/ProtonReceiver.cs
b/src/Proton/Engine/Implementation/ProtonReceiver.cs
index b582184..e641172 100644
--- a/src/Proton/Engine/Implementation/ProtonReceiver.cs
+++ b/src/Proton/Engine/Implementation/ProtonReceiver.cs
@@ -392,6 +392,13 @@ namespace Apache.Qpid.Proton.Engine.Implementation
{
deliveryReadEventHandler?.Invoke(delivery);
}
+
+ // Allow session owner to monitor deliveries passing through the
session
+ // but only after the receiver handlers have had a chance to handle
it.
+ if (session.HasDeliveryReadHandler)
+ {
+ session.FireDeliveryRead(delivery);
+ }
}
internal void FireDeliveryUpdated(ProtonIncomingDelivery delivery)
diff --git a/src/Proton/Engine/Implementation/ProtonSession.cs
b/src/Proton/Engine/Implementation/ProtonSession.cs
index e622892..fcc8792 100644
--- a/src/Proton/Engine/Implementation/ProtonSession.cs
+++ b/src/Proton/Engine/Implementation/ProtonSession.cs
@@ -32,7 +32,7 @@ namespace Apache.Qpid.Proton.Engine.Implementation
/// </summary>
public sealed class ProtonSession : ProtonEndpoint<ISession>, ISession
{
- private readonly Begin localBegin = new Begin();
+ private readonly Begin localBegin = new();
private Begin remoteBegin;
private readonly ushort localChannel;
@@ -61,6 +61,9 @@ namespace Apache.Qpid.Proton.Engine.Implementation
private Action<IReceiver> remoteReceiverOpenEventHandler;
private Action<ITransactionManager> remoteTxnManagerOpenEventHandler;
+ // Spy API for session resources
+ private Action<IIncomingDelivery> deliveryReadHandler;
+
public ProtonSession(ProtonConnection connection, ushort channel) :
base(connection.ProtonEngine)
{
this.connection = connection;
@@ -307,6 +310,12 @@ namespace Apache.Qpid.Proton.Engine.Implementation
return this;
}
+ public ISession DeliveryReadHandler(Action<IIncomingDelivery> handler)
+ {
+ deliveryReadHandler = handler;
+ return this;
+ }
+
#endregion
#region Handlers for remote AMQP Performatives
@@ -461,6 +470,8 @@ namespace Apache.Qpid.Proton.Engine.Implementation
internal bool HasTransactionManagerOpenHandler =>
remoteTxnManagerOpenEventHandler != null;
+ internal bool HasDeliveryReadHandler => deliveryReadHandler != null;
+
internal void FireRemoteReceiverOpened(IReceiver receiver)
{
remoteReceiverOpenEventHandler?.Invoke(receiver);
@@ -471,6 +482,11 @@ namespace Apache.Qpid.Proton.Engine.Implementation
remoteSenderOpenEventHandler?.Invoke(sender);
}
+ internal void FireDeliveryRead(IIncomingDelivery delivery)
+ {
+ deliveryReadHandler?.Invoke(delivery);
+ }
+
internal void FireRemoteTransactionManagerOpened(ITransactionManager
manager)
{
remoteTxnManagerOpenEventHandler?.Invoke(manager);
diff --git
a/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs
b/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs
index 14d2bca..92eb8c0 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientBaseTestFixture.cs
@@ -46,7 +46,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
NLog.Targets.Target logconsole = new
NLog.Targets.ConsoleTarget("logconsole");
// Rules for mapping loggers to targets
- // config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal,
logconsole);
+ //config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal,
logconsole);
config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal, logfile);
loggerFactory = LoggerFactory.Create(builder =>
diff --git
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs
index 3642c77..7504fef 100644
---
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs
+++
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectSessionTest.cs
@@ -17,7 +17,9 @@
using System;
using System.Threading;
+using System.Threading.Tasks;
using Apache.Qpid.Proton.Test.Driver;
+using Apache.Qpid.Proton.Types.Messaging;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
@@ -275,5 +277,121 @@ namespace Apache.Qpid.Proton.Client.Implementation
}
}
+ [Test]
+ public void TestNextReceiverCompletesAfterDeliveryArrivesRoundRobin()
+ {
+
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.RoundRobin);
+ }
+
+ [Test]
+ public void TestNextReceiverCompletesAfterDeliveryArrivesRandom()
+ {
+
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.Random);
+ }
+
+ [Test]
+ public void TestNextReceiverCompletesAfterDeliveryArrivesLargestBacklog()
+ {
+
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.LargestBacklog);
+ }
+
+ [Test]
+ public void
TestNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog()
+ {
+
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.SmallestBacklog);
+ }
+
+ [Test]
+ public void TestNextReceiverCompletesAfterDeliveryArrivesFirstAvailable()
+ {
+
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.FirstAvailable);
+ }
+
+ public void
DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy)
+ {
+ byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+ using (ProtonTestServer firstPeer = new
ProtonTestServer(loggerFactory))
+ using (ProtonTestServer finalPeer = new
ProtonTestServer(loggerFactory))
+ {
+ firstPeer.ExpectSASLAnonymousConnect();
+ firstPeer.ExpectOpen().Respond();
+ firstPeer.ExpectBegin().Respond();
+ firstPeer.ExpectAttach().OfReceiver().Respond();
+ firstPeer.ExpectFlow().WithLinkCredit(10);
+ firstPeer.DropAfterLastHandler();
+ firstPeer.Start();
+
+ finalPeer.ExpectSASLAnonymousConnect();
+ finalPeer.ExpectOpen().Respond();
+ finalPeer.ExpectBegin().Respond();
+ finalPeer.ExpectAttach().OfReceiver().Respond();
+ finalPeer.ExpectFlow().WithLinkCredit(10);
+ finalPeer.Start();
+
+ string primaryAddress = firstPeer.ServerAddress;
+ int primaryPort = firstPeer.ServerPort;
+ string finalAddress = finalPeer.ServerAddress;
+ int finalPort = finalPeer.ServerPort;
+
+ logger.LogInformation("Test started, first peer listening on:
{0}:{1}", primaryAddress, primaryPort);
+ logger.LogInformation("Test started, final peer listening on:
{0}:{1}", finalAddress, finalPort);
+
+ CountdownEvent done = new CountdownEvent(1);
+
+ ConnectionOptions options = new ConnectionOptions()
+ {
+ DefaultNextReceiverPolicy = policy,
+ };
+ options.ReconnectOptions.ReconnectEnabled = true;
+ options.ReconnectOptions.AddReconnectLocation(finalAddress,
finalPort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(primaryAddress,
primaryPort, options);
+
+ Task.Run(() =>
+ {
+ try
+ {
+ IReceiver receiver = connection.NextReceiver();
+ IDelivery delivery = receiver.Receive();
+ logger.LogInformation("Next receiver returned delivery with
body: {0}", delivery.Message().Body);
+ done.Signal();
+ }
+ catch (Exception e)
+ {
+ logger.LogDebug("Exception in next receiver task", e);
+ }
+ });
+
+ ReceiverOptions receiverOptions = new ReceiverOptions()
+ {
+ CreditWindow = 10,
+ AutoAccept = false
+ };
+
+ _ = connection.OpenReceiver("test-receiver1",
receiverOptions).OpenTask.Result;
+
+ firstPeer.WaitForScriptToComplete();
+ finalPeer.WaitForScriptToComplete();
+
+ finalPeer.RemoteTransfer().WithHandle(0)
+ .WithDeliveryId(0)
+ .WithMore(false)
+ .WithMessageFormat(0)
+ .WithPayload(payload).Later(15);
+
+ finalPeer.WaitForScriptToComplete();
+
+ Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(10)));
+
+ finalPeer.WaitForScriptToComplete();
+ finalPeer.ExpectClose().Respond();
+
+ connection.Close();
+
+ finalPeer.WaitForScriptToComplete();
+ }
+ }
}
}
\ No newline at end of file
diff --git
a/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs
b/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs
index 04dadcb..078f1d2 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientSessionTest.cs
@@ -24,6 +24,9 @@ using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Types.Transport;
using System.Collections.Generic;
using System.Linq;
+using Apache.Qpid.Proton.Types.Messaging;
+using Apache.Qpid.Proton.Client.TestSupport;
+using System.Threading.Tasks;
namespace Apache.Qpid.Proton.Client.Implementation
{
@@ -266,7 +269,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
}
[Test]
- public void
TestConnectionCloseGetsResponseWithErrorDoesNotThrowUntimedGet()
+ public void
TestConnectionCloseGetsResponseWithErrorDoesNotThrowInfiniteGet()
{
DoTestSessionCloseGetsResponseWithErrorThrows(false);
}
@@ -686,5 +689,1164 @@ namespace Apache.Qpid.Proton.Client.Implementation
peer.WaitForScriptToComplete();
}
}
+
+        /// <summary>
+        /// Queues ten deliveries on a single receiver and checks that, with the
+        /// FirstAvailable policy configured on the connection, every call to
+        /// <c>connection.NextReceiver()</c> returns that same receiver and that a
+        /// delivery can be read from it each time.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverFromDefaultSessionReturnsSameReceiverForQueuedDeliveries()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using var peer = new ProtonTestServer(loggerFactory);
+
+            // Script: handshake, one receiver link, then ten transfers queued behind the flow.
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            for (uint deliveryId = 0; deliveryId < 10; deliveryId++)
+            {
+                peer.RemoteTransfer().WithDeliveryId(deliveryId).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+            }
+            peer.Start();
+
+            string host = peer.ServerAddress;
+            int port = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", host, port);
+
+            IClient container = IClient.Create();
+            var connOptions = new ConnectionOptions
+            {
+                DefaultNextReceiverPolicy = NextReceiverPolicy.FirstAvailable
+            };
+            IConnection connection = container.Connect(host, port, connOptions);
+
+            // Credit window is disabled so that credit is granted explicitly below.
+            var options = new ReceiverOptions
+            {
+                CreditWindow = 0,
+                AutoAccept = false
+            };
+            IReceiver receiver = connection.OpenReceiver("test-receiver", options);
+            receiver.AddCredit(10);
+
+            Wait.WaitFor(() => receiver.QueuedDeliveries == 10);
+
+            peer.WaitForScriptToComplete();
+
+            int remaining = 10;
+            while (remaining-- > 0)
+            {
+                IReceiver nextReceiver = connection.NextReceiver();
+                Assert.AreSame(receiver, nextReceiver);
+                IDelivery delivery = nextReceiver.Receive();
+                Assert.IsNotNull(delivery);
+            }
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+        }
+
+        /// <summary>
+        /// With one receiver open and no deliveries sent by the peer, a timed
+        /// <c>NextReceiver</c> call with a 10 millisecond timeout must return null.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverTimesOut()
+        {
+            using var peer = new ProtonTestServer(loggerFactory);
+
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.Start();
+
+            string host = peer.ServerAddress;
+            int port = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", host, port);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(host, port);
+
+            // Block until the receiver open task has finished before polling.
+            _ = connection.OpenReceiver("test-receiver").OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            IReceiver timedOut = connection.NextReceiver(TimeSpan.FromMilliseconds(10));
+            Assert.IsNull(timedOut);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+        }
+
+        /// <summary>
+        /// Sends one delivery to each of three receivers and checks that three
+        /// successive FirstAvailable <c>NextReceiver</c> calls, each followed by a
+        /// <c>Receive</c>, hand back three distinct receiver instances.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverReturnsAllReceiversEventually()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using var peer = new ProtonTestServer(loggerFactory);
+
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            for (int link = 0; link < 3; link++)
+            {
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+            }
+            // One transfer per link handle.
+            peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(1).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+            peer.RemoteTransfer().WithHandle(2).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+            peer.Start();
+
+            string host = peer.ServerAddress;
+            int port = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", host, port);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(host, port);
+
+            var options = new ReceiverOptions
+            {
+                CreditWindow = 10,
+                AutoAccept = false
+            };
+
+            _ = connection.OpenReceiver("test-receiver1", options).OpenTask.Result;
+            _ = connection.OpenReceiver("test-receiver2", options).OpenTask.Result;
+            _ = connection.OpenReceiver("test-receiver3", options).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            // Each selected receiver is drained before the next selection is made.
+            IReceiver first = connection.NextReceiver(NextReceiverPolicy.FirstAvailable);
+            Assert.IsNotNull(first.Receive());
+            IReceiver second = connection.NextReceiver(NextReceiverPolicy.FirstAvailable);
+            Assert.IsNotNull(second.Receive());
+            IReceiver third = connection.NextReceiver(NextReceiverPolicy.FirstAvailable);
+            Assert.IsNotNull(third.Receive());
+
+            Assert.AreNotSame(first, second);
+            Assert.AreNotSame(first, third);
+            Assert.AreNotSame(second, third);
+
+            peer.WaitForScriptToComplete();
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+        }
+
+        /// <summary>
+        /// Configures LargestBacklog as the connection default policy, queues 1, 2 and 1
+        /// deliveries on three receivers, and checks that <c>connection.NextReceiver()</c>
+        /// selects the receiver holding two deliveries.
+        /// </summary>
+        [Test]
+        public void TestConnectionOptionsConfiguresLargestBacklogNextReceiverPolicy()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Handle 1 receives two transfers; handles 0 and 2 receive one each.
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(2).WithDeliveryId(3).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.LargestBacklog
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = connection.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = connection.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = connection.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+
+                Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+                Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+                Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+                IReceiver next = connection.NextReceiver();
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                Assert.AreSame(receiver2, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// Sets Random as the connection default and LargestBacklog in the session
+        /// options, queues 1, 2 and 1 deliveries on three session receivers, and checks
+        /// that <c>session.NextReceiver()</c> selects the receiver holding two deliveries.
+        /// </summary>
+        [Test]
+        public void TestSessionOptionsConfiguresLargestBacklogNextReceiverPolicy()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Handle 1 receives two transfers; handles 0 and 2 receive one each.
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(2).WithDeliveryId(3).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                SessionOptions sessionOptions = new SessionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.LargestBacklog
+                };
+                ISession session = connection.OpenSession(sessionOptions);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+
+                Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+                Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+                Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+                IReceiver next = session.NextReceiver();
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                Assert.AreSame(receiver2, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// Configures Random on the connection and SmallestBacklog on the session, then
+        /// calls <c>session.NextReceiver(NextReceiverPolicy.LargestBacklog)</c> and checks
+        /// that the receiver holding two deliveries (out of 1, 2 and 1) is returned,
+        /// i.e. the policy passed to the call is the one applied.
+        /// </summary>
+        [Test]
+        public void TestUserSpecifiedNextReceiverPolicyOverridesConfiguration()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Handle 1 receives two transfers; handles 0 and 2 receive one each.
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(2).WithDeliveryId(3).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                SessionOptions sessionOptions = new SessionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.SmallestBacklog
+                };
+                ISession session = connection.OpenSession(sessionOptions);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+
+                Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+                Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+                Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+                IReceiver next = session.NextReceiver(NextReceiverPolicy.LargestBacklog);
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                Assert.AreSame(receiver2, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// Sets Random as the connection default and SmallestBacklog in the session
+        /// options, queues 3, 2 and 1 deliveries on three session receivers, and checks
+        /// that <c>session.NextReceiver()</c> selects the receiver holding one delivery.
+        /// </summary>
+        [Test]
+        public void TestSessionOptionsConfiguresSmallestBacklogNextReceiverPolicy()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Three transfers on handle 0, two on handle 1, one on handle 2.
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(3).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(4).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(2).WithDeliveryId(5).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                SessionOptions sessionOptions = new SessionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.SmallestBacklog
+                };
+                ISession session = connection.OpenSession(sessionOptions);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+
+                Wait.WaitFor(() => receiver1.QueuedDeliveries == 3);
+                Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+                Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+                IReceiver next = session.NextReceiver();
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                Assert.AreSame(receiver3, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// Runs the "completes after delivery arrives" scenario with the RoundRobin policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesAfterDeliveryArrivesRoundRobin()
+            => DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.RoundRobin);
+
+        /// <summary>
+        /// Runs the "completes after delivery arrives" scenario with the Random policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesAfterDeliveryArrivesRandom()
+            => DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.Random);
+
+        /// <summary>
+        /// Runs the "completes after delivery arrives" scenario with the LargestBacklog policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesAfterDeliveryArrivesLargestBacklog()
+            => DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.LargestBacklog);
+
+        /// <summary>
+        /// Runs the "completes after delivery arrives" scenario with the SmallestBacklog policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog()
+            => DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.SmallestBacklog);
+
+        /// <summary>
+        /// Runs the "completes after delivery arrives" scenario with the FirstAvailable policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesAfterDeliveryArrivesFirstAvailable()
+            => DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.FirstAvailable);
+
+        /// <summary>
+        /// Shared scenario: opens one receiver, starts a background task that calls
+        /// <c>connection.NextReceiver()</c> followed by <c>Receive()</c>, then has the
+        /// peer send a single transfer via <c>Later(15)</c> and asserts that the
+        /// background task signals completion within ten seconds.
+        /// </summary>
+        /// <param name="policy">Policy installed as the connection's default next receiver policy.</param>
+        public void DoTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy)
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using var peer = new ProtonTestServer(loggerFactory);
+
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.Start();
+
+            // Signalled by the background task once a delivery has been received.
+            var done = new CountdownEvent(1);
+
+            string host = peer.ServerAddress;
+            int port = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", host, port);
+
+            IClient container = IClient.Create();
+            var options = new ConnectionOptions
+            {
+                DefaultNextReceiverPolicy = policy
+            };
+            IConnection connection = container.Connect(host, port, options);
+
+            var receiverOptions = new ReceiverOptions
+            {
+                CreditWindow = 10,
+                AutoAccept = false
+            };
+            _ = connection.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            Task.Run(() =>
+            {
+                try
+                {
+                    IReceiver selected = connection.NextReceiver();
+                    IDelivery delivery = selected.Receive();
+                    logger.LogInformation("Next receiver returned delivery with body: {0}", delivery.Message().Body);
+                    done.Signal();
+                }
+                catch (Exception e)
+                {
+                    // A failure here leaves 'done' unsignalled, so the assertion below fails.
+                    logger.LogDebug("Failed in next receiver task: {0}", e);
+                }
+            });
+
+            peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Later(15);
+
+            peer.WaitForScriptToComplete();
+
+            bool completed = done.Wait(TimeSpan.FromSeconds(10));
+            Assert.IsTrue(completed);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+        }
+
+        /// <summary>
+        /// Runs the "throws after session closed" scenario with the RoundRobin policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverThrowsAfterSessionClosedRoundRobin()
+            => DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.RoundRobin);
+
+        /// <summary>
+        /// Runs the "throws after session closed" scenario with the Random policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverThrowsAfterSessionClosedRandom()
+            => DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.Random);
+
+        /// <summary>
+        /// Runs the "throws after session closed" scenario with the LargestBacklog policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverThrowsAfterSessionClosedLargestBacklog()
+            => DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.LargestBacklog);
+
+        /// <summary>
+        /// Runs the "throws after session closed" scenario with the SmallestBacklog policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverThrowsAfterSessionClosedSmallestBacklog()
+            => DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.SmallestBacklog);
+
+        /// <summary>
+        /// Runs the "throws after session closed" scenario with the FirstAvailable policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverThrowsAfterSessionClosedFirstAvailable()
+            => DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.FirstAvailable);
+
+        /// <summary>
+        /// Shared scenario: a background task calls <c>session.NextReceiver()</c> on a
+        /// session with no receivers; the session is then closed and the test asserts
+        /// that the background call ended with a <c>ClientIllegalStateException</c>.
+        /// </summary>
+        /// <param name="policy">Policy installed as the connection's default next receiver policy.</param>
+        public void DoTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy policy)
+        {
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.Start();
+
+                CountdownEvent started = new CountdownEvent(1);
+                CountdownEvent done = new CountdownEvent(1);
+                Exception error = null;
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = policy
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                ISession session = connection.OpenSession().OpenTask.Result;
+
+                Task.Run(() =>
+                {
+                    try
+                    {
+                        started.Signal();
+                        session.NextReceiver();
+                    }
+                    catch (ClientException e)
+                    {
+                        // Captured for the type assertion made on the test thread.
+                        error = e;
+                    }
+                    finally
+                    {
+                        done.Signal();
+                    }
+                });
+
+                peer.WaitForScriptToComplete();
+
+                Assert.IsTrue(started.Wait(TimeSpan.FromSeconds(10)));
+
+                peer.ExpectEnd().Respond();
+
+                // Completion is observed through 'done', so the returned task is discarded explicitly.
+                _ = session.CloseAsync();
+
+                Assert.IsTrue(done.Wait(TimeSpan.FromSeconds(10)));
+                // IsInstanceOf reports the actual type (or null) when the check fails.
+                Assert.IsInstanceOf<ClientIllegalStateException>(error);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// Runs the "called before receiver create" scenario with the RoundRobin policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateRoundRobin()
+            => DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.RoundRobin);
+
+        /// <summary>
+        /// Runs the "called before receiver create" scenario with the Random policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateRandom()
+            => DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.Random);
+
+        /// <summary>
+        /// Runs the "called before receiver create" scenario with the LargestBacklog policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateLargestBacklog()
+            => DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.LargestBacklog);
+
+        /// <summary>
+        /// Runs the "called before receiver create" scenario with the SmallestBacklog policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateSmallestBacklog()
+            => DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.SmallestBacklog);
+
+        /// <summary>
+        /// Runs the "called before receiver create" scenario with the FirstAvailable policy.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverCompletesWhenCalledBeforeReceiverCreateFirstAvailable()
+            => DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.FirstAvailable);
+
+        /// <summary>
+        /// Shared scenario: a background task calls <c>connection.NextReceiver()</c>
+        /// before any receiver exists; the test then opens a receiver, the peer queues
+        /// one transfer for it, and the test asserts that the background task signals
+        /// completion within ten seconds.
+        /// </summary>
+        /// <param name="policy">Policy installed as the connection's default next receiver policy.</param>
+        public void DoTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy policy)
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using var peer = new ProtonTestServer(loggerFactory);
+
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.Start();
+
+            var started = new CountdownEvent(1);
+            var done = new CountdownEvent(1);
+
+            string host = peer.ServerAddress;
+            int port = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", host, port);
+
+            IClient container = IClient.Create();
+            var options = new ConnectionOptions
+            {
+                DefaultNextReceiverPolicy = policy
+            };
+            IConnection connection = container.Connect(host, port, options);
+
+            Task.Run(() =>
+            {
+                try
+                {
+                    started.Signal();
+                    IReceiver selected = connection.NextReceiver();
+                    IDelivery delivery = selected.Receive();
+                    logger.LogInformation("Next receiver returned delivery with body: {0}", delivery.Message().Body);
+                    done.Signal();
+                }
+                catch (Exception e)
+                {
+                    // A failure here leaves 'done' unsignalled, so the assertion below fails.
+                    logger.LogDebug("Failed in next receiver task: {0}", e);
+                }
+            });
+
+            bool taskStarted = started.Wait(TimeSpan.FromSeconds(10));
+            Assert.IsTrue(taskStarted);
+
+            _ = connection.OpenTask.Result;
+
+            // Only now script the receiver link and its single delivery.
+            peer.WaitForScriptToComplete();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(10);
+            peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue().AfterDelay(10);
+
+            var receiverOptions = new ReceiverOptions
+            {
+                CreditWindow = 10,
+                AutoAccept = false
+            };
+            _ = connection.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+
+            bool completed = done.Wait(TimeSpan.FromSeconds(10));
+            Assert.IsTrue(completed);
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectClose().Respond();
+
+            connection.Close();
+
+            peer.WaitForScriptToComplete();
+        }
+
+        /// <summary>
+        /// With RoundRobin set in the session options and deliveries queued only on the
+        /// second (two) and third (one) receivers, checks that two successive
+        /// <c>session.NextReceiver()</c> calls return the second and then the third receiver.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverRoundRobinReturnsNextReceiverAfterLast()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Handle 0 gets nothing; handle 1 gets two transfers; handle 2 gets one.
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(2).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                SessionOptions sessionOptions = new SessionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+                };
+                ISession session = connection.OpenSession(sessionOptions);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+
+                Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+                Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+                Assert.AreEqual(0, receiver1.QueuedDeliveries);
+
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                IReceiver next = session.NextReceiver();
+                Assert.AreSame(receiver2, next);
+                next = session.NextReceiver();
+                Assert.AreSame(receiver3, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// With RoundRobin set in the session options, selects the second and third
+        /// receivers in turn, then has the peer send a delivery to the first receiver
+        /// and checks that the following <c>session.NextReceiver()</c> call returns the
+        /// first receiver.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverRoundRobinPolicyWrapsAround()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Handle 0 gets nothing initially; handle 1 gets two transfers; handle 2 gets one.
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(2).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                SessionOptions sessionOptions = new SessionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+                };
+                ISession session = connection.OpenSession(sessionOptions);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+
+                Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+                Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+                Assert.AreEqual(0, receiver1.QueuedDeliveries);
+
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                IReceiver next = session.NextReceiver();
+                Assert.AreSame(receiver2, next);
+                next = session.NextReceiver();
+                Assert.AreSame(receiver3, next);
+
+                peer.WaitForScriptToComplete();
+
+                // Give the first receiver a delivery so the next selection can return to it.
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(3).WithMore(false).WithMessageFormat(0).WithPayload(payload).Now();
+
+                Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+
+                next = session.NextReceiver();
+                Assert.AreSame(receiver1, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// With RoundRobin set in the session options, selects the second receiver and
+        /// closes it, then has the peer send a delivery to the first receiver and checks
+        /// that the following <c>session.NextReceiver()</c> call returns the first receiver.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverRoundRobinPolicyRestartsWhenLastReceiverClosed()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Handle 0 gets nothing initially; handle 1 gets two transfers; handle 2 gets one.
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(1).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(2).WithDeliveryId(2).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                SessionOptions sessionOptions = new SessionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+                };
+                ISession session = connection.OpenSession(sessionOptions);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+                // The detach expectation covers the receiver closed further down.
+                peer.ExpectDetach().Respond();
+
+                Wait.WaitFor(() => receiver2.QueuedDeliveries == 2);
+                Wait.WaitFor(() => receiver3.QueuedDeliveries == 1);
+
+                Assert.AreEqual(0, receiver1.QueuedDeliveries);
+
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                IReceiver next = session.NextReceiver();
+                Assert.AreSame(receiver2, next);
+                next.Close();
+
+                peer.WaitForScriptToComplete();
+
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(3).WithMore(false).WithMessageFormat(0).WithPayload(payload).Now();
+
+                Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+
+                next = session.NextReceiver();
+                Assert.AreSame(receiver1, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
+        /// <summary>
+        /// With RoundRobin set in the session options and deliveries queued only on the
+        /// first and fourth of four receivers, checks that two successive
+        /// <c>session.NextReceiver()</c> calls return the first and then the fourth
+        /// receiver, passing over the two with nothing queued.
+        /// </summary>
+        [Test]
+        public void TestNextReceiverRoundRobinPolicySkipsEmptyReceivers()
+        {
+            byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+            using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+            {
+                peer.ExpectSASLAnonymousConnect();
+                peer.ExpectOpen().Respond();
+                peer.ExpectBegin().Respond();
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                peer.ExpectAttach().OfReceiver().Respond();
+                peer.ExpectFlow().WithLinkCredit(10);
+                // Only handles 0 and 3 receive a transfer.
+                peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.RemoteTransfer().WithHandle(3).WithDeliveryId(1).WithMore(false).WithMessageFormat(0).WithPayload(payload).Queue();
+                peer.Start();
+
+                string remoteAddress = peer.ServerAddress;
+                int remotePort = peer.ServerPort;
+
+                logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+                IClient container = IClient.Create();
+                ConnectionOptions options = new ConnectionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.Random
+                };
+                IConnection connection = container.Connect(remoteAddress, remotePort, options);
+                SessionOptions sessionOptions = new SessionOptions()
+                {
+                    DefaultNextReceiverPolicy = NextReceiverPolicy.RoundRobin
+                };
+                ISession session = connection.OpenSession(sessionOptions);
+
+                ReceiverOptions receiverOptions = new ReceiverOptions()
+                {
+                    CreditWindow = 10,
+                    AutoAccept = false
+                };
+                IReceiver receiver1 = session.OpenReceiver("test-receiver1", receiverOptions).OpenTask.Result;
+                IReceiver receiver2 = session.OpenReceiver("test-receiver2", receiverOptions).OpenTask.Result;
+                IReceiver receiver3 = session.OpenReceiver("test-receiver3", receiverOptions).OpenTask.Result;
+                IReceiver receiver4 = session.OpenReceiver("test-receiver4", receiverOptions).OpenTask.Result;
+
+                peer.WaitForScriptToComplete();
+
+                Wait.WaitFor(() => receiver1.QueuedDeliveries == 1);
+                Wait.WaitFor(() => receiver4.QueuedDeliveries == 1);
+
+                Assert.AreEqual(0, receiver2.QueuedDeliveries);
+                Assert.AreEqual(0, receiver3.QueuedDeliveries);
+
+                // NUnit's AreSame takes (expected, actual); the expected receiver goes first.
+                IReceiver next = session.NextReceiver();
+                Assert.AreSame(receiver1, next);
+                next = session.NextReceiver();
+                Assert.AreSame(receiver4, next);
+
+                peer.WaitForScriptToComplete();
+                peer.ExpectClose().Respond();
+
+                connection.Close();
+
+                peer.WaitForScriptToComplete();
+            }
+        }
+
}
}
\ No newline at end of file
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
index ae95839..805814d 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
@@ -2856,5 +2856,43 @@ namespace Apache.Qpid.Proton.Engine.Implementation
Assert.IsNotNull(failure);
Assert.IsTrue(failure is ProtocolViolationException);
}
+
+        /// <summary>
+        /// Registers a delivery-read handler on both the session and a receiver, has
+        /// the peer send one transfer, and asserts that both handlers were invoked and
+        /// that the engine reported no failure.
+        /// </summary>
+        [Test]
+        public void TestSessionWideDeliveryMonitoringHandler()
+        {
+            IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+            engine.ErrorHandler(error => failure = error.FailureCause);
+            ProtonTestConnector peer = CreateTestPeer(engine);
+
+            bool receiverSawDelivery = false;
+            bool sessionSawDelivery = false;
+
+            peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+            peer.ExpectOpen().Respond().WithContainerId("driver");
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfReceiver().Respond();
+            peer.ExpectFlow().WithLinkCredit(1);
+            peer.RemoteTransfer().WithHandle(0).WithDeliveryId(0).WithDeliveryTag(new byte[] { 1 }).OnChannel(0).Queue();
+
+            IConnection connection = engine.Start().Open();
+            ISession session = connection.Session().Open();
+
+            // Session-level handler is installed before the receiver is created.
+            session.DeliveryReadHandler(delivery => sessionSawDelivery = true);
+
+            IReceiver receiver = session.Receiver("test");
+            receiver.DeliveryReadHandler(delivery => receiverSawDelivery = true);
+            receiver.Open().AddCredit(1);
+
+            peer.WaitForScriptToComplete();
+
+            Assert.IsTrue(receiverSawDelivery);
+            Assert.IsTrue(sessionSawDelivery);
+
+            Assert.IsNull(failure);
+        }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]