This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch ARTEMIS-5678
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit c7937e2c94694947b7f8df779b16324102e86b03
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Sep 22 18:40:10 2025 -0400

    ARTEMIS-5678 Add additional bridge tests
    
    Adds some tests between two server instances using bridge from and bridge to
    policies to drain off messages from a durable subscription to a matching sub
    on the other broker.
---
 .../amqp/connect/AMQPBridgeServerToServerTest.java | 223 +++++++++++++++++++++
 1 file changed, 223 insertions(+)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
index ca32d671ac..4d9ecc8e9a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -1383,4 +1384,226 @@ class AMQPBridgeServerToServerTest extends AmqpClientTestSupport {
          assertEquals("red", receiveAnother.getStringProperty("color"));
       }
    }
+
+   /**
+    * Verifies that a bridge-to-queue policy whose remote address is a fully qualified queue name
+    * (address::queue) drains a durable subscription queue on the local server into the matching
+    * durable subscription queue on the remote server.
+    * <p>
+    * The test parks messages in a consumer-less durable subscription on the local server, creates
+    * the same subscription on the remote server, adds the bridge configuration at runtime and then
+    * checks that every message moved across and can be consumed, in order, from the remote side.
+    *
+    * @throws Exception if any broker or JMS operation fails
+    */
+   @Test
+   @Timeout(20)
+   public void testBrigeToFQQNUsedToDrainDurableConsumerSubscriptionQueue() throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final long MESSAGE_COUNT = 10;
+
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      // Create the durable subscription on the local server, detach its consumer and then
+      // publish so that the messages accumulate in the subscription queue.
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+         final MessageProducer producer = session.createProducer(topic);
+
+         consumer.close();
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = session.createTextMessage("Message:" + i);
+
+            message.setStringProperty("color", "red");
+
+            producer.send(message);
+         }
+      }
+
+      final String subscriptionQueueName = server.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(subscriptionQueueName);
+
+      final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = server.locateQueue(subscriptionQueueName);
+
+      assertNotNull(subscriptionQueue);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(MESSAGE_COUNT, () -> subscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(subscriptionQueue.isDurable());
+
+      final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      // Create the same durable subscription (same client ID and subscription name) on the
+      // remote server so that a target queue exists for the bridge.
+      try (Connection connection = factoryRemote.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         consumer.close();
+      }
+
+      final String remoteSubscriptionQueueName = remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(remoteSubscriptionQueueName);
+
+      // Look the remote queue up by the name the remote server reported rather than by the
+      // local name; the equality of both names is asserted separately below.
+      final org.apache.activemq.artemis.core.server.Queue remoteSubscriptionQueue = remoteServer.locateQueue(remoteSubscriptionQueueName);
+
+      assertNotNull(remoteSubscriptionQueue);
+      Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(remoteSubscriptionQueue.isDurable());
+
+      // The policy below uses a single queue name for both sides, so both must match.
+      assertEquals(subscriptionQueueName, remoteSubscriptionQueueName);
+
+      final AMQPBridgeQueuePolicyElement bridgePolicy = new AMQPBridgeQueuePolicyElement();
+      bridgePolicy.setName("test-policy");
+      bridgePolicy.addToIncludes(getTestName(), subscriptionQueueName);
+      bridgePolicy.setRemoteAddress(getTestName() + "::" + subscriptionQueueName); // Direct the policy on where to put the messages
+
+      final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addBridgeToQueuePolicy(bridgePolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      // Replace any configured AMQP broker connections with the bridge connection only.
+      server.getConfiguration().getAMQPConnection().clear();
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+
+      // Ask the AMQP protocol factory to update its services on the running server.
+      // NOTE(review): presumably this is what activates the newly added broker connection - confirm.
+      final ProtonProtocolManagerFactory protocolFactory = (ProtonProtocolManagerFactory)
+         server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+      assertNotNull(protocolFactory);
+      protocolFactory.updateProtocolServices(server, new ArrayList<>());
+
+      // All messages should leave the local subscription queue and arrive on the remote one.
+      Wait.assertEquals(MESSAGE_COUNT, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000, 100);
+
+      try (Connection connection = factoryRemote.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = (TextMessage) consumer.receive(100);
+
+            // Fail with a clear assertion rather than a NullPointerException if the receive timed out
+            assertNotNull(message, "Timed out waiting for message " + i);
+            assertEquals("Message:" + i, message.getText());
+         }
+      }
+   }
+
+   /**
+    * Verifies that a bridge-from-queue policy whose remote address is a fully qualified queue name
+    * (address::queue) drains a durable subscription queue on the remote server into the matching
+    * durable subscription queue on the local server.
+    * <p>
+    * The test parks messages in a consumer-less durable subscription on the remote server, creates
+    * the same subscription on the local server, adds the bridge configuration at runtime and then
+    * attaches a local consumer and checks that every message is pulled across and received in order.
+    *
+    * @throws Exception if any broker or JMS operation fails
+    */
+   @Test
+   @Timeout(20)
+   public void testBrigeFromQueueUsedToDrainDurableConsumerSubscriptionQueue() throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final long MESSAGE_COUNT = 10;
+
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      // Create the durable subscription on the remote server, detach its consumer and then
+      // publish so that the messages accumulate in the subscription queue.
+      try (Connection connection = factoryRemote.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+         final MessageProducer producer = session.createProducer(topic);
+
+         consumer.close();
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = session.createTextMessage("Message:" + i);
+
+            message.setStringProperty("color", "red");
+
+            producer.send(message);
+         }
+      }
+
+      final String remoteSubscriptionQueueName = remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(remoteSubscriptionQueueName);
+
+      final org.apache.activemq.artemis.core.server.Queue remoteSubscriptionQueue = remoteServer.locateQueue(remoteSubscriptionQueueName);
+
+      assertNotNull(remoteSubscriptionQueue);
+      Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(MESSAGE_COUNT, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(remoteSubscriptionQueue.isDurable());
+
+      final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      // Create the same durable subscription (same client ID and subscription name) on the
+      // local server so that a queue exists for the bridge to deliver into.
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         consumer.close();
+      }
+
+      final String subscriptionQueueName = server.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(subscriptionQueueName);
+
+      final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = server.locateQueue(subscriptionQueueName);
+
+      assertNotNull(subscriptionQueue);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000, 100);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000, 100);
+      assertTrue(subscriptionQueue.isDurable());
+
+      // The policy below uses a single queue name for both sides, so the remote and local
+      // subscription queue names must match.
+      assertEquals(remoteSubscriptionQueueName, subscriptionQueueName);
+
+      final AMQPBridgeQueuePolicyElement bridgePolicy = new AMQPBridgeQueuePolicyElement();
+      bridgePolicy.setName("test-policy");
+      bridgePolicy.addToIncludes(getTestName(), subscriptionQueueName);
+      bridgePolicy.setRemoteAddress(getTestName() + "::" + subscriptionQueueName); // Direct the policy on where to get the messages
+
+      final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addBridgeFromQueuePolicy(bridgePolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      // Replace any configured AMQP broker connections with the bridge connection only.
+      server.getConfiguration().getAMQPConnection().clear();
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+
+      // Ask the AMQP protocol factory to update its services on the running server.
+      // NOTE(review): presumably this is what activates the newly added broker connection - confirm.
+      final ProtonProtocolManagerFactory protocolFactory = (ProtonProtocolManagerFactory)
+         server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+      assertNotNull(protocolFactory);
+      protocolFactory.updateProtocolServices(server, new ArrayList<>());
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         connection.setClientID("test-brigdes");
+         connection.start();
+
+         final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageConsumer consumer = session.createDurableConsumer(topic, "bridge-sub");
+
+         // With the local consumer attached, all bridged messages should show up as in
+         // delivery on the local queue while the remote queue empties.
+         Wait.assertEquals(MESSAGE_COUNT, () -> subscriptionQueue.getDeliveringCount(), 5_000, 100);
+         Wait.assertEquals(0L, () -> remoteSubscriptionQueue.getMessageCount(), 5_000, 100);
+
+         for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            final TextMessage message = (TextMessage) consumer.receive(100);
+
+            // Fail with a clear assertion rather than a NullPointerException if the receive timed out
+            assertNotNull(message, "Timed out waiting for message " + i);
+            assertEquals("Message:" + i, message.getText());
+         }
+      }
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to