This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7cb8ba9e88c76718f62ec32b2ded8c1d35420569
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Thu Apr 23 12:34:56 2020 +0200

    JAMES-3153 Use retryWhen(Retry.backoff(...)) instead of retryBackoff for Reactor backoff retries
---
 .../backends/cassandra/init/ResilientClusterProvider.java      |  4 ++--
 .../main/java/org/apache/james/backends/es/ClientProvider.java |  4 ++--
 .../james/backends/rabbitmq/RabbitMQConnectionFactory.java     |  4 ++--
 .../james/backends/rabbitmq/ReactorRabbitMQChannelPool.java    |  6 +++---
 .../apache/james/backends/rabbitmq/SimpleConnectionPool.java   |  4 ++--
 .../apache/james/mailbox/events/RetryBackoffConfiguration.java |  1 -
 .../james/mailbox/cassandra/mail/CassandraMailboxMapper.java   |  7 ++++---
 .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java |  3 ++-
 .../james/mailbox/cassandra/mail/CassandraMessageMapper.java   |  3 ++-
 .../james/mailbox/cassandra/mail/CassandraModSeqProvider.java  |  4 ++--
 .../james/mailbox/cassandra/mail/CassandraUidProvider.java     |  4 ++--
 .../apache/james/mailbox/events/delivery/EventDelivery.java    |  6 ++----
 .../org/apache/james/mailbox/events/GroupRegistration.java     |  4 ++--
 .../apache/james/mailbox/events/KeyRegistrationHandler.java    | 10 +++++-----
 pom.xml                                                        |  2 +-
 .../james/transport/mailets/delivery/MailDispatcher.java       |  3 ++-
 .../src/test/java/org/apache/james/webadmin/WebAdminUtils.java |  3 ++-
 server/queue/queue-activemq/pom.xml                            |  2 +-
 .../task/eventsourcing/distributed/RabbitMQWorkQueue.java      |  8 ++++----
 19 files changed, 42 insertions(+), 40 deletions(-)

diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
index f91e68d..4eeb502 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
@@ -40,6 +40,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 @Singleton
 public class ResilientClusterProvider implements Provider<Cluster> {
@@ -51,10 +52,9 @@ public class ResilientClusterProvider implements 
Provider<Cluster> {
     @Inject
     ResilientClusterProvider(ClusterConfiguration configuration) {
         Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
-        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         cluster = Mono.fromCallable(getClusterRetryCallable(configuration))
             .doOnError(e -> LOGGER.warn("Error establishing Cassandra 
connection. Next retry scheduled in {} ms", waitDelay, e))
-            .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, 
Schedulers.elastic())
+            .retryWhen(Retry.backoff(configuration.getMaxRetry(), 
waitDelay).scheduler(Schedulers.elastic()))
             .publishOn(Schedulers.elastic())
             .block();
     }
diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
index 247ed50..b73fbd3 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
@@ -56,6 +56,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public class ClientProvider implements Provider<ReactorElasticSearchClient> {
 
@@ -180,13 +181,12 @@ public class ClientProvider implements 
Provider<ReactorElasticSearchClient> {
 
     private RestHighLevelClient connect(ElasticSearchConfiguration 
configuration) {
         Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
-        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         boolean suppressLeadingZeroElements = true;
         boolean suppressTrailingZeroElements = true;
         return Mono.fromCallable(() -> connectToCluster(configuration))
             .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch 
connection. Next retry scheduled in {}",
                 DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), 
suppressLeadingZeroElements, suppressTrailingZeroElements), e))
-            .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, 
Schedulers.elastic())
+            .retryWhen(Retry.backoff(configuration.getMaxRetries(), 
waitDelay).scheduler(Schedulers.elastic()))
             .publishOn(Schedulers.elastic())
             .block();
     }
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
index a3086c7..36989f9 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
@@ -27,6 +27,7 @@ import com.rabbitmq.client.ConnectionFactory;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public class RabbitMQConnectionFactory {
 
@@ -60,8 +61,7 @@ public class RabbitMQConnectionFactory {
     }
 
     Mono<Connection> connectionMono() {
-        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.fromCallable(connectionFactory::newConnection)
-            .retryBackoff(configuration.getMaxRetries(), 
Duration.ofMillis(configuration.getMinDelayInMs()), forever, 
Schedulers.elastic());
+            .retryWhen(Retry.backoff(configuration.getMaxRetries(), 
Duration.ofMillis(configuration.getMinDelayInMs())).scheduler(Schedulers.elastic()));
     }
 }
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 0c89212..f4703b4 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -49,6 +49,7 @@ import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
+import reactor.util.retry.Retry;
 
 public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
 
