This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 014aab2dc4 AMQP: added transport factory unit tests for mutex stripping and monitor wiring. (#1725)
014aab2dc4 is described below

commit 014aab2dc4c5e954093146c83fcf98c02249e03e
Author: Anmol Saxena <[email protected]>
AuthorDate: Wed Mar 18 21:26:43 2026 +0530

    AMQP: added transport factory unit tests for mutex stripping and monitor wiring. (#1725)
    
    * AMQP: added transport factory unit tests for mutex stripping and monitor wiring.
    
    * Added dynamic port and cleanup to avoid test conflicts.
    
    * Adding license to the file.
---
 .../AmqpTransportFactoryConfigurationTest.java     | 175 +++++++++++++++++++++
 1 file changed, 175 insertions(+)

diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java
new file mode 100644
index 0000000000..9e0e3e9776
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.net.Socket;
+import java.net.ServerSocket;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.tcp.SslTransport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Configuration tests for the AMQP transport factories
+ * ({@link AmqpTransportFactory}, {@link AmqpSslTransportFactory},
+ * {@link AmqpNioTransportFactory} and {@link AmqpNioSslTransportFactory}).
+ *
+ * <p>Each test builds a transport, passes it through the factory's
+ * {@code serverConfigure} or {@code compositeConfigure} method and inspects the
+ * resulting transport chain. This class never calls {@code start()} on the
+ * transports it creates; any sockets they hold are closed in {@link #cleanup()}.
+ */
+@Category(ParallelTest.class)
+public class AmqpTransportFactoryConfigurationTest {
+
+    /** Every transport handed out by {@link #createTransportForFactory}, kept so {@link #cleanup()} can close its socket. */
+    private final List<TcpTransport> createdTransports = new ArrayList<>();
+
+    /**
+     * Closes the socket of every transport created during the test.
+     *
+     * <p>All transports are visited even if closing one of them fails, and the
+     * tracking list is always cleared. The first failure is rethrown once the
+     * loop has finished, with any later failures attached as suppressed
+     * exceptions.
+     *
+     * @throws Exception the first failure raised while closing a socket
+     */
+    @After
+    public void cleanup() throws Exception {
+        Exception firstFailure = null;
+        try {
+            for (TcpTransport transport : createdTransports) {
+                try {
+                    Socket socket = transport.narrow(Socket.class);
+                    if (socket != null && !socket.isClosed()) {
+                        socket.close();
+                    }
+                } catch (Exception e) {
+                    // Remember the failure but keep going so the remaining sockets are not leaked.
+                    if (firstFailure == null) {
+                        firstFailure = e;
+                    } else {
+                        firstFailure.addSuppressed(e);
+                    }
+                }
+            }
+        } finally {
+            createdTransports.clear();
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /** Checks every AMQP factory: {@code serverConfigure} must not return a {@link MutexTransport}. */
+    @Test
+    public void testServerConfigureStripsMutexTransportForAllAmqpFactories() throws Exception {
+        assertServerConfigureStripsMutex(new AmqpTransportFactory());
+        assertServerConfigureStripsMutex(new AmqpSslTransportFactory());
+        assertServerConfigureStripsMutex(new AmqpNioTransportFactory());
+        assertServerConfigureStripsMutex(new AmqpNioSslTransportFactory());
+    }
+
+    /** Checks every AMQP factory: {@code compositeConfigure} must apply AMQP and {@code wireFormat.*} options to the filter. */
+    @Test
+    public void testCompositeConfigureAppliesAmqpAndWireFormatPropertiesForAllAmqpFactories() throws Exception {
+        assertCompositeConfigureAppliesProperties(new AmqpTransportFactory());
+        assertCompositeConfigureAppliesProperties(new AmqpSslTransportFactory());
+        assertCompositeConfigureAppliesProperties(new AmqpNioTransportFactory());
+        assertCompositeConfigureAppliesProperties(new AmqpNioSslTransportFactory());
+    }
+
+    /** Checks every AMQP factory: the configured chain must contain an {@link AmqpInactivityMonitor} shared with the filter. */
+    @Test
+    public void testAllAmqpFactoriesWireAmqpInactivityMonitor() throws Exception {
+        assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpTransportFactory());
+        assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpSslTransportFactory());
+        assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpNioTransportFactory());
+        assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpNioSslTransportFactory());
+    }
+
+    /**
+     * Runs {@code serverConfigure} with no options and asserts that the returned
+     * (outermost) transport is not a {@link MutexTransport}.
+     *
+     * @param factory the factory under test
+     */
+    private void assertServerConfigureStripsMutex(TransportFactory factory) throws Exception {
+        Transport configured = factory.serverConfigure(
+            createTransportForFactory(factory, new AmqpWireFormat()),
+            new AmqpWireFormat(),
+            new HashMap<String, Object>());
+
+        assertFalse("AMQP serverConfigure should strip the broker-side MutexTransport",
+            configured instanceof MutexTransport);
+    }
+
+    /**
+     * Runs {@code compositeConfigure} with a fixed set of options and asserts
+     * that each value is readable back from the {@link AmqpTransportFilter}
+     * found in the configured chain.
+     *
+     * @param factory the factory under test
+     */
+    private void assertCompositeConfigureAppliesProperties(TransportFactory factory) throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("transformer", "raw");
+        options.put("producerCredit", "17");
+        options.put("wireFormat.maxFrameSize", "4096");
+        options.put("wireFormat.connectAttemptTimeout", "1234");
+
+        Transport configured = factory.compositeConfigure(
+            createTransportForFactory(factory, new AmqpWireFormat()),
+            new AmqpWireFormat(),
+            options);
+
+        AmqpTransportFilter filter = findInChain(configured, AmqpTransportFilter.class);
+        assertNotNull("Expected AmqpTransportFilter in configured transport chain", filter);
+        assertEquals("raw", filter.getTransformer());
+        assertEquals(17, filter.getProducerCredit());
+        assertEquals(4096L, filter.getMaxFrameSize());
+        assertEquals(1234, filter.getConnectAttemptTimeout());
+    }
+
+    /**
+     * Runs {@code compositeConfigure} with no options and asserts that the chain
+     * contains both an {@link AmqpInactivityMonitor} and an
+     * {@link AmqpTransportFilter}, and that the filter references that same
+     * monitor instance.
+     *
+     * @param factory the factory under test
+     */
+    private void assertAmqpFactoriesWireAmqpInactivityMonitor(TransportFactory factory) throws Exception {
+        Transport configured = factory.compositeConfigure(
+            createTransportForFactory(factory, new AmqpWireFormat()),
+            new AmqpWireFormat(),
+            new HashMap<String, String>());
+
+        AmqpInactivityMonitor monitor = findInChain(configured, AmqpInactivityMonitor.class);
+        AmqpTransportFilter filter = findInChain(configured, AmqpTransportFilter.class);
+
+        assertNotNull("Expected AmqpInactivityMonitor in configured transport chain", monitor);
+        assertNotNull("Expected AmqpTransportFilter in configured transport chain", filter);
+        assertTrue("Filter should report inactivity monitor as enabled", filter.isUseInactivityMonitor());
+        assertSame("Factory should wire the same monitor instance into the AMQP filter",
+            monitor, filter.getInactivityMonitor());
+    }
+
+    /**
+     * Builds a plain {@link TcpTransport} whose remote URI is
+     * {@code tcp://localhost:<port>}, using a port obtained from
+     * {@link #findAvailablePort()}.
+     *
+     * @param wireFormat the wire format given to the transport
+     * @return the new transport
+     */
+    private TcpTransport createTcpTransport(AmqpWireFormat wireFormat) throws Exception {
+        int dynamicPort = findAvailablePort();
+        return new TcpTransport(
+            wireFormat,
+            SocketFactory.getDefault(),
+            new URI("tcp://localhost:" + dynamicPort),
+            null);
+    }
+
+    /**
+     * Binds a {@link ServerSocket} to port 0, reads the port that was assigned
+     * and closes the socket again.
+     *
+     * <p>The port is released before this method returns, so nothing reserves it
+     * for the caller; here it is only used to build a transport URI.
+     *
+     * @return the port number that was assigned to the temporary server socket
+     */
+    private int findAvailablePort() throws Exception {
+        try (ServerSocket socket = new ServerSocket(0)) {
+            return socket.getLocalPort();
+        }
+    }
+
+    /**
+     * Creates the kind of transport the given factory is fed in these tests: an
+     * {@link SslTransport} around a fresh {@link SSLSocket} for the two SSL
+     * factories, otherwise a plain {@link TcpTransport}. The transport is
+     * recorded so {@link #cleanup()} can close its socket.
+     *
+     * @param factory    the factory the transport is intended for
+     * @param wireFormat the wire format given to the transport
+     * @return the new, tracked transport
+     */
+    private TcpTransport createTransportForFactory(TransportFactory factory, AmqpWireFormat wireFormat) throws Exception {
+        TcpTransport transport;
+        if (factory instanceof AmqpSslTransportFactory || factory instanceof AmqpNioSslTransportFactory) {
+            SSLSocket socket = (SSLSocket) SSLSocketFactory.getDefault().createSocket();
+            try {
+                transport = new SslTransport(wireFormat, socket);
+            } catch (Exception e) {
+                // The transport was never registered for cleanup, so close the socket here.
+                try {
+                    socket.close();
+                } catch (Exception closeFailure) {
+                    e.addSuppressed(closeFailure);
+                }
+                throw e;
+            }
+        } else {
+            transport = createTcpTransport(wireFormat);
+        }
+
+        createdTransports.add(transport);
+        return transport;
+    }
+
+    /**
+     * Walks a transport chain from the outermost transport inwards and returns
+     * the first non-null result of {@code narrow(type)}.
+     *
+     * <p>The walk follows {@link TransportFilter#getNext()} and stops at the
+     * first transport that is not a {@link TransportFilter}.
+     *
+     * @param transport the outermost transport of the chain, may be {@code null}
+     * @param type      the type to look for
+     * @return the matching object, or {@code null} if none was found
+     */
+    private <T> T findInChain(Transport transport, Class<T> type) {
+        Transport current = transport;
+        while (current != null) {
+            T found = current.narrow(type);
+            if (found != null) {
+                return found;
+            }
+            if (!(current instanceof TransportFilter)) {
+                return null;
+            }
+            current = ((TransportFilter) current).getNext();
+        }
+        return null;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to