This is an automated email from the ASF dual-hosted git repository. Havret pushed a commit to branch move-amqp-transport-settings-to-provider in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
commit bb28bc9a245035a96e0ba90be68e65e940c9ab13 Author: Havret <[email protected]> AuthorDate: Wed Jun 3 00:47:57 2026 +0200 Move AMQP transport settings from NmsConnectionInfo to AmqpProvider ChannelMax, MaxFrameSize, and IdleTimeout are AMQP transport-level settings and belong on AmqpProvider rather than NmsConnectionInfo. Defaults are now initialized once via a static constructor on AmqpProvider by reading from AmqpNetLite's ConnectionFactory. --- src/NMS.AMQP/Meta/NmsConnectionInfo.cs | 14 ---- src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs | 8 +-- src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs | 44 ++++++++----- .../Integration/ConnectionIntegrationTest.cs | 64 ++++++++++++++++++ .../Integration/IntegrationTestFixture.cs | 2 +- .../Provider/Amqp/AmqpConnectionTest.cs | 76 ---------------------- .../Provider/Amqp/AmqpProviderFactoryTest.cs | 13 +++- test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 10 ++- 8 files changed, 115 insertions(+), 116 deletions(-) diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs index f9120c6..8d0d8e2 100644 --- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs +++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs @@ -32,9 +32,6 @@ namespace Apache.NMS.AMQP.Meta public static readonly long DEFAULT_CLOSE_TIMEOUT = 60000; public static readonly long DEFAULT_SEND_TIMEOUT = INFINITE; public static readonly long DEFAULT_REQUEST_TIMEOUT = INFINITE; - public static readonly int DEFAULT_IDLE_TIMEOUT; - public static readonly ushort DEFAULT_CHANNEL_MAX; - public static readonly int DEFAULT_MAX_FRAME_SIZE; public static readonly PrefetchPolicyInfo DEFAULT_PREFETCH_POLICY = new PrefetchPolicyInfo() { QueuePrefetch = 1000, @@ -44,14 +41,6 @@ namespace Apache.NMS.AMQP.Meta }; public static double DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC = -1; - static NmsConnectionInfo() - { - AmqpSettings defaultAmqpSettings = new Amqp.ConnectionFactory().AMQP; - DEFAULT_CHANNEL_MAX = defaultAmqpSettings.MaxSessionsPerConnection; - DEFAULT_MAX_FRAME_SIZE = defaultAmqpSettings.MaxFrameSize; - DEFAULT_IDLE_TIMEOUT = defaultAmqpSettings.IdleTimeout; - } - public NmsConnectionInfo(NmsConnectionId connectionId) { this.Id = connectionId ?? throw new ArgumentNullException(nameof(connectionId)); @@ -69,9 +58,6 @@ namespace Apache.NMS.AMQP.Meta public bool LocalMessageExpiry { get; set; } public string QueuePrefix { get; set; } public string TopicPrefix { get; set; } - public ushort ChannelMax { get; set; } = DEFAULT_CHANNEL_MAX; - public int MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE; - public int IdleTimeOut { get; set; } = DEFAULT_IDLE_TIMEOUT; public bool AnonymousRelaySupported { get; set; } diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs index fa05fbb..c876253 100644 --- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs +++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs @@ -112,12 +112,12 @@ namespace Apache.NMS.AMQP.Provider.Amqp internal void OnLocalOpen(Open open) { open.ContainerId = Info.ClientId; - open.ChannelMax = Info.ChannelMax; - open.MaxFrameSize = (uint) Info.MaxFrameSize; + open.ChannelMax = Provider.ChannelMax; + open.MaxFrameSize = (uint) Provider.MaxFrameSize; open.HostName = String.IsNullOrWhiteSpace(this.Provider.VHost) ? remoteUri.Host : this.Provider.VHost; - if (Info.IdleTimeOut > 0) + if (Provider.IdleTimeout > 0) { - open.IdleTimeOut = (uint)Info.IdleTimeOut; + open.IdleTimeOut = (uint)Provider.IdleTimeout; } open.DesiredCapabilities = new[] { diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs index bff0b27..77f7220 100644 --- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs +++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs @@ -30,25 +30,17 @@ namespace Apache.NMS.AMQP.Provider.Amqp public static readonly uint DEFAULT_MAX_HANDLE = 1024; private static readonly uint DEFAULT_SESSION_OUTGOING_WINDOW = 2048; // AmqpNetLite default - private readonly ITransportContext transport; - private NmsConnectionInfo connectionInfo; - private AmqpConnection connection; - - /// <summary> - /// Sets and gets the name of the virtual host to which we are connecting. - /// By default this value is derived from the URI. - /// Can be used to determine the correct service if connecting to an AMQP proxy. - /// </summary> - public string VHost { get; set; } - - public AmqpProvider(Uri remoteUri, ITransportContext transport) - { - RemoteUri = remoteUri; - this.transport = transport; - } + public static readonly ushort DEFAULT_CHANNEL_MAX; + public static readonly int DEFAULT_MAX_FRAME_SIZE; + public static readonly int DEFAULT_IDLE_TIMEOUT; static AmqpProvider() { + AmqpSettings defaultAmqpSettings = new global::Amqp.ConnectionFactory().AMQP; + DEFAULT_CHANNEL_MAX = defaultAmqpSettings.MaxSessionsPerConnection; + DEFAULT_MAX_FRAME_SIZE = defaultAmqpSettings.MaxFrameSize; + DEFAULT_IDLE_TIMEOUT = defaultAmqpSettings.IdleTimeout; + // Set up tracing in AMQP. We capture all AMQP traces in the TraceListener below // and map to NMS 'Tracer' logs as follows: // AMQP Tracer @@ -87,6 +79,23 @@ namespace Apache.NMS.AMQP.Provider.Amqp } }; } + + private readonly ITransportContext transport; + private NmsConnectionInfo connectionInfo; + private AmqpConnection connection; + + /// <summary> + /// Sets and gets the name of the virtual host to which we are connecting. + /// By default this value is derived from the URI. + /// Can be used to determine the correct service if connecting to an AMQP proxy. + /// </summary> + public string VHost { get; set; } + + public AmqpProvider(Uri remoteUri, ITransportContext transport) + { + RemoteUri = remoteUri; + this.transport = transport; + } /// <summary> /// Enables AmqpNetLite's Frame logging level. @@ -112,6 +121,9 @@ namespace Apache.NMS.AMQP.Provider.Amqp public long RequestTimeout => connectionInfo?.RequestTimeout ?? NmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; public uint SessionOutgoingWindow { get; set; } = DEFAULT_SESSION_OUTGOING_WINDOW; public uint MaxHandle { get; set; } = DEFAULT_MAX_HANDLE; + public ushort ChannelMax { get; set; } = DEFAULT_CHANNEL_MAX; + public int MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE; + public int IdleTimeout { get; set; } = DEFAULT_IDLE_TIMEOUT; public Uri RemoteUri { get; } public IProviderListener Listener { get; private set; } diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs index ec86699..26bdfee 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs @@ -18,6 +18,7 @@ using System; using System.Threading; using Apache.NMS; +using Apache.NMS.AMQP.Provider.Amqp; using Apache.NMS.AMQP; using NMS.AMQP.Test.TestAmqp; using NMS.AMQP.Test.TestAmqp.BasicTypes; @@ -249,5 +250,68 @@ namespace NMS.AMQP.Test.Integration testPeer.WaitForAllMatchersToComplete(2000); } } + + [Test, Timeout(20_000)] + public void TestIdleTimeoutIsSetByDefault() + { + using TestAmqpPeer testPeer = new TestAmqpPeer(); + testPeer.ExpectSaslPlain("guest", "guest"); + testPeer.ExpectOpen(openAssertion: open => + { + Assert.AreEqual((uint)AmqpProvider.DEFAULT_IDLE_TIMEOUT, open.IdleTimeOut); + }); + testPeer.ExpectBegin(); + + IConnection connection = new NmsConnectionFactory(BuildUri(testPeer)).CreateConnection("guest", "guest"); + connection.ClientId = "ClientName"; + + testPeer.ExpectClose(); + connection.Close(); + } + + [Test, Timeout(20_000)] + public void TestIdleTimeoutIsNotSetOnOpenFrameWhenConfiguredToZero() + { + using TestAmqpPeer testPeer = new TestAmqpPeer(); + testPeer.ExpectSaslPlain("guest", "guest"); + testPeer.ExpectOpen(openAssertion: open => Assert.AreEqual(int.MaxValue, open.IdleTimeOut)); + testPeer.ExpectBegin(); + + IConnection connection = new NmsConnectionFactory(BuildUri(testPeer, "amqp.idleTimeout=0")).CreateConnection("guest", "guest"); + connection.ClientId = "ClientName"; + + testPeer.ExpectClose(); + connection.Close(); + } + + [Test, Timeout(20_000)] + public void TestIdleTimeoutIsSetOnOpenFrameWhenConfiguredGreaterThanZero() + { + using TestAmqpPeer testPeer = new TestAmqpPeer(); + testPeer.ExpectSaslPlain("guest", "guest"); + testPeer.ExpectOpen(openAssertion: open => Assert.AreEqual(30000u, open.IdleTimeOut)); + testPeer.ExpectBegin(); + + IConnection connection = new NmsConnectionFactory(BuildUri(testPeer, "amqp.idleTimeout=30000")).CreateConnection("guest", "guest"); + connection.ClientId = "ClientName"; + + testPeer.ExpectClose(); + connection.Close(); + } + + [Test, Timeout(20_000)] + public void TestIdleTimeoutIsNotSetOnOpenFrameWhenConfiguredNegative() + { + using TestAmqpPeer testPeer = new TestAmqpPeer(); + testPeer.ExpectSaslPlain("guest", "guest"); + testPeer.ExpectOpen(openAssertion: open => Assert.AreEqual(int.MaxValue, open.IdleTimeOut)); + testPeer.ExpectBegin(); + + IConnection connection = new NmsConnectionFactory(BuildUri(testPeer, "amqp.idleTimeout=-1")).CreateConnection("guest", "guest"); + connection.ClientId = "ClientName"; + + testPeer.ExpectClose(); + connection.Close(); + } } } \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs index 52af9af..c3d780e 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/IntegrationTestFixture.cs @@ -82,7 +82,7 @@ namespace NMS.AMQP.Test.Integration return context; } - private static string BuildUri(TestAmqpPeer testPeer, string optionsString) + protected static string BuildUri(TestAmqpPeer testPeer, string optionsString = null) { string baseUri = "amqp://127.0.0.1:" + testPeer.ServerPort.ToString(); diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpConnectionTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpConnectionTest.cs deleted file mode 100644 index be62332..0000000 --- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpConnectionTest.cs +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using Amqp.Framing; -using Apache.NMS.AMQP.Meta; -using Apache.NMS.AMQP.Provider; -using Apache.NMS.AMQP.Provider.Amqp; -using NUnit.Framework; - -namespace NMS.AMQP.Test.Provider.Amqp -{ - [TestFixture] - public class AmqpConnectionTest - { - private AmqpProvider provider; - private AmqpConnection connection; - private AmqpHandler handler; - - [SetUp] - public void Setup() - { - provider = ProviderFactory.Create(GetDefaultUri()) as AmqpProvider; - } - - [TearDown] - public void TearDown() - { - if (provider != null) - { - provider.Close(); - provider = null; - } - } - - [TestCaseSource(nameof(_timeoutTestCases))] - public void TestIdleTimeoutIsPassedOnlyIfNotNegative(int configuredIdleTimeout, uint expectedIdleTimeOut) - { - var nmsConnectionInfo = new NmsConnectionInfo(new NmsConnectionId("mock")); - nmsConnectionInfo.IdleTimeOut = configuredIdleTimeout; - connection = new AmqpConnection(provider, null, nmsConnectionInfo); - var open = new Open(); - connection.OnLocalOpen(open); - Assert.AreEqual(open.IdleTimeOut, expectedIdleTimeOut); - } - - private static object[] _timeoutTestCases = - [ - new object[] { int.MaxValue, 2147483647u }, - new object[] { int.MinValue, 0u }, - new object[] { 123, 123u }, - new object[] { 0, 0u }, - new object[] { -1, 0u }, - new object[] { -23, 0u } - ]; - - private Uri GetDefaultUri() - { - return new Uri("amqp://localhost:5672"); - } - } -} \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs index 9b09240..b2b3644 100644 --- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs +++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpProviderFactoryTest.cs @@ -27,7 +27,9 @@ namespace NMS.AMQP.Test.Provider.Amqp public class AmqpProviderFactoryTest { private const uint customMaxHandle = 2048; - + private const ushort customChannelMax = 32; + private const int customMaxFrameSize = 1048576; + private const int customIdleTimeout = 30000; private const string customVHost = "test-vhost"; [Test] @@ -45,6 +47,7 @@ namespace NMS.AMQP.Test.Provider.Amqp Assert.IsNotNull(provider); Assert.AreEqual(AmqpProvider.DEFAULT_MAX_HANDLE, provider.MaxHandle); Assert.IsFalse(provider.TraceFrames); + Assert.Greater(provider.IdleTimeout, 0); } [Test] @@ -53,7 +56,10 @@ namespace NMS.AMQP.Test.Provider.Amqp Uri uri = new Uri("amqp://localhost:5672" + "?amqp.maxHandle=" + customMaxHandle + "&amqp.traceFrames=true" + - "&amqp.vhost=" + customVHost); + "&amqp.vhost=" + customVHost + + "&amqp.channelMax=" + customChannelMax + + "&amqp.maxFrameSize=" + customMaxFrameSize + + "&amqp.idleTimeout=" + customIdleTimeout); AmqpProvider provider = ProviderFactory.Create(uri) as AmqpProvider; @@ -61,6 +67,9 @@ namespace NMS.AMQP.Test.Provider.Amqp Assert.AreEqual(customMaxHandle, provider.MaxHandle); Assert.IsTrue(provider.TraceFrames); Assert.AreEqual(customVHost, provider.VHost); + Assert.AreEqual(customChannelMax, provider.ChannelMax); + Assert.AreEqual(customMaxFrameSize, provider.MaxFrameSize); + Assert.AreEqual(customIdleTimeout, provider.IdleTimeout); } [TearDown] diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs index 2fad923..22d5861 100644 --- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs +++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs @@ -158,9 +158,9 @@ namespace NMS.AMQP.Test.TestAmqp AddMatcher(saslInitMatcher); } - public void ExpectOpen(Fields serverProperties = null) + public void ExpectOpen(Fields serverProperties = null, Action<Open> openAssertion = null) { - ExpectOpen(desiredCapabilities: DEFAULT_DESIRED_CAPABILITIES, serverCapabilities: new[] { SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER }, serverProperties: serverProperties); + ExpectOpen(desiredCapabilities: DEFAULT_DESIRED_CAPABILITIES, serverCapabilities: new[] { SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER }, serverProperties: serverProperties, openAssertion: openAssertion); } public void ExpectOpen(Symbol[] serverCapabilities, Fields serverProperties) @@ -168,7 +168,8 @@ namespace NMS.AMQP.Test.TestAmqp ExpectOpen(desiredCapabilities: DEFAULT_DESIRED_CAPABILITIES, serverCapabilities: serverCapabilities, serverProperties: serverProperties); } - private void ExpectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities, Fields serverProperties) + private void ExpectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities, Fields serverProperties, + Action<Open> openAssertion = null) { var openMatcher = new FrameMatcher<Open>(); @@ -177,6 +178,9 @@ namespace NMS.AMQP.Test.TestAmqp else openMatcher.WithAssertion(open => Assert.IsNull(open.DesiredCapabilities)); + if (openAssertion != null) + openMatcher.WithAssertion(openAssertion); + openMatcher.WithOnComplete(context => { var open = new Open --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