@@ -82,7 +83,7 @@ public class ReactorRabbitMQChannelPool implements 
ChannelPool, Startable {
             return Mono.fromCallable(connection::openChannel)
                 .map(maybeChannel ->
                     maybeChannel.orElseThrow(() -> new 
RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more 
channels")))
-                .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, 
Schedulers.elastic())
+                .retryWhen(Retry.backoff(MAX_RETRIES, 
RETRY_FIRST_BACK_OFF).scheduler(Schedulers.elastic()))
                 .doOnError(throwable -> LOGGER.error("error when creating new 
channel", throwable));
         }
 
@@ -105,7 +106,6 @@ public class ReactorRabbitMQChannelPool implements 
ChannelPool, Startable {
     private static final int MAX_CHANNELS_NUMBER = 3;
     private static final int MAX_BORROW_RETRIES = 3;
     private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50);
-    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
 
     private final Mono<Connection> connectionMono;
     private final GenericObjectPool<Channel> pool;
@@ -146,7 +146,7 @@ public class ReactorRabbitMQChannelPool implements 
ChannelPool, Startable {
     private Mono<Channel> borrow() {
         return tryBorrowFromPool()
             .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", 
throwable))
-            .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, 
Schedulers.elastic())
+            .retryWhen(Retry.backoff(MAX_BORROW_RETRIES, 
MIN_BORROW_DELAY).scheduler(Schedulers.elastic()))
             .onErrorMap(this::propagateException)
             .subscribeOn(Schedulers.elastic())
             .doOnNext(borrowedChannels::add);
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index c8f282b..b174795 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -33,6 +33,7 @@ import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public class SimpleConnectionPool implements AutoCloseable {
     private final AtomicReference<Connection> connectionReference;
@@ -56,9 +57,8 @@ public class SimpleConnectionPool implements AutoCloseable {
     public Mono<Connection> getResilientConnection() {
         int numRetries = 10;
         Duration initialDelay = Duration.ofMillis(100);
-        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.defer(this::getOpenConnection)
-            .retryBackoff(numRetries, initialDelay, forever, 
Schedulers.elastic());
+            .retryWhen(Retry.backoff(numRetries, 
initialDelay).scheduler(Schedulers.elastic()));
     }
 
     private Mono<Connection> getOpenConnection() {
diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
index 98bae12..f90a4d4 100644
--- 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
+++ 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -65,7 +65,6 @@ public class RetryBackoffConfiguration {
     static final double DEFAULT_JITTER_FACTOR = 0.5;
     static final int DEFAULT_MAX_RETRIES = 8;
     static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
-    static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
     public static final RetryBackoffConfiguration DEFAULT = new 
RetryBackoffConfiguration(
         DEFAULT_MAX_RETRIES,
         DEFAULT_FIRST_BACKOFF,
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index d408624..05d3339 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -49,6 +49,7 @@ import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 public class CassandraMailboxMapper implements MailboxMapper {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraMailboxMapper.class);
@@ -89,7 +90,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         deletePath(mailbox)
             .thenEmpty(mailboxDAO.delete(mailboxId)
-                .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF))
+                .retryWhen(Retry.backoff(MAX_RETRY, 
MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)))
             .block();
     }
 
@@ -247,12 +248,12 @@ public class CassandraMailboxMapper implements 
MailboxMapper {
 
     private Mono<Void> persistMailboxEntity(Mailbox cassandraMailbox) {
         return mailboxDAO.save(cassandraMailbox)
-            .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF);
+            .retryWhen(Retry.backoff(MAX_RETRY, 
MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF));
     }
 
     private Mono<Void> deletePreviousMailboxPathReference(MailboxPath 
mailboxPath) {
         return mailboxPathV2DAO.delete(mailboxPath)
-            .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF);
+            .retryWhen(Retry.backoff(MAX_RETRY, 
MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF));
     }
 
     @Override
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index bee2732..25c473c 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -55,6 +55,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.GroupedFlux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraMessageIdMapper.class);
@@ -158,7 +159,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         return imapUidDAO.insert(composedMessageIdWithMetaData)
             .thenEmpty(Flux.merge(
                 messageIdDAO.insert(composedMessageIdWithMetaData)
-                    .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, 
MAX_RETRY_BACKOFF),
+                    .retryWhen(Retry.backoff(MAX_RETRY, 
MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)),
                 indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
             .then());
     }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 97c10cd..f5b9d55 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -60,6 +60,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 public class CassandraMessageMapper implements MessageMapper {
     public static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraMessageMapper.class);
@@ -389,7 +390,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         return imapUidDAO.insert(composedMessageIdWithMetaData)
             .then(Flux.merge(
                 messageIdDAO.insert(composedMessageIdWithMetaData)
-                    .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, 
MAX_RETRY_BACKOFF),
+                    .retryWhen(Retry.backoff(MAX_RETRY, 
MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)),
                 indexTableHandler.updateIndexOnAdd(message, mailboxId))
             .then());
     }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 9088471..f3ee258 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -51,6 +51,7 @@ import com.datastax.driver.core.Session;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public class CassandraModSeqProvider implements ModSeqProvider {
 
@@ -185,11 +186,10 @@ public class CassandraModSeqProvider implements 
ModSeqProvider {
     }
 
     private Mono<ModSeq> handleRetries(CassandraId mailboxId) {
-        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         Duration firstBackoff = Duration.ofMillis(10);
         return tryFindThenUpdateOnce(mailboxId)
             .single()
-            .retryBackoff(maxModSeqRetries, firstBackoff, forever, 
Schedulers.elastic());
+            .retryWhen(Retry.backoff(maxModSeqRetries, 
firstBackoff).scheduler(Schedulers.elastic()));
     }
 
     private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index aa957be..fea0d5e 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -49,6 +49,7 @@ import com.datastax.driver.core.Session;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public class CassandraUidProvider implements UidProvider {
     private static final String CONDITION = "Condition";
@@ -105,13 +106,12 @@ public class CassandraUidProvider implements UidProvider {
         Mono<MessageUid> updateUid = findHighestUid(cassandraId)
             .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid));
 
-        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         Duration firstBackoff = Duration.ofMillis(10);
         return updateUid
             .switchIfEmpty(tryInsert(cassandraId))
             .switchIfEmpty(updateUid)
             .single()
-            .retryBackoff(maxUidRetries, firstBackoff, forever, 
Schedulers.elastic());
+            .retryWhen(Retry.backoff(maxUidRetries, 
firstBackoff).scheduler(Schedulers.elastic()));
     }
 
     @Override
