This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 246520a27293686a889144fb4a22bc7df10a4eae
Author: Quan Tran <hqt...@linagora.com>
AuthorDate: Thu Apr 11 14:59:55 2024 +0700

    JAMES-4027 RabbitMQClusterTest: Add a test that makes sure RabbitMQ 
connection is recoverable upon cluster partial failure
---
 .../rabbitmq/DockerClusterRabbitMQExtension.java   |  4 +++
 .../backends/rabbitmq/RabbitMQClusterTest.java     | 39 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java
index a9a172c597..6a7d29727b 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java
@@ -136,6 +136,10 @@ public class DockerClusterRabbitMQExtension implements 
BeforeEachCallback, After
                 rabbitMQ1.address(), rabbitMQ2.address(), rabbitMQ3.address());
         }
 
+        public ImmutableList<DockerRabbitMQ> getNodes() {
+            return ImmutableList.of(rabbitMQ1, rabbitMQ2, rabbitMQ3);
+        }
+
         public void detach() {
             rabbitMQ3.performIfRunning(DockerRabbitMQ::reset);
             rabbitMQ1.performIfRunning(rabbitMQ -> 
rabbitMQ.forgetNode(rabbitMQ3.getNodeName()));
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java
index 1f0da7300e..1096de4bcd 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java
@@ -174,6 +174,7 @@ class RabbitMQClusterTest {
         @BeforeEach
         void setup(DockerRabbitMQCluster cluster) throws IOException, 
TimeoutException {
             node1ConnectionFactory = 
cluster.getRabbitMQ1().connectionFactory();
+            node1ConnectionFactory.setNetworkRecoveryInterval(100);
             resilientConnection = 
node1ConnectionFactory.newConnection(cluster.getAddresses());
             resilientChannel = resilientConnection.createChannel();
             ConnectionFactory node2ConnectionFactory = 
cluster.getRabbitMQ2().connectionFactory();
@@ -186,6 +187,44 @@ class RabbitMQClusterTest {
             closeQuietly(resilientConnection, resilientChannel);
         }
 
+        @Test
+        void 
connectionShouldBeRecoveredWhenConnectedNodeIsDown(DockerRabbitMQCluster 
cluster) throws Exception {
+            // find the connected node and shutdown the node
+            DockerRabbitMQ connectedNode = getConnectedNode(cluster, 
resilientChannel);
+            connectedNode.stop();
+
+            // give the connection some time to recover
+            Thread.sleep(100L);
+
+            // make sure the connection can be recovered (to other nodes)
+            resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, 
DURABLE);
+            resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, 
!AUTO_DELETE, ImmutableMap.of()).getQueue();
+            resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel);
+            resilientChannel.basicConsume(QUEUE, consumer);
+
+            awaitAtMostOneMinute.until(() -> 
consumer.getConsumedMessages().size() == nbMessages);
+
+            Integer[] expectedResult = IntStream.range(0, 
nbMessages).boxed().toArray(Integer[]::new);
+            
assertThat(consumer.getConsumedMessages()).containsOnly(expectedResult);
+        }
+
+        private DockerRabbitMQ getConnectedNode(DockerRabbitMQCluster cluster, 
Channel resilientChannel) {
+            return cluster.getNodes()
+                .stream()
+                .filter(node -> 
node.getNodeName().equals(resilientChannel.getConnection().getServerProperties().get("cluster_name").toString()))
+                .findFirst()
+                .get();
+        }
+
+
         @Disabled("JAMES-2334 For some reason, we are unable to recover 
topology when reconnecting" +
            "See https://github.com/rabbitmq/rabbitmq-server/issues/959")
         @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to