http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java deleted file mode 100644 index f2a0cea..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -@ExtendWith(DockerRabbitMQExtension.class) -public class DockerRabbitMQExtensionTest { - - private ConnectionFactory connectionFactory; - - @BeforeEach - public void setup(DockerRabbitMQ rabbitMQ) { - connectionFactory = new ConnectionFactory(); - connectionFactory.setHost(rabbitMQ.getHostIp()); - connectionFactory.setPort(rabbitMQ.getPort()); - connectionFactory.setUsername(rabbitMQ.getUsername()); - connectionFactory.setPassword(rabbitMQ.getPassword()); - } - - @Test - public void containerShouldBeUp() throws Exception { - try (Connection connection = connectionFactory.newConnection()) { - assertThat(connection.isOpen()).isTrue(); - } - } -}
http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java deleted file mode 100644 index 211a9ae..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java +++ /dev/null @@ -1,61 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Envelope; - -public class InMemoryConsumer extends DefaultConsumer { - - @FunctionalInterface - interface Operation { - void perform(); - } - - private final ConcurrentLinkedQueue<Integer> messages; - private final Operation operation; - - public InMemoryConsumer(Channel channel) { - this(channel, () -> { }); - } - - public InMemoryConsumer(Channel channel, Operation operation) { - super(channel); - this.operation = operation; - this.messages = new ConcurrentLinkedQueue<>(); - } - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - operation.perform(); - Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); - messages.add(payload); - } - - public Queue<Integer> getConsumedMessages() { - return messages; - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java deleted file mode 100644 index 2f4dcb6..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java +++ /dev/null @@ -1,294 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.IntStream; - -import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster; -import org.awaitility.Awaitility; -import org.awaitility.Duration; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -@ExtendWith(DockerClusterRabbitMQExtension.class) -class RabbitMQClusterTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class); - - private static final String QUEUE = "queue"; - - @Nested - class ClusterSharing { - - private ConnectionFactory node1ConnectionFactory; - private ConnectionFactory node2ConnectionFactory; - private Connection node1Connection; - private Connection node2Connection; - private Channel node1Channel; - private Channel node2Channel; - - @BeforeEach - void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { - node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); - node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); - node1Connection = node1ConnectionFactory.newConnection(); - node2Connection = node2ConnectionFactory.newConnection(); - node1Channel = node1Connection.createChannel(); - node2Channel = node2Connection.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); - } - - @Test - void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { - String stdout = cluster.getRabbitMQ1().container() - .execInContainer("rabbitmqctl", "cluster_status") - .getStdout(); - - assertThat(stdout) - .contains( - DockerClusterRabbitMQExtension.RABBIT_1, - DockerClusterRabbitMQExtension.RABBIT_2, - DockerClusterRabbitMQExtension.RABBIT_3); - } - - @Test - void queuesShouldBeShared() throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); - - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - @Test - void queuesShouldBeDeclarableOnAnotherNode() throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); - - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - } - - @Nested - class ClusterNodesFailure { - - private ConnectionFactory node1ConnectionFactory; - private Connection resilientConnection; - private Channel resilientChannel; - private Connection node2Connection; - private Channel node2Channel; - - @BeforeEach - void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { - node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); - resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses()); - resilientChannel = resilientConnection.createChannel(); - ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); - node2Connection = node2ConnectionFactory.newConnection(); - node2Channel = node2Connection.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly(resilientConnection, resilientChannel); - } - - @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnecting" + - "See https://github.com/rabbitmq/rabbitmq-server/issues/959") - @Test - void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception { - resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 20; - int firstBatchSize = nbMessages / 2; - IntStream.range(0, firstBatchSize) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer); - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize); - - cluster.getRabbitMQ1().stop(); - - IntStream.range(firstBatchSize, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(this::tryPublishWithRetry); - - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - private void tryPublishWithRetry(byte[] bytes) { - Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes)); - } - - private boolean tryPublish(byte[] bytes) { - try { - resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes); - return true; - } catch (Exception e) { - LOGGER.error("failed publish", e); - return false; - } - } - - @Test - void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception { - ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory(); - ImmutableList<Address> clusterAddresses = cluster.getAddresses(); - cluster.getRabbitMQ3().stop(); - - try (Connection connection = node3ConnectionFactory.newConnection(clusterAddresses); - Channel channel = connection.createChannel()) { - - channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer = new InMemoryConsumer(channel); - channel.basicConsume(QUEUE, consumer); - - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - } - - @Test - void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception { - node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - AtomicInteger counter = new AtomicInteger(0); - InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel, - () -> stopWhenHalfProcessed(cluster, nbMessages, counter)); - resilientChannel.basicConsume(QUEUE, consumer); - - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - private void stopWhenHalfProcessed(DockerRabbitMQCluster cluster, int nbMessages, AtomicInteger counter) { - if (counter.incrementAndGet() == nbMessages / 2) { - cluster.getRabbitMQ1().stop(); - } - } - - } - - private void closeQuietly(AutoCloseable... closeables) { - Arrays.stream(closeables).forEach(this::closeQuietly); - } - - private void closeQuietly(AutoCloseable closeable) { - try { - closeable.close(); - } catch (Exception e) { - //ignore error - } - } - - private byte[] asBytes(String message) { - return message.getBytes(StandardCharsets.UTF_8); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java deleted file mode 100644 index b83d1f7..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java +++ /dev/null @@ -1,50 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.queue.rabbitmq; - -import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; -import static org.awaitility.Duration.ONE_MINUTE; - -import org.awaitility.Awaitility; -import org.awaitility.Duration; -import org.awaitility.core.ConditionFactory; - -import com.rabbitmq.client.AMQP; - -public class RabbitMQFixture { - public static final boolean DURABLE = true; - public static final boolean AUTO_ACK = true; - public static final AMQP.BasicProperties NO_PROPERTIES = null; - public static final String EXCHANGE_NAME = "exchangeName"; - public static final String ROUTING_KEY = "routingKey"; - public static final String DIRECT = "direct"; - public static final boolean EXCLUSIVE = true; - public static final boolean AUTO_DELETE = true; - public static final String WORK_QUEUE = "workQueue"; - - public static Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS; - public static ConditionFactory calmlyAwait = Awaitility.with() - .pollInterval(slowPacedPollInterval) - .and() - .with() - .pollDelay(slowPacedPollInterval) - .await(); - public static ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE); -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index b9f56ff..be48674 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeoutException; import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; +import org.apache.james.backend.mailqueue.DockerRabbitMQ; +import org.apache.james.backend.mailqueue.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java deleted file mode 100644 index 61d3aa5..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java +++ /dev/null @@ -1,330 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_ACK; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.WORK_QUEUE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Queue; -import java.util.concurrent.TimeoutException; -import java.util.stream.IntStream; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -@ExtendWith(DockerRabbitMQExtension.class) -class RabbitMQTest { - - @Nested - class SingleConsumerTest { - - private ConnectionFactory connectionFactory; - private Connection connection; - private Channel channel; - - @BeforeEach - void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { - connectionFactory = rabbitMQ.connectionFactory(); - connection = connectionFactory.newConnection(); - channel = connection.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly(connection, channel); - } - - @Test - void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception { - String queueName = createQueue(channel); - publishAMessage(channel); - awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName)); - } - - @Test - void demonstrateDurability(DockerRabbitMQ rabbitMQ) throws Exception { - String queueName = createQueue(channel); - publishAMessage(channel); - - rabbitMQ.restart(); - - awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ)); - assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull(); - } - - private Boolean containerIsRestarted(DockerRabbitMQ rabbitMQ) { - try { - rabbitMQ.connectionFactory().newConnection(); - return true; - } catch (Exception e) { - return false; - } - } - - private String createQueue(Channel channel) throws IOException { - channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - String queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); - return queueName; - } - - private void publishAMessage(Channel channel) throws IOException { - channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!")); - } - - private Boolean messageReceived(Channel channel, String queueName) { - try { - return channel.basicGet(queueName, !AUTO_ACK) != null; - } catch (Exception e) { - return false; - } - } - } - - @Nested - class FourConnections { - - private ConnectionFactory connectionFactory1; - private ConnectionFactory connectionFactory2; - private ConnectionFactory connectionFactory3; - private ConnectionFactory connectionFactory4; - private Connection connection1; - private Connection connection2; - private Connection connection3; - private Connection connection4; - private Channel channel1; - private Channel channel2; - private Channel channel3; - private Channel channel4; - - @BeforeEach - void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { - connectionFactory1 = rabbitMQ.connectionFactory(); - connectionFactory2 = rabbitMQ.connectionFactory(); - connectionFactory3 = rabbitMQ.connectionFactory(); - connectionFactory4 = rabbitMQ.connectionFactory(); - connection1 = connectionFactory1.newConnection(); - connection2 = connectionFactory2.newConnection(); - connection3 = connectionFactory3.newConnection(); - connection4 = connectionFactory4.newConnection(); - channel1 = connection1.createChannel(); - channel2 = connection2.createChannel(); - channel3 = connection3.createChannel(); - channel4 = connection4.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly( - channel1, channel2, channel3, channel4, - connection1, connection2, connection3, connection4); - } - - @Nested - class BroadCast { - - // In the following case, each consumer will receive the messages produced by the - // producer - // To do so, each consumer will bind it's queue to the producer exchange. - @Test - void rabbitMQShouldSupportTheBroadcastCase() throws Exception { - // Declare a single exchange and three queues attached to it. - channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - - String queue2 = channel2.queueDeclare().getQueue(); - channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY); - String queue3 = channel3.queueDeclare().getQueue(); - channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY); - String queue4 = channel4.queueDeclare().getQueue(); - channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); - - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); - channel2.basicConsume(queue2, consumer2); - channel3.basicConsume(queue3, consumer3); - channel4.basicConsume(queue4, consumer4); - - // the publisher will produce 10 messages - IntStream.range(0, 10) - .mapToObj(String::valueOf) - .map(RabbitMQTest.this::asBytes) - .forEach(Throwing.consumer( - bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - awaitAtMostOneMinute.until( - () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30); - - ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList()); - // Check every subscriber have receive all the messages. - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - } - - @Nested - class WorkQueue { - - // In the following case, consumers will receive the messages produced by the - // producer but will share them. - // To do so, we will bind a single queue to the producer exchange. - @Test - void rabbitMQShouldSupportTheWorkQueueCase() throws Exception { - int nbMessages = 100; - - // Declare the exchange and a single queue attached to it. - channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); - channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); - channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - // Publisher will produce 100 messages - IntStream.range(0, nbMessages) - .mapToObj(String::valueOf) - .map(RabbitMQTest.this::asBytes) - .forEach(Throwing.consumer( - bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); - channel2.basicConsume(WORK_QUEUE, consumer2); - channel3.basicConsume(WORK_QUEUE, consumer3); - channel4.basicConsume(WORK_QUEUE, consumer4); - - awaitAtMostOneMinute.until( - () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages); - - ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - - assertThat( - Iterables.concat( - consumer2.getConsumedMessages(), - consumer3.getConsumedMessages(), - consumer4.getConsumedMessages())) - .containsOnlyElementsOf(expectedResult); - } - - } - - @Nested - class Routing { - @Test - void rabbitMQShouldSupportRouting() throws Exception { - String conversation1 = "c1"; - String conversation2 = "c2"; - String conversation3 = "c3"; - String conversation4 = "c4"; - - // Declare the exchange and a single queue attached to it. - channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - - String queue1 = channel1.queueDeclare().getQueue(); - // 1 will follow conversation 1 and 2 - channel1.queueBind(queue1, EXCHANGE_NAME, conversation1); - channel1.queueBind(queue1, EXCHANGE_NAME, conversation2); - - String queue2 = channel2.queueDeclare().getQueue(); - // 2 will follow conversation 2 and 3 - channel2.queueBind(queue2, EXCHANGE_NAME, conversation2); - channel2.queueBind(queue2, EXCHANGE_NAME, conversation3); - - String queue3 = channel3.queueDeclare().getQueue(); - // 3 will follow conversation 3 and 4 - channel3.queueBind(queue3, EXCHANGE_NAME, conversation3); - channel3.queueBind(queue3, EXCHANGE_NAME, conversation4); - - String queue4 = channel4.queueDeclare().getQueue(); - // 4 will follow conversation 1 and 4 - channel4.queueBind(queue4, EXCHANGE_NAME, conversation1); - channel4.queueBind(queue4, EXCHANGE_NAME, conversation4); - - channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1")); - channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2")); - channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3")); - channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4")); - - InMemoryConsumer consumer1 = new InMemoryConsumer(channel1); - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); - channel1.basicConsume(queue1, consumer1); - channel2.basicConsume(queue2, consumer2); - channel3.basicConsume(queue3, consumer3); - channel4.basicConsume(queue4, consumer4); - - awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8); - - assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2); - assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3); - assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4); - assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4); - } - } - - private long countReceivedMessages(InMemoryConsumer... consumers) { - return Arrays.stream(consumers) - .map(InMemoryConsumer::getConsumedMessages) - .mapToLong(Queue::size) - .sum(); - } - - } - - private void closeQuietly(AutoCloseable... closeables) { - Arrays.stream(closeables).forEach(this::closeQuietly); - } - - private void closeQuietly(AutoCloseable closeable) { - try { - closeable.close(); - } catch (Exception e) { - //ignore error - } - } - - private byte[] asBytes(String message) { - return message.getBytes(StandardCharsets.UTF_8); - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java deleted file mode 100644 index ba6ce3b..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java +++ /dev/null @@ -1,67 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.queue.rabbitmq; - -import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.rnorth.ducttape.unreliables.Unreliables; -import org.testcontainers.containers.wait.strategy.WaitStrategy; -import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; - -import com.google.common.primitives.Ints; -import com.rabbitmq.client.Connection; - -public class RabbitMQWaitStrategy implements WaitStrategy { - - private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1); - - public static RabbitMQWaitStrategy withDefaultTimeout(DockerRabbitMQ rabbitMQ) { - return new RabbitMQWaitStrategy(rabbitMQ, DEFAULT_TIMEOUT); - } - - private final DockerRabbitMQ rabbitMQ; - private final Duration timeout; - - public RabbitMQWaitStrategy(DockerRabbitMQ rabbitMQ, Duration timeout) { - this.rabbitMQ = rabbitMQ; - this.timeout = timeout; - } - - @Override - public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { - int seconds = Ints.checkedCast(this.timeout.getSeconds()); - - Unreliables.retryUntilTrue(seconds, TimeUnit.SECONDS, this::isConnected); - } - - private Boolean isConnected() throws IOException, TimeoutException { - try (Connection connection = rabbitMQ.connectionFactory().newConnection()) { - return connection.isOpen(); - } - } - - @Override - public WaitStrategy withStartupTimeout(Duration startupTimeout) { - return new RabbitMQWaitStrategy(rabbitMQ, startupTimeout); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index 1ff0510..c5ffc27 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeoutException; import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; +import org.apache.james.backend.mailqueue.DockerRabbitMQ; +import org.apache.james.backend.mailqueue.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/ReusableDockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/ReusableDockerRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/ReusableDockerRabbitMQExtension.java deleted file mode 100644 index 3072b9c..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/ReusableDockerRabbitMQExtension.java +++ /dev/null @@ -1,59 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import org.junit.jupiter.api.extension.AfterAllCallback; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.ParameterContext; -import org.junit.jupiter.api.extension.ParameterResolutionException; -import org.junit.jupiter.api.extension.ParameterResolver; - -public class ReusableDockerRabbitMQExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback, ParameterResolver { - - private DockerRabbitMQ rabbitMQ; - - @Override - public void beforeAll(ExtensionContext context) { - rabbitMQ = DockerRabbitMQ.withoutCookie(); - rabbitMQ.start(); - } - - @Override - public void afterEach(ExtensionContext context) throws Exception { - rabbitMQ.reset(); - } - - @Override - public void afterAll(ExtensionContext extensionContext) { - rabbitMQ.stop(); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return rabbitMQ; - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml index fc25fca..57f07d9 100644 --- a/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml +++ b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml @@ -9,7 +9,7 @@ <logger name="org.testcontainers" level="ERROR"/> <logger name="org.apache.james" level="WARN"/> - <logger name="org.apache.james.queue.rabbitmq.DockerRabbitMQ" level="WARN"/> + <logger name="org.apache.james.backend.mailqueue.DockerRabbitMQ" level="WARN"/> <root level="ERROR"> <appender-ref ref="CONSOLE" /> --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org