diff --git 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index af7fa00..e37be04 100644
--- 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -22,8 +22,6 @@ package org.apache.james.mailbox.events.delivery;
 import static 
org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler.NO_HANDLER;
 import static 
org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.NO_RETRYER;
 
-import java.time.Duration;
-
 import org.apache.james.mailbox.events.Event;
 import org.apache.james.mailbox.events.EventDeadLetters;
 import org.apache.james.mailbox.events.Group;
@@ -34,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public interface EventDelivery {
 
@@ -75,7 +74,6 @@ public interface EventDelivery {
             }
 
             private static final Logger LOGGER = 
LoggerFactory.getLogger(BackoffRetryer.class);
-            private static final Duration FOREVER = 
Duration.ofMillis(Long.MAX_VALUE);
 
             private final RetryBackoffConfiguration retryBackoff;
             private final MailboxListener mailboxListener;
@@ -88,7 +86,7 @@ public interface EventDelivery {
             @Override
             public Mono<Void> doRetry(Mono<Void> executionResult, Event event) 
{
                 return executionResult
-                    .retryBackoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), 
Schedulers.elastic())
+                    .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
                     .doOnError(throwable -> LOGGER.error("listener {} exceeded 
maximum retry({}) to handle event {}",
                         mailboxListener.getClass().getCanonicalName(),
                         retryBackoff.getMaxRetries(),
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 8d1b7aa..7555267 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -26,7 +26,6 @@ import static 
org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static 
org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
-import static 
org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Objects;
@@ -49,6 +48,7 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
+import reactor.util.retry.Retry;
 
 class GroupRegistration implements Registration {
     static class WorkQueueName {
@@ -109,7 +109,7 @@ class GroupRegistration implements Registration {
             .of(createGroupWorkQueue()
                 .then(retryHandler.createRetryExchange(queueName))
                 .then(Mono.fromCallable(() -> this.consumeWorkQueue()))
-                .retryBackoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), 
Schedulers.elastic())
+                .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
                 .block());
         return this;
     }
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index c9767b8..98fac7f 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -23,7 +23,6 @@ import static 
org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
-import static 
org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
@@ -52,6 +51,7 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
+import reactor.util.retry.Retry;
 
 class KeyRegistrationHandler {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KeyRegistrationHandler.class);
@@ -107,7 +107,7 @@ class KeyRegistrationHandler {
             .autoDelete(AUTO_DELETE)
             .arguments(QUEUE_ARGUMENTS))
             .map(AMQP.Queue.DeclareOk::getQueue)
-            .retryBackoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor())
+            .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()))
             .doOnSuccess(queueName -> {
                 if (!registrationQueueInitialized.get()) {
                     registrationQueue.initialize(queueName);
@@ -122,7 +122,7 @@ class KeyRegistrationHandler {
             .ifPresent(Disposable::dispose);
         receiver.close();
         sender.delete(QueueSpecification.queue(registrationQueue.asString()))
-            .retryBackoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), 
Schedulers.elastic())
+            .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
             .block();
     }
 
@@ -130,13 +130,13 @@ class KeyRegistrationHandler {
         LocalListenerRegistry.LocalRegistration registration = 
localListenerRegistry.addListener(key, listener);
         if (registration.isFirstListener()) {
             registrationBinder.bind(key)
-                .retryBackoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), 
Schedulers.elastic())
+                .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
                 .block();
         }
         return new KeyRegistration(() -> {
             if (registration.unregister().lastListenerRemoved()) {
                 registrationBinder.unbind(key)
-                    .retryBackoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), 
Schedulers.elastic())
+                    .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
                     .block();
             }
         });
