This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch ARTEMIS-5678 in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c7937e2c94694947b7f8df779b16324102e86b03
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Sep 22 18:40:10 2025 -0400

    ARTEMIS-5678 Add additional bridge tests

    Adds some tests between two server instances using bridge from and bridge to
    policies to drain off messages from a durable subscription to a matching sub
    on the other broker.
---
 .../amqp/connect/AMQPBridgeServerToServerTest.java | 223 +++++++++++++++++++++
 1 file changed, 223 insertions(+)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
index ca32d671ac..4d9ecc8e9a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -1383,4 +1384,226 @@ class AMQPBridgeServerToServerTest extends AmqpClientTestSupport {
          assertEquals("red", receiveAnother.getStringProperty("color"));
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void testBrigeToFQQNUsedToDrainDurableConsumerSubscriptionQueue() throws Exception {
+      logger.info("Test started: {}", getTestName());
+      // Scenario: a bridge-to policy with an FQQN remote address moves the local durable subscription backlog to the remote broker
+      final long MESSAGE_COUNT = 10;
+
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+         final MessageProducer producer = session.createProducer(topic);
+
+         consumer.close(); // Keep the durable subscription but leave it without an active consumer so messages back up
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = session.createTextMessage("Message:" + i);
+
+            message.setStringProperty("color", "red");
+
+            producer.send(message);
+         }
+      }
+
+      final String subscriptionQueueName = server.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(subscriptionQueueName);
+
+      final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = server.locateQueue(subscriptionQueueName);
+
+      assertNotNull(subscriptionQueue);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(MESSAGE_COUNT, () -> subscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(subscriptionQueue.isDurable());
+
+      final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      try (Connection connection = factoryRemote.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         consumer.close(); // Only the (empty) matching subscription queue is needed on the remote broker at this point
+      }
+
+      final String remoteSubscriptionQueueName = remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(remoteSubscriptionQueueName);
+
+      final org.apache.activemq.artemis.core.server.Queue remoteSubscriptionQueue = remoteServer.locateQueue(remoteSubscriptionQueueName); // Look up by the remote broker's own binding name
+
+      assertNotNull(remoteSubscriptionQueue);
+      Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(remoteSubscriptionQueue.isDurable());
+
+      assertEquals(subscriptionQueueName, remoteSubscriptionQueueName);
+
+      final AMQPBridgeQueuePolicyElement bridgePolicy = new AMQPBridgeQueuePolicyElement();
+      bridgePolicy.setName("test-policy");
+      bridgePolicy.addToIncludes(getTestName(), subscriptionQueueName);
+      bridgePolicy.setRemoteAddress(getTestName() + "::" + subscriptionQueueName); // Direct the policy on where to put the messages
+
+      final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addBridgeToQueuePolicy(bridgePolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      server.getConfiguration().getAMQPConnection().clear();
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+
+      final ProtonProtocolManagerFactory protocolFactory = (ProtonProtocolManagerFactory)
+         server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+      assertNotNull(protocolFactory);
+      protocolFactory.updateProtocolServices(server, new ArrayList<>()); // NOTE(review): presumably makes the running server pick up the new broker connection - confirm
+
+      Wait.assertEquals(MESSAGE_COUNT, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000, 100);
+
+      try (Connection connection = factoryRemote.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = (TextMessage) consumer.receive(5_000); // Same 5s budget as the Wait checks; returns null on timeout
+            assertNotNull(message, "Timed out waiting for bridged message " + i);
+            assertEquals("Message:" + i, message.getText());
+         }
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testBrigeFromQueueUsedToDrainDurableConsumerSubscriptionQueue() throws Exception {
+      logger.info("Test started: {}", getTestName());
+      // Scenario: a bridge-from policy with an FQQN remote address pulls the remote durable subscription backlog to this broker
+      final long MESSAGE_COUNT = 10;
+
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      try (Connection connection = factoryRemote.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+         final MessageProducer producer = session.createProducer(topic);
+
+         consumer.close(); // Keep the durable subscription but leave it without an active consumer so messages back up
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = session.createTextMessage("Message:" + i);
+
+            message.setStringProperty("color", "red");
+
+            producer.send(message);
+         }
+      }
+
+      final String remoteSubscriptionQueueName = remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(remoteSubscriptionQueueName);
+
+      final org.apache.activemq.artemis.core.server.Queue remoteSubscriptionQueue = remoteServer.locateQueue(remoteSubscriptionQueueName);
+
+      assertNotNull(remoteSubscriptionQueue);
+      Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(MESSAGE_COUNT, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(remoteSubscriptionQueue.isDurable());
+
+      final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         consumer.close(); // Only the (empty) matching subscription queue is needed on the local broker at this point
+      }
+
+      final String subscriptionQueueName = server.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(subscriptionQueueName);
+
+      final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = server.locateQueue(subscriptionQueueName);
+
+      assertNotNull(subscriptionQueue);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(subscriptionQueue.isDurable());
+
+      assertEquals(remoteSubscriptionQueueName, subscriptionQueueName); // Both brokers must derive the same subscription queue name
+
+      final AMQPBridgeQueuePolicyElement bridgePolicy = new AMQPBridgeQueuePolicyElement();
+      bridgePolicy.setName("test-policy");
+      bridgePolicy.addToIncludes(getTestName(), subscriptionQueueName);
+      bridgePolicy.setRemoteAddress(getTestName() + "::" + subscriptionQueueName); // Direct the policy on where to get the messages
+
+      final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addBridgeFromQueuePolicy(bridgePolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      server.getConfiguration().getAMQPConnection().clear();
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+
+      final ProtonProtocolManagerFactory protocolFactory = (ProtonProtocolManagerFactory)
+         server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+      assertNotNull(protocolFactory);
+      protocolFactory.updateProtocolServices(server, new ArrayList<>()); // NOTE(review): presumably makes the running server pick up the new broker connection - confirm
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         Wait.assertEquals(MESSAGE_COUNT, () -> subscriptionQueue.getDeliveringCount(), 5_000, 100);
+         Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = (TextMessage) consumer.receive(5_000); // Same 5s budget as the Wait checks; returns null on timeout
+            assertNotNull(message, "Timed out waiting for bridged message " + i);
+            assertEquals("Message:" + i, message.getText());
+         }
+      }
+   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact
