MAILBOX-374 RabbitMQConnectionFactory should rely on reactor for retries

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/603de62f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/603de62f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/603de62f

Branch: refs/heads/master
Commit: 603de62f3044de9e9d10ac19b0ff6cc8400dd217
Parents: c47b846
Author: Benoit Tellier <[email protected]>
Authored: Thu Jan 24 13:39:00 2019 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Fri Jan 25 10:31:40 2019 +0700

----------------------------------------------------------------------
 .../rabbitmq/RabbitMQConnectionFactory.java     | 27 +++++++++-----------
 .../james/backend/rabbitmq/DockerRabbitMQ.java  |  9 +------
 .../rabbitmq/RabbitMQConnectionFactoryTest.java | 22 +++-------------
 3 files changed, 17 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/603de62f/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
index 2f901f4..e02fb28 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
@@ -18,27 +18,24 @@
  ****************************************************************/
 package org.apache.james.backend.rabbitmq;
 
-import java.util.concurrent.ExecutionException;
+import java.time.Duration;
 
 import javax.inject.Inject;
 
-import org.apache.james.util.retry.RetryExecutorUtil;
-
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
-public class RabbitMQConnectionFactory {
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
-    private final AsyncRetryExecutor executor;
+public class RabbitMQConnectionFactory {
     private final ConnectionFactory connectionFactory;
 
     private final int maxRetries;
     private final int minDelay;
 
     @Inject
-    public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration, AsyncRetryExecutor executor) {
-        this.executor = executor;
+    public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) {
         this.connectionFactory = from(rabbitMQConfiguration);
         this.maxRetries = rabbitMQConfiguration.getMaxRetries();
         this.minDelay = rabbitMQConfiguration.getMinDelay();
@@ -55,12 +52,12 @@ public class RabbitMQConnectionFactory {
     }
 
     public Connection create() {
-        try {
-            return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, Exception.class)
-                    .getWithRetry(context -> connectionFactory.newConnection())
-                    .get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        return connectionMono().block();
+    }
+
+    public Mono<Connection> connectionMono() {
+        return Mono.fromCallable(connectionFactory::newConnection)
+            .retryBackoff(maxRetries, Duration.ofMillis(minDelay))
+            .publishOn(Schedulers.elastic());
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/603de62f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
index 0d32047..1dc7daf 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
@@ -25,11 +25,8 @@ import java.net.URISyntaxException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 
 import org.apache.http.client.utils.URIBuilder;
-import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.james.util.docker.Images;
 import org.apache.james.util.docker.RateLimiters;
 import org.slf4j.Logger;
@@ -43,7 +40,6 @@ import org.testcontainers.containers.wait.strategy.WaitStrategy;
 
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
 import com.google.common.collect.ImmutableMap;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 import com.rabbitmq.client.Address;
 import com.rabbitmq.client.ConnectionFactory;
 
@@ -253,9 +249,6 @@ public class DockerRabbitMQ {
             .minDelay(MIN_DELAY_OF_ONE_HUNDRED_MILLISECONDS)
             .build();
 
-        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
-        return new RabbitMQConnectionFactory(
-            rabbitMQConfiguration,
-            new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory)));
+        return new RabbitMQConnectionFactory(rabbitMQConfiguration);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/603de62f/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
index 5a357a7..024db49 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
@@ -22,24 +22,10 @@ import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEME
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.james.util.concurrent.NamedThreadFactory;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
-
 class RabbitMQConnectionFactoryTest {
-
-    private ScheduledExecutorService scheduledExecutor;
-
-    @BeforeEach
-    void setUp() throws Exception {
-        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.withClassName(getClass()));
-    }
-
     @Test
     void creatingAFactoryShouldWorkWhenConfigurationIsValid() {
         RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
@@ -48,7 +34,7 @@ class RabbitMQConnectionFactoryTest {
             .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
             .build();
 
-        new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor));
+        new RabbitMQConnectionFactory(rabbitMQConfiguration);
     }
 
     @Test
@@ -59,7 +45,7 @@ class RabbitMQConnectionFactoryTest {
             .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
             .build();
 
-        assertThatThrownBy(() -> new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor)))
+        assertThatThrownBy(() -> new RabbitMQConnectionFactory(rabbitMQConfiguration))
             .isInstanceOf(RuntimeException.class);
     }
 
@@ -73,9 +59,9 @@ class RabbitMQConnectionFactoryTest {
                 .minDelay(1)
                 .build();
 
-        RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(scheduledExecutor));
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration);
 
-        assertThatThrownBy(() -> rabbitMQConnectionFactory.create())
+        assertThatThrownBy(rabbitMQConnectionFactory::create)
             .isInstanceOf(RuntimeException.class);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to