diff --git a/pom.xml b/pom.xml
index fb0b2f0..7b747ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -659,7 +659,7 @@
             <dependency>
                 <groupId>io.projectreactor</groupId>
                 <artifactId>reactor-bom</artifactId>
-                <version>Dysprosium-RELEASE</version>
+                <version>Dysprosium-SR6</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
diff --git 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
index 9fb41be..055a56e 100644
--- 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
+++ 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
@@ -45,6 +45,7 @@ import com.google.common.collect.ImmutableMap;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
 
 public class MailDispatcher {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MailDispatcher.class);
@@ -156,7 +157,7 @@ public class MailDispatcher {
        return Mono.fromRunnable((ThrowingRunnable)() -> 
mailStore.storeMail(recipient, mail))
            .doOnError(error -> LOGGER.error("Error While storing mail.", 
error))
            .subscribeOn(scheduler)
-           .retryBackoff(RETRIES, FIRST_BACKOFF, MAX_BACKOFF, scheduler)
+           .retryWhen(Retry.backoff(RETRIES, 
FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.elastic()))
            .then();
     }
 
diff --git 
a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java
 
b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java
index 5b3d07b..60aeb11 100644
--- 
a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java
+++ 
b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java
@@ -40,6 +40,7 @@ import io.restassured.config.RestAssuredConfig;
 import io.restassured.http.ContentType;
 import io.restassured.specification.RequestSpecification;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 public class WebAdminUtils {
     private static class ConcurrentSafeWebAdminServer extends WebAdminServer {
@@ -54,7 +55,7 @@ public class WebAdminUtils {
         @Override
         public WebAdminServer start() {
             Mono.fromRunnable(super::start)
-                .retryBackoff(5, Duration.ofMillis(10))
+                .retryWhen(Retry.backoff(5, Duration.ofMillis(10)))
                 .block();
             return this;
         }
diff --git a/server/queue/queue-activemq/pom.xml 
b/server/queue/queue-activemq/pom.xml
index 9d4b6a7..c37da34 100644
--- a/server/queue/queue-activemq/pom.xml
+++ b/server/queue/queue-activemq/pom.xml
@@ -37,7 +37,7 @@
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-transport</artifactId>
-            <version>4.1.39.Final</version>
+            <version>4.1.48.Final</version>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index dbdce52..f1daf1f 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -56,6 +56,7 @@ import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
+import reactor.util.retry.Retry;
 
 public class RabbitMQWorkQueue implements WorkQueue {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQWorkQueue.class);
@@ -71,7 +72,6 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     public static final int NUM_RETRIES = 8;
     public static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
-    public static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
 
     private final TaskManagerWorker worker;
     private final JsonTaskSerializer taskSerializer;
@@ -106,13 +106,13 @@ public class RabbitMQWorkQueue implements WorkQueue {
     void declareQueue() {
         Mono<AMQP.Exchange.DeclareOk> declareExchange = sender
             .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
-            .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+            .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
         Mono<AMQP.Queue.DeclareOk> declareQueue = sender
             
.declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
-            .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+            .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
         Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
             .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, 
QUEUE_NAME))
-            .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+            .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
 
         declareExchange
             .then(declareQueue)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to