MAILBOX-374 RabbitMQConnectionFactory should rely on reactor for retries
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/603de62f Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/603de62f Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/603de62f Branch: refs/heads/master Commit: 603de62f3044de9e9d10ac19b0ff6cc8400dd217 Parents: c47b846 Author: Benoit Tellier <[email protected]> Authored: Thu Jan 24 13:39:00 2019 +0700 Committer: Benoit Tellier <[email protected]> Committed: Fri Jan 25 10:31:40 2019 +0700 ---------------------------------------------------------------------- .../rabbitmq/RabbitMQConnectionFactory.java | 27 +++++++++----------- .../james/backend/rabbitmq/DockerRabbitMQ.java | 9 +------ .../rabbitmq/RabbitMQConnectionFactoryTest.java | 22 +++------------- 3 files changed, 17 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/603de62f/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java index 2f901f4..e02fb28 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java @@ -18,27 +18,24 @@ ****************************************************************/ package org.apache.james.backend.rabbitmq; -import java.util.concurrent.ExecutionException; +import java.time.Duration; import javax.inject.Inject; -import org.apache.james.util.retry.RetryExecutorUtil; - -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -public class RabbitMQConnectionFactory { +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; - private final AsyncRetryExecutor executor; +public class RabbitMQConnectionFactory { private final ConnectionFactory connectionFactory; private final int maxRetries; private final int minDelay; @Inject - public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration, AsyncRetryExecutor executor) { - this.executor = executor; + public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) { this.connectionFactory = from(rabbitMQConfiguration); this.maxRetries = rabbitMQConfiguration.getMaxRetries(); this.minDelay = rabbitMQConfiguration.getMinDelay(); @@ -55,12 +52,12 @@ public class RabbitMQConnectionFactory { } public Connection create() { - try { - return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, Exception.class) - .getWithRetry(context -> connectionFactory.newConnection()) - .get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + return connectionMono().block(); + } + + public Mono<Connection> connectionMono() { + return Mono.fromCallable(connectionFactory::newConnection) + .retryBackoff(maxRetries, Duration.ofMillis(minDelay)) + .publishOn(Schedulers.elastic()); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/603de62f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java index 0d32047..1dc7daf 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java @@ -25,11 +25,8 @@ import java.net.URISyntaxException; import java.time.Duration; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import org.apache.http.client.utils.URIBuilder; -import org.apache.james.util.concurrent.NamedThreadFactory; import org.apache.james.util.docker.Images; import org.apache.james.util.docker.RateLimiters; import org.slf4j.Logger; @@ -43,7 +40,6 @@ import org.testcontainers.containers.wait.strategy.WaitStrategy; import com.github.fge.lambdas.consumers.ThrowingConsumer; import com.google.common.collect.ImmutableMap; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; import com.rabbitmq.client.Address; import com.rabbitmq.client.ConnectionFactory; @@ -253,9 +249,6 @@ public class DockerRabbitMQ { .minDelay(MIN_DELAY_OF_ONE_HUNDRED_MILLISECONDS) .build(); - ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass()); - return new RabbitMQConnectionFactory( - rabbitMQConfiguration, - new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory))); + return new RabbitMQConnectionFactory(rabbitMQConfiguration); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/603de62f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java index 5a357a7..024db49 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java @@ -22,24 +22,10 @@ import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEME import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.net.URI; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import org.apache.james.util.concurrent.NamedThreadFactory; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; - class RabbitMQConnectionFactoryTest { - - private ScheduledExecutorService scheduledExecutor; - - @BeforeEach - void setUp() throws Exception { - scheduledExecutor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.withClassName(getClass())); - } - @Test void creatingAFactoryShouldWorkWhenConfigurationIsValid() { RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() @@ -48,7 +34,7 @@ class RabbitMQConnectionFactoryTest { .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) .build(); - new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor)); + new RabbitMQConnectionFactory(rabbitMQConfiguration); } @Test @@ -59,7 +45,7 @@ class RabbitMQConnectionFactoryTest { .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) .build(); - assertThatThrownBy(() -> new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor))) + assertThatThrownBy(() -> new RabbitMQConnectionFactory(rabbitMQConfiguration)) .isInstanceOf(RuntimeException.class); } @@ -73,9 +59,9 @@ class RabbitMQConnectionFactoryTest { .minDelay(1) .build(); - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor)); + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration); - assertThatThrownBy(() -> rabbitMQConnectionFactory.create()) + assertThatThrownBy(rabbitMQConnectionFactory::create) .isInstanceOf(RuntimeException.class); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
