JAMES-2545 Rename RabbitMQ backend package
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/74114e93 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/74114e93 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/74114e93 Branch: refs/heads/master Commit: 74114e9365a6b6593a714150f710a086b6647b46 Parents: 0b7748a Author: Benoit Tellier <btell...@linagora.com> Authored: Tue Sep 11 15:54:09 2018 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Wed Sep 12 10:01:13 2018 +0200 ---------------------------------------------------------------------- .../mailqueue/RabbitMQConfiguration.java | 117 ------- .../backend/mailqueue/RabbitMQHealthCheck.java | 67 ---- .../backend/rabbitmq/RabbitMQConfiguration.java | 117 +++++++ .../backend/rabbitmq/RabbitMQHealthCheck.java | 67 ++++ .../DockerClusterRabbitMQExtension.java | 130 -------- .../james/backend/mailqueue/DockerRabbitMQ.java | 167 ---------- .../mailqueue/DockerRabbitMQExtension.java | 53 --- .../mailqueue/DockerRabbitMQExtensionTest.java | 50 --- .../backend/mailqueue/InMemoryConsumer.java | 61 ---- .../backend/mailqueue/RabbitMQClusterTest.java | 294 ----------------- .../mailqueue/RabbitMQConfigurationTest.java | 134 -------- .../backend/mailqueue/RabbitMQFixture.java | 50 --- .../mailqueue/RabbitMQHealthCheckTest.java | 83 ----- .../james/backend/mailqueue/RabbitMQTest.java | 330 ------------------- .../backend/mailqueue/RabbitMQWaitStrategy.java | 67 ---- .../ReusableDockerRabbitMQExtension.java | 59 ---- .../DockerClusterRabbitMQExtension.java | 130 ++++++++ .../james/backend/rabbitmq/DockerRabbitMQ.java | 167 ++++++++++ .../rabbitmq/DockerRabbitMQExtension.java | 53 +++ .../rabbitmq/DockerRabbitMQExtensionTest.java | 50 +++ .../backend/rabbitmq/InMemoryConsumer.java | 61 ++++ .../backend/rabbitmq/RabbitMQClusterTest.java | 294 +++++++++++++++++ .../rabbitmq/RabbitMQConfigurationTest.java | 134 ++++++++ .../james/backend/rabbitmq/RabbitMQFixture.java | 50 +++ .../rabbitmq/RabbitMQHealthCheckTest.java | 83 +++++ .../james/backend/rabbitmq/RabbitMQTest.java | 330 +++++++++++++++++++ .../backend/rabbitmq/RabbitMQWaitStrategy.java | 67 ++++ .../ReusableDockerRabbitMQExtension.java | 59 ++++ .../james/modules/rabbitmq/RabbitMQModule.java | 2 +- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 4 +- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 4 +- .../src/test/resources/logback-test.xml | 2 +- 32 files changed, 1668 insertions(+), 1668 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQConfiguration.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQConfiguration.java deleted file mode 100644 index 362db74..0000000 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQConfiguration.java +++ /dev/null @@ -1,117 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import java.net.URI; -import java.util.Objects; - -import org.apache.commons.configuration.PropertiesConfiguration; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -public class RabbitMQConfiguration { - @FunctionalInterface - public interface RequireAmqpUri { - RequireManagementUri amqpUri(URI amqpUri); - } - - @FunctionalInterface - public interface RequireManagementUri { - Builder managementUri(URI managementUri); - } - - public static class Builder { - private final URI amqpUri; - private final URI managementUri; - - private Builder(URI amqpUri, URI managementUri) { - this.amqpUri = amqpUri; - this.managementUri = managementUri; - } - - public RabbitMQConfiguration build() { - Preconditions.checkNotNull(amqpUri, "'amqpUri' should not be null"); - Preconditions.checkNotNull(managementUri, "'managementUri' should not be null"); - return new RabbitMQConfiguration(amqpUri, managementUri); - } - } - - private static final String URI_PROPERTY_NAME = "uri"; - private static final String MANAGEMENT_URI_PROPERTY_NAME = "management.uri"; - - public static RequireAmqpUri builder() { - return amqpUri -> managementUri -> new Builder(amqpUri, managementUri); - } - - public static RabbitMQConfiguration from(PropertiesConfiguration configuration) { - String uriAsString = configuration.getString(URI_PROPERTY_NAME); - Preconditions.checkState(!Strings.isNullOrEmpty(uriAsString), "You need to specify the URI of RabbitMQ"); - URI amqpUri = checkURI(uriAsString); - - String managementUriAsString = configuration.getString(MANAGEMENT_URI_PROPERTY_NAME); - Preconditions.checkState(!Strings.isNullOrEmpty(managementUriAsString), "You need to specify the management URI of RabbitMQ"); - URI managementUri = checkURI(managementUriAsString); - - return builder() - .amqpUri(amqpUri) - .managementUri(managementUri) - .build(); - } - - private static URI checkURI(String uri) { - try { - return URI.create(uri); - } catch (Exception e) { - throw new IllegalStateException("You need to specify a valid URI", e); - } - } - - private final URI uri; - private final URI managementUri; - - private RabbitMQConfiguration(URI uri, URI managementUri) { - this.uri = uri; - this.managementUri = managementUri; - } - - public URI getUri() { - return uri; - } - - public URI getManagementUri() { - return managementUri; - } - - @Override - public final boolean equals(Object o) { - if (o instanceof RabbitMQConfiguration) { - RabbitMQConfiguration that = (RabbitMQConfiguration) o; - - return Objects.equals(this.uri, that.uri) - && Objects.equals(this.managementUri, that.managementUri); - } - return false; - } - - @Override - public final int hashCode() { - return Objects.hash(uri, managementUri); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheck.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheck.java deleted file mode 100644 index 2287cb6..0000000 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheck.java +++ /dev/null @@ -1,67 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.backend.mailqueue; - -import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; - -import javax.inject.Inject; - -import org.apache.james.core.healthcheck.ComponentName; -import org.apache.james.core.healthcheck.HealthCheck; -import org.apache.james.core.healthcheck.Result; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -public class RabbitMQHealthCheck implements HealthCheck { - private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQHealthCheck.class); - private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQ backend"); - - private final ConnectionFactory connectionFactory; - - @Inject - public RabbitMQHealthCheck(RabbitMQConfiguration configuration) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { - this.connectionFactory = new ConnectionFactory(); - this.connectionFactory.setUri(configuration.getUri()); - } - - @Override - public ComponentName componentName() { - return COMPONENT_NAME; - } - - @Override - public Result check() { - try (Connection connection = connectionFactory.newConnection()) { - if (connection.isOpen()) { - return Result.healthy(COMPONENT_NAME); - } - LOGGER.error("The created connection was not opened"); - return Result.unhealthy(COMPONENT_NAME); - } catch (Exception e) { - LOGGER.error("Unhealthy RabbitMQ instances: could not establish a connection", e); - return Result.unhealthy(COMPONENT_NAME); - } - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConfiguration.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConfiguration.java new file mode 100644 index 0000000..4093431 --- /dev/null +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConfiguration.java @@ -0,0 +1,117 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import java.net.URI; +import java.util.Objects; + +import org.apache.commons.configuration.PropertiesConfiguration; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +public class RabbitMQConfiguration { + @FunctionalInterface + public interface RequireAmqpUri { + RequireManagementUri amqpUri(URI amqpUri); + } + + @FunctionalInterface + public interface RequireManagementUri { + Builder managementUri(URI managementUri); + } + + public static class Builder { + private final URI amqpUri; + private final URI managementUri; + + private Builder(URI amqpUri, URI managementUri) { + this.amqpUri = amqpUri; + this.managementUri = managementUri; + } + + public RabbitMQConfiguration build() { + Preconditions.checkNotNull(amqpUri, "'amqpUri' should not be null"); + Preconditions.checkNotNull(managementUri, "'managementUri' should not be null"); + return new RabbitMQConfiguration(amqpUri, managementUri); + } + } + + private static final String URI_PROPERTY_NAME = "uri"; + private static final String MANAGEMENT_URI_PROPERTY_NAME = "management.uri"; + + public static RequireAmqpUri builder() { + return amqpUri -> managementUri -> new Builder(amqpUri, managementUri); + } + + public static RabbitMQConfiguration from(PropertiesConfiguration configuration) { + String uriAsString = configuration.getString(URI_PROPERTY_NAME); + Preconditions.checkState(!Strings.isNullOrEmpty(uriAsString), "You need to specify the URI of RabbitMQ"); + URI amqpUri = checkURI(uriAsString); + + String managementUriAsString = configuration.getString(MANAGEMENT_URI_PROPERTY_NAME); + Preconditions.checkState(!Strings.isNullOrEmpty(managementUriAsString), "You need to specify the management URI of RabbitMQ"); + URI managementUri = checkURI(managementUriAsString); + + return builder() + .amqpUri(amqpUri) + .managementUri(managementUri) + .build(); + } + + private static URI checkURI(String uri) { + try { + return URI.create(uri); + } catch (Exception e) { + throw new IllegalStateException("You need to specify a valid URI", e); + } + } + + private final URI uri; + private final URI managementUri; + + private RabbitMQConfiguration(URI uri, URI managementUri) { + this.uri = uri; + this.managementUri = managementUri; + } + + public URI getUri() { + return uri; + } + + public URI getManagementUri() { + return managementUri; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof RabbitMQConfiguration) { + RabbitMQConfiguration that = (RabbitMQConfiguration) o; + + return Objects.equals(this.uri, that.uri) + && Objects.equals(this.managementUri, that.managementUri); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(uri, managementUri); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java new file mode 100644 index 0000000..b850e26 --- /dev/null +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java @@ -0,0 +1,67 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +import javax.inject.Inject; + +import org.apache.james.core.healthcheck.ComponentName; +import org.apache.james.core.healthcheck.HealthCheck; +import org.apache.james.core.healthcheck.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class RabbitMQHealthCheck implements HealthCheck { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQHealthCheck.class); + private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQ backend"); + + private final ConnectionFactory connectionFactory; + + @Inject + public RabbitMQHealthCheck(RabbitMQConfiguration configuration) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + this.connectionFactory = new ConnectionFactory(); + this.connectionFactory.setUri(configuration.getUri()); + } + + @Override + public ComponentName componentName() { + return COMPONENT_NAME; + } + + @Override + public Result check() { + try (Connection connection = connectionFactory.newConnection()) { + if (connection.isOpen()) { + return Result.healthy(COMPONENT_NAME); + } + LOGGER.error("The created connection was not opened"); + return Result.unhealthy(COMPONENT_NAME); + } catch (Exception e) { + LOGGER.error("Unhealthy RabbitMQ instances: could not establish a connection", e); + return Result.unhealthy(COMPONENT_NAME); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java deleted file mode 100644 index dd382d0..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java +++ /dev/null @@ -1,130 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import java.nio.charset.StandardCharsets; - -import org.apache.james.util.Runnables; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.ParameterContext; -import org.junit.jupiter.api.extension.ParameterResolutionException; -import org.junit.jupiter.api.extension.ParameterResolver; -import org.testcontainers.containers.Network; - -import com.github.fge.lambdas.Throwing; -import com.google.common.collect.ImmutableList; -import com.google.common.hash.Hashing; -import com.rabbitmq.client.Address; - -public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { - - public static final String RABBIT_1 = "rabbit1"; - public static final String RABBIT_2 = "rabbit2"; - public static final String RABBIT_3 = "rabbit3"; - private DockerRabbitMQCluster cluster; - private Network network; - - @Override - public void beforeEach(ExtensionContext context) { - String cookie = Hashing.sha256().hashString("secret cookie here", StandardCharsets.UTF_8).toString(); - - network = Network.NetworkImpl.builder() - .enableIpv6(false) - .createNetworkCmdModifiers(ImmutableList.of()) - .build(); - - DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network); - DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network); - DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network); - - Runnables.runParallel( - rabbitMQ1::start, - rabbitMQ2::start, - rabbitMQ3::start); - - Runnables.runParallel( - Throwing.runnable(() -> rabbitMQ2.join(rabbitMQ1)), - Throwing.runnable(() -> rabbitMQ3.join(rabbitMQ1))); - - - - Runnables.runParallel( - Throwing.runnable(rabbitMQ2::startApp), - Throwing.runnable(rabbitMQ3::startApp)); - - cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3); - } - - @Override - public void afterEach(ExtensionContext context) throws Exception { - cluster.stop(); - network.close(); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return cluster; - } - - public static class DockerRabbitMQCluster { - - private final DockerRabbitMQ rabbitMQ1; - private final DockerRabbitMQ rabbitMQ2; - private final DockerRabbitMQ rabbitMQ3; - - public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) { - this.rabbitMQ1 = rabbitMQ1; - this.rabbitMQ2 = rabbitMQ2; - this.rabbitMQ3 = rabbitMQ3; - } - - public void stop() { - Runnables.runParallel( - Throwing.runnable(rabbitMQ1::stop).orDoNothing(), - Throwing.runnable(rabbitMQ2::stop).orDoNothing(), - Throwing.runnable(rabbitMQ3::stop).orDoNothing()); - } - - public DockerRabbitMQ getRabbitMQ1() { - return rabbitMQ1; - } - - public DockerRabbitMQ getRabbitMQ2() { - return rabbitMQ2; - } - - public DockerRabbitMQ getRabbitMQ3() { - return rabbitMQ3; - } - - public ImmutableList<Address> getAddresses() { - return ImmutableList.of( - new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()), - new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()), - new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort())); - } - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java deleted file mode 100644 index 29f6757..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java +++ /dev/null @@ -1,167 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import java.util.Optional; -import java.util.UUID; - -import org.apache.james.util.docker.Images; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.containers.wait.strategy.WaitAllStrategy; - -import com.google.common.collect.ImmutableMap; -import com.rabbitmq.client.ConnectionFactory; - -public class DockerRabbitMQ { - private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class); - - private static final String DEFAULT_RABBIT_NODE = "my-rabbit"; - private static final int DEFAULT_RABBITMQ_PORT = 5672; - private static final int DEFAULT_RABBITMQ_ADMIN_PORT = 15672; - private static final String DEFAULT_RABBITMQ_USERNAME = "guest"; - private static final String DEFAULT_RABBITMQ_PASSWORD = "guest"; - private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE"; - private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME"; - - private final GenericContainer<?> container; - private final Optional<String> nodeName; - - public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) { - return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName), - Optional.of(network)); - } - - public static DockerRabbitMQ withoutCookie() { - return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); - } - - @SuppressWarnings("resource") - private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) { - container = new GenericContainer<>(Images.RABBITMQ) - .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse(randomName()))) - .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE))) - .withExposedPorts(DEFAULT_RABBITMQ_PORT, DEFAULT_RABBITMQ_ADMIN_PORT) - .waitingFor(new WaitAllStrategy() - .withStrategy(Wait.forHttp("").forPort(DEFAULT_RABBITMQ_ADMIN_PORT)) - .withStrategy(RabbitMQWaitStrategy.withDefaultTimeout(this))) - .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String())) - .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig() - .withTmpFs(ImmutableMap.of("/var/lib/rabbitmq/mnesia", "rw,noexec,nosuid,size=100m"))); - net.ifPresent(container::withNetwork); - erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie)); - nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name)); - this.nodeName = nodeName; - } - - private String randomName() { - return UUID.randomUUID().toString(); - } - - public String getHostIp() { - return container.getContainerIpAddress(); - } - - public Integer getPort() { - return container.getMappedPort(DEFAULT_RABBITMQ_PORT); - } - - public Integer getAdminPort() { - return container.getMappedPort(DEFAULT_RABBITMQ_ADMIN_PORT); - } - - public String getUsername() { - return DEFAULT_RABBITMQ_USERNAME; - } - - public String getPassword() { - return DEFAULT_RABBITMQ_PASSWORD; - } - - public ConnectionFactory connectionFactory() { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setHost(getHostIp()); - connectionFactory.setPort(getPort()); - connectionFactory.setUsername(getUsername()); - connectionFactory.setPassword(getPassword()); - return connectionFactory; - } - - public void start() { - container.start(); - } - - public void stop() { - container.stop(); - } - - public void restart() { - DockerClientFactory.instance().client() - .restartContainerCmd(container.getContainerId()); - } - - public GenericContainer<?> container() { - return container; - } - - public String node() { - return nodeName.get(); - } - - public void join(DockerRabbitMQ rabbitMQ) throws Exception { - stopApp(); - joinCluster(rabbitMQ); - } - - public void stopApp() throws java.io.IOException, InterruptedException { - String stdout = container() - .execInContainer("rabbitmqctl", "stop_app") - .getStdout(); - LOGGER.debug("stop_app: {}", stdout); - } - - private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException { - String stdout = container() - .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node()) - .getStdout(); - LOGGER.debug("join_cluster: {}", stdout); - } - - public void startApp() throws Exception { - String stdout = container() - .execInContainer("rabbitmqctl", "start_app") - .getStdout(); - LOGGER.debug("start_app: {}", stdout); - } - - public void reset() throws Exception { - stopApp(); - - String stdout = container() - .execInContainer("rabbitmqctl", "reset") - .getStdout(); - LOGGER.debug("reset: {}", stdout); - - startApp(); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java deleted file mode 100644 index ea11f81..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java +++ /dev/null @@ -1,53 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.ParameterContext; -import org.junit.jupiter.api.extension.ParameterResolutionException; -import org.junit.jupiter.api.extension.ParameterResolver; - -public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { - - private DockerRabbitMQ rabbitMQ; - - @Override - public void beforeEach(ExtensionContext context) { - rabbitMQ = DockerRabbitMQ.withoutCookie(); - rabbitMQ.start(); - } - - @Override - public void afterEach(ExtensionContext context) { - rabbitMQ.stop(); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return rabbitMQ; - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java deleted file mode 100644 index aaabc5b..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -@ExtendWith(DockerRabbitMQExtension.class) -public class DockerRabbitMQExtensionTest { - - private ConnectionFactory connectionFactory; - - @BeforeEach - public void setup(DockerRabbitMQ rabbitMQ) { - connectionFactory = new ConnectionFactory(); - connectionFactory.setHost(rabbitMQ.getHostIp()); - connectionFactory.setPort(rabbitMQ.getPort()); - connectionFactory.setUsername(rabbitMQ.getUsername()); - connectionFactory.setPassword(rabbitMQ.getPassword()); - } - - @Test - public void containerShouldBeUp() throws Exception { - try (Connection connection = connectionFactory.newConnection()) { - assertThat(connection.isOpen()).isTrue(); - } - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java deleted file mode 100644 index e6fe021..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java +++ /dev/null @@ -1,61 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Envelope; - -public class InMemoryConsumer extends DefaultConsumer { - - @FunctionalInterface - interface Operation { - void perform(); - } - - private final ConcurrentLinkedQueue<Integer> messages; - private final Operation operation; - - public InMemoryConsumer(Channel channel) { - this(channel, () -> { }); - } - - public InMemoryConsumer(Channel channel, Operation operation) { - super(channel); - this.operation = operation; - this.messages = new ConcurrentLinkedQueue<>(); - } - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - operation.perform(); - Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); - messages.add(payload); - } - - public Queue<Integer> getConsumedMessages() { - return messages; - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java deleted file mode 100644 index 5467840..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java +++ /dev/null @@ -1,294 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_DELETE; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.DIRECT; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.DURABLE; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCHANGE_NAME; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCLUSIVE; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.NO_PROPERTIES; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.ROUTING_KEY; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.awaitAtMostOneMinute; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.IntStream; - -import org.apache.james.backend.mailqueue.DockerClusterRabbitMQExtension.DockerRabbitMQCluster; -import org.awaitility.Awaitility; -import org.awaitility.Duration; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -@ExtendWith(DockerClusterRabbitMQExtension.class) -class RabbitMQClusterTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class); - - private static final String QUEUE = "queue"; - - @Nested - class ClusterSharing { - - private ConnectionFactory node1ConnectionFactory; - private ConnectionFactory node2ConnectionFactory; - private Connection node1Connection; - private Connection node2Connection; - private Channel node1Channel; - private Channel node2Channel; - - @BeforeEach - void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { - node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); - node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); - node1Connection = node1ConnectionFactory.newConnection(); - node2Connection = node2ConnectionFactory.newConnection(); - node1Channel = node1Connection.createChannel(); - node2Channel = node2Connection.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); - } - - @Test - void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { - String stdout = cluster.getRabbitMQ1().container() - .execInContainer("rabbitmqctl", "cluster_status") - .getStdout(); - - assertThat(stdout) - .contains( - DockerClusterRabbitMQExtension.RABBIT_1, - DockerClusterRabbitMQExtension.RABBIT_2, - DockerClusterRabbitMQExtension.RABBIT_3); - } - - @Test - void queuesShouldBeShared() throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); - - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - @Test - void queuesShouldBeDeclarableOnAnotherNode() throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); - - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - } - - @Nested - class ClusterNodesFailure { - - private ConnectionFactory node1ConnectionFactory; - private Connection resilientConnection; - private Channel resilientChannel; - private Connection node2Connection; - private Channel node2Channel; - - @BeforeEach - void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { - node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); - resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses()); - resilientChannel = resilientConnection.createChannel(); - ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); - node2Connection = node2ConnectionFactory.newConnection(); - node2Channel = node2Connection.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly(resilientConnection, resilientChannel); - } - - @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnecting" + - "See https://github.com/rabbitmq/rabbitmq-server/issues/959") - @Test - void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception { - resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 20; - int firstBatchSize = nbMessages / 2; - IntStream.range(0, firstBatchSize) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer); - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize); - - cluster.getRabbitMQ1().stop(); - - IntStream.range(firstBatchSize, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(this::tryPublishWithRetry); - - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - private void tryPublishWithRetry(byte[] bytes) { - Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes)); - } - - private boolean tryPublish(byte[] bytes) { - try { - resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes); - return true; - } catch (Exception e) { - LOGGER.error("failed publish", e); - return false; - } - } - - @Test - void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception { - ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory(); - ImmutableList<Address> clusterAddresses = cluster.getAddresses(); - cluster.getRabbitMQ3().stop(); - - try (Connection connection = node3ConnectionFactory.newConnection(clusterAddresses); - Channel channel = connection.createChannel()) { - - channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer = new InMemoryConsumer(channel); - channel.basicConsume(QUEUE, consumer); - - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - } - - @Test - void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception { - node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - AtomicInteger counter = new AtomicInteger(0); - InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel, - () -> stopWhenHalfProcessed(cluster, nbMessages, counter)); - resilientChannel.basicConsume(QUEUE, consumer); - - awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - - private void stopWhenHalfProcessed(DockerRabbitMQCluster cluster, int nbMessages, AtomicInteger counter) { - if (counter.incrementAndGet() == nbMessages / 2) { - cluster.getRabbitMQ1().stop(); - } - } - - } - - private void closeQuietly(AutoCloseable... closeables) { - Arrays.stream(closeables).forEach(this::closeQuietly); - } - - private void closeQuietly(AutoCloseable closeable) { - try { - closeable.close(); - } catch (Exception e) { - //ignore error - } - } - - private byte[] asBytes(String message) { - return message.getBytes(StandardCharsets.UTF_8); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQConfigurationTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQConfigurationTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQConfigurationTest.java deleted file mode 100644 index 4da2a91..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQConfigurationTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import java.net.URI; - -import org.apache.commons.configuration.PropertiesConfiguration; -import org.junit.jupiter.api.Test; - -import nl.jqno.equalsverifier.EqualsVerifier; - -class RabbitMQConfigurationTest { - - @Test - void shouldRespectBeanContract() { - EqualsVerifier.forClass(RabbitMQConfiguration.class).verify(); - } - - @Test - void fromShouldThrowWhenURIIsNotInTheConfiguration() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify the URI of RabbitMQ"); - } - - @Test - void fromShouldThrowWhenURIIsNull() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.addProperty("uri", null); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify the URI of RabbitMQ"); - } - - @Test - void fromShouldThrowWhenURIIsEmpty() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.addProperty("uri", ""); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify the URI of RabbitMQ"); - } - - @Test - void fromShouldThrowWhenURIIsInvalid() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.addProperty("uri", ":invalid"); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify a valid URI"); - } - - @Test - void fromShouldThrowWhenManagementURIIsNotInTheConfiguration() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify the management URI of RabbitMQ"); - } - - @Test - void fromShouldThrowWhenManagementURIIsNull() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); - configuration.addProperty("management.uri", null); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify the management URI of RabbitMQ"); - } - - @Test - void fromShouldThrowWhenManagementURIIsEmpty() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); - configuration.addProperty("management.uri", ""); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify the management URI of RabbitMQ"); - } - - @Test - void fromShouldThrowWhenManagementURIIsInvalid() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); - configuration.addProperty("management.uri", ":invalid"); - - assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("You need to specify a valid URI"); - } - - @Test - void fromShouldReturnTheConfigurationWhenRequiredParametersAreGiven() { - PropertiesConfiguration configuration = new PropertiesConfiguration(); - String amqpUri = "amqp://james:james@rabbitmq_host:5672"; - configuration.addProperty("uri", amqpUri); - String managementUri = "http://james:james@rabbitmq_host:15672/api/"; - configuration.addProperty("management.uri", managementUri); - - assertThat(RabbitMQConfiguration.from(configuration)) - .isEqualTo(RabbitMQConfiguration.builder() - .amqpUri(URI.create(amqpUri)) - .managementUri(URI.create(managementUri)) - .build()); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java deleted file mode 100644 index 8b83a96..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java +++ /dev/null @@ -1,50 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.backend.mailqueue; - -import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; -import static org.awaitility.Duration.ONE_MINUTE; - -import org.awaitility.Awaitility; -import org.awaitility.Duration; -import org.awaitility.core.ConditionFactory; - -import com.rabbitmq.client.AMQP; - -public class RabbitMQFixture { - public static final boolean DURABLE = true; - public static final boolean AUTO_ACK = true; - public static final AMQP.BasicProperties NO_PROPERTIES = null; - public static final String EXCHANGE_NAME = "exchangeName"; - public static final String ROUTING_KEY = "routingKey"; - public static final String DIRECT = "direct"; - public static final boolean EXCLUSIVE = true; - public static final boolean AUTO_DELETE = true; - public static final String WORK_QUEUE = "workQueue"; - - public static Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS; - public static ConditionFactory calmlyAwait = Awaitility.with() - .pollInterval(slowPacedPollInterval) - .and() - .with() - .pollDelay(slowPacedPollInterval) - .await(); - public static ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE); -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheckTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheckTest.java deleted file mode 100644 index 1352590..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQHealthCheckTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.backend.mailqueue; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.net.URI; - -import org.apache.james.core.healthcheck.Result; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -@ExtendWith(DockerRabbitMQExtension.class) -class RabbitMQHealthCheckTest { - private RabbitMQHealthCheck healthCheck; - - @BeforeEach - void setUp(DockerRabbitMQ rabbitMQ) throws Exception { - URI amqpUri = URI.create("amqp://" + rabbitMQ.getHostIp() + ":" + rabbitMQ.getPort()); - URI managementUri = URI.create("http://" + rabbitMQ.getHostIp() + ":" + rabbitMQ.getAdminPort()); - - healthCheck = new RabbitMQHealthCheck( - RabbitMQConfiguration.builder() - .amqpUri(amqpUri) - .managementUri(managementUri) - .build()); - } - - @Test - void checkShouldReturnHealthyWhenRabbitMQIsRunning() { - Result check = healthCheck.check(); - - assertThat(check.isHealthy()).isTrue(); - } - - @Test - void checkShouldReturnUnhealthyWhenRabbitMQIsNotRunning(DockerRabbitMQ rabbitMQ) throws Exception { - rabbitMQ.stopApp(); - - Result check = healthCheck.check(); - - assertThat(check.isHealthy()).isFalse(); - } - - @Test - void checkShouldDetectWhenRabbitMQRecovered(DockerRabbitMQ rabbitMQ) throws Exception { - rabbitMQ.stopApp(); - healthCheck.check(); - - rabbitMQ.startApp(); - - Result check = healthCheck.check(); - assertThat(check.isHealthy()).isTrue(); - } - - @Test - void checkShouldDetectWhenRabbitMQFail(DockerRabbitMQ rabbitMQ) throws Exception { - healthCheck.check(); - - rabbitMQ.stopApp(); - - Result check = healthCheck.check(); - assertThat(check.isHealthy()).isFalse(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java deleted file mode 100644 index f54ba25..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java +++ /dev/null @@ -1,330 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_ACK; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_DELETE; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.DIRECT; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.DURABLE; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCHANGE_NAME; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCLUSIVE; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.NO_PROPERTIES; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.ROUTING_KEY; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.WORK_QUEUE; -import static org.apache.james.backend.mailqueue.RabbitMQFixture.awaitAtMostOneMinute; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Queue; -import java.util.concurrent.TimeoutException; -import java.util.stream.IntStream; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -@ExtendWith(DockerRabbitMQExtension.class) -class RabbitMQTest { - - @Nested - class SingleConsumerTest { - - private ConnectionFactory connectionFactory; - private Connection connection; - private Channel channel; - - @BeforeEach - void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { - connectionFactory = rabbitMQ.connectionFactory(); - connection = connectionFactory.newConnection(); - channel = connection.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly(connection, channel); - } - - @Test - void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception { - String queueName = createQueue(channel); - publishAMessage(channel); - awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName)); - } - - @Test - void demonstrateDurability(DockerRabbitMQ rabbitMQ) throws Exception { - String queueName = createQueue(channel); - publishAMessage(channel); - - rabbitMQ.restart(); - - awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ)); - assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull(); - } - - private Boolean containerIsRestarted(DockerRabbitMQ rabbitMQ) { - try { - rabbitMQ.connectionFactory().newConnection(); - return true; - } catch (Exception e) { - return false; - } - } - - private String createQueue(Channel channel) throws IOException { - channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - String queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); - return queueName; - } - - private void publishAMessage(Channel channel) throws IOException { - channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!")); - } - - private Boolean messageReceived(Channel channel, String queueName) { - try { - return channel.basicGet(queueName, !AUTO_ACK) != null; - } catch (Exception e) { - return false; - } - } - } - - @Nested - class FourConnections { - - private ConnectionFactory connectionFactory1; - private ConnectionFactory connectionFactory2; - private ConnectionFactory connectionFactory3; - private ConnectionFactory connectionFactory4; - private Connection connection1; - private Connection connection2; - private Connection connection3; - private Connection connection4; - private Channel channel1; - private Channel channel2; - private Channel channel3; - private Channel channel4; - - @BeforeEach - void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { - connectionFactory1 = rabbitMQ.connectionFactory(); - connectionFactory2 = rabbitMQ.connectionFactory(); - connectionFactory3 = rabbitMQ.connectionFactory(); - connectionFactory4 = rabbitMQ.connectionFactory(); - connection1 = connectionFactory1.newConnection(); - connection2 = connectionFactory2.newConnection(); - connection3 = connectionFactory3.newConnection(); - connection4 = connectionFactory4.newConnection(); - channel1 = connection1.createChannel(); - channel2 = connection2.createChannel(); - channel3 = connection3.createChannel(); - channel4 = connection4.createChannel(); - } - - @AfterEach - void tearDown() { - closeQuietly( - channel1, channel2, channel3, channel4, - connection1, connection2, connection3, connection4); - } - - @Nested - class BroadCast { - - // In the following case, each consumer will receive the messages produced by the - // producer - // To do so, each consumer will bind it's queue to the producer exchange. - @Test - void rabbitMQShouldSupportTheBroadcastCase() throws Exception { - // Declare a single exchange and three queues attached to it. - channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - - String queue2 = channel2.queueDeclare().getQueue(); - channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY); - String queue3 = channel3.queueDeclare().getQueue(); - channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY); - String queue4 = channel4.queueDeclare().getQueue(); - channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); - - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); - channel2.basicConsume(queue2, consumer2); - channel3.basicConsume(queue3, consumer3); - channel4.basicConsume(queue4, consumer4); - - // the publisher will produce 10 messages - IntStream.range(0, 10) - .mapToObj(String::valueOf) - .map(RabbitMQTest.this::asBytes) - .forEach(Throwing.consumer( - bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - awaitAtMostOneMinute.until( - () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30); - - ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList()); - // Check every subscriber have receive all the messages. - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } - } - - @Nested - class WorkQueue { - - // In the following case, consumers will receive the messages produced by the - // producer but will share them. - // To do so, we will bind a single queue to the producer exchange. - @Test - void rabbitMQShouldSupportTheWorkQueueCase() throws Exception { - int nbMessages = 100; - - // Declare the exchange and a single queue attached to it. - channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); - channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); - channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - // Publisher will produce 100 messages - IntStream.range(0, nbMessages) - .mapToObj(String::valueOf) - .map(RabbitMQTest.this::asBytes) - .forEach(Throwing.consumer( - bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); - channel2.basicConsume(WORK_QUEUE, consumer2); - channel3.basicConsume(WORK_QUEUE, consumer3); - channel4.basicConsume(WORK_QUEUE, consumer4); - - awaitAtMostOneMinute.until( - () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages); - - ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); - - assertThat( - Iterables.concat( - consumer2.getConsumedMessages(), - consumer3.getConsumedMessages(), - consumer4.getConsumedMessages())) - .containsOnlyElementsOf(expectedResult); - } - - } - - @Nested - class Routing { - @Test - void rabbitMQShouldSupportRouting() throws Exception { - String conversation1 = "c1"; - String conversation2 = "c2"; - String conversation3 = "c3"; - String conversation4 = "c4"; - - // Declare the exchange and a single queue attached to it. - channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - - String queue1 = channel1.queueDeclare().getQueue(); - // 1 will follow conversation 1 and 2 - channel1.queueBind(queue1, EXCHANGE_NAME, conversation1); - channel1.queueBind(queue1, EXCHANGE_NAME, conversation2); - - String queue2 = channel2.queueDeclare().getQueue(); - // 2 will follow conversation 2 and 3 - channel2.queueBind(queue2, EXCHANGE_NAME, conversation2); - channel2.queueBind(queue2, EXCHANGE_NAME, conversation3); - - String queue3 = channel3.queueDeclare().getQueue(); - // 3 will follow conversation 3 and 4 - channel3.queueBind(queue3, EXCHANGE_NAME, conversation3); - channel3.queueBind(queue3, EXCHANGE_NAME, conversation4); - - String queue4 = channel4.queueDeclare().getQueue(); - // 4 will follow conversation 1 and 4 - channel4.queueBind(queue4, EXCHANGE_NAME, conversation1); - channel4.queueBind(queue4, EXCHANGE_NAME, conversation4); - - channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1")); - channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2")); - channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3")); - channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4")); - - InMemoryConsumer consumer1 = new InMemoryConsumer(channel1); - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); - channel1.basicConsume(queue1, consumer1); - channel2.basicConsume(queue2, consumer2); - channel3.basicConsume(queue3, consumer3); - channel4.basicConsume(queue4, consumer4); - - awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8); - - assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2); - assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3); - assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4); - assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4); - } - } - - private long countReceivedMessages(InMemoryConsumer... consumers) { - return Arrays.stream(consumers) - .map(InMemoryConsumer::getConsumedMessages) - .mapToLong(Queue::size) - .sum(); - } - - } - - private void closeQuietly(AutoCloseable... closeables) { - Arrays.stream(closeables).forEach(this::closeQuietly); - } - - private void closeQuietly(AutoCloseable closeable) { - try { - closeable.close(); - } catch (Exception e) { - //ignore error - } - } - - private byte[] asBytes(String message) { - return message.getBytes(StandardCharsets.UTF_8); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org