This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 246520a27293686a889144fb4a22bc7df10a4eae Author: Quan Tran <hqt...@linagora.com> AuthorDate: Thu Apr 11 14:59:55 2024 +0700 JAMES-4027 RabbitMQClusterTest: Add a test that makes sure RabbitMQ connection is recoverable upon cluster partial failure --- .../rabbitmq/DockerClusterRabbitMQExtension.java | 4 +++ .../backends/rabbitmq/RabbitMQClusterTest.java | 39 ++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java index a9a172c597..6a7d29727b 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerClusterRabbitMQExtension.java @@ -136,6 +136,10 @@ public class DockerClusterRabbitMQExtension implements BeforeEachCallback, After rabbitMQ1.address(), rabbitMQ2.address(), rabbitMQ3.address()); } + public ImmutableList<DockerRabbitMQ> getNodes() { + return ImmutableList.of(rabbitMQ1, rabbitMQ2, rabbitMQ3); + } + public void detach() { rabbitMQ3.performIfRunning(DockerRabbitMQ::reset); rabbitMQ1.performIfRunning(rabbitMQ -> rabbitMQ.forgetNode(rabbitMQ3.getNodeName())); diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java index 1f0da7300e..1096de4bcd 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQClusterTest.java @@ -174,6 +174,7 @@ class RabbitMQClusterTest { @BeforeEach void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + node1ConnectionFactory.setNetworkRecoveryInterval(100); resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses()); resilientChannel = resilientConnection.createChannel(); ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); @@ -186,6 +187,44 @@ class RabbitMQClusterTest { closeQuietly(resilientConnection, resilientChannel); } + @Test + void connectionShouldBeRecoveredWhenConnectedNodeIsDown(DockerRabbitMQCluster cluster) throws Exception { + // find the connected node and shutdown the node + DockerRabbitMQ connectedNode = getConnectedNode(cluster, resilientChannel); + connectedNode.stop(); + + // give sometime to the connection recovered + Thread.sleep(100L); + + // make sure the connection can be recovered (to other nodes) + resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE); + resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel); + resilientChannel.basicConsume(QUEUE, consumer); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + Integer[] expectedResult = IntStream.range(0, nbMessages).boxed().toArray(Integer[]::new); + assertThat(consumer.getConsumedMessages()).containsOnly(expectedResult); + } + + private DockerRabbitMQ getConnectedNode(DockerRabbitMQCluster cluster, Channel resilientChannel) { + return cluster.getNodes() + .stream() + .filter(node -> node.getNodeName().equals(resilientChannel.getConnection().getServerProperties().get("cluster_name").toString())) + .findFirst() + .get(); + } + + @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnecting" + "See https://github.com/rabbitmq/rabbitmq-server/issues/959") @Test --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org