This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7cb8ba9e88c76718f62ec32b2ded8c1d35420569 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Thu Apr 23 12:34:56 2020 +0200 JAMES-3153 make use of RetryWhen for backoff on reactor --- .../backends/cassandra/init/ResilientClusterProvider.java | 4 ++-- .../main/java/org/apache/james/backends/es/ClientProvider.java | 4 ++-- .../james/backends/rabbitmq/RabbitMQConnectionFactory.java | 4 ++-- .../james/backends/rabbitmq/ReactorRabbitMQChannelPool.java | 6 +++--- .../apache/james/backends/rabbitmq/SimpleConnectionPool.java | 4 ++-- .../apache/james/mailbox/events/RetryBackoffConfiguration.java | 1 - .../james/mailbox/cassandra/mail/CassandraMailboxMapper.java | 7 ++++--- .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java | 3 ++- .../james/mailbox/cassandra/mail/CassandraMessageMapper.java | 3 ++- .../james/mailbox/cassandra/mail/CassandraModSeqProvider.java | 4 ++-- .../james/mailbox/cassandra/mail/CassandraUidProvider.java | 4 ++-- .../apache/james/mailbox/events/delivery/EventDelivery.java | 6 ++---- .../org/apache/james/mailbox/events/GroupRegistration.java | 4 ++-- .../apache/james/mailbox/events/KeyRegistrationHandler.java | 10 +++++----- pom.xml | 2 +- .../james/transport/mailets/delivery/MailDispatcher.java | 3 ++- .../src/test/java/org/apache/james/webadmin/WebAdminUtils.java | 3 ++- server/queue/queue-activemq/pom.xml | 2 +- .../task/eventsourcing/distributed/RabbitMQWorkQueue.java | 8 ++++---- 19 files changed, 42 insertions(+), 40 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java index f91e68d..4eeb502 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java @@ -40,6 +40,7 @@ import com.google.common.collect.ImmutableList; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; @Singleton public class ResilientClusterProvider implements Provider<Cluster> { @@ -51,10 +52,9 @@ public class ResilientClusterProvider implements Provider<Cluster> { @Inject ResilientClusterProvider(ClusterConfiguration configuration) { Duration waitDelay = Duration.ofMillis(configuration.getMinDelay()); - Duration forever = Duration.ofMillis(Long.MAX_VALUE); cluster = Mono.fromCallable(getClusterRetryCallable(configuration)) .doOnError(e -> LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", waitDelay, e)) - .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.elastic()) + .retryWhen(Retry.backoff(configuration.getMaxRetry(), waitDelay).scheduler(Schedulers.elastic())) .publishOn(Schedulers.elastic()) .block(); } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java index 247ed50..b73fbd3 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java @@ -56,6 +56,7 @@ import com.google.common.annotations.VisibleForTesting; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class ClientProvider implements Provider<ReactorElasticSearchClient> { @@ -180,13 +181,12 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> { private RestHighLevelClient connect(ElasticSearchConfiguration configuration) { Duration waitDelay = Duration.ofMillis(configuration.getMinDelay()); - Duration forever = Duration.ofMillis(Long.MAX_VALUE); boolean suppressLeadingZeroElements = true; boolean suppressTrailingZeroElements = true; return Mono.fromCallable(() -> connectToCluster(configuration)) .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}", DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e)) - .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic()) + .retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.elastic())) .publishOn(Schedulers.elastic()) .block(); } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java index a3086c7..36989f9 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java @@ -27,6 +27,7 @@ import com.rabbitmq.client.ConnectionFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class RabbitMQConnectionFactory { @@ -60,8 +61,7 @@ public class RabbitMQConnectionFactory { } Mono<Connection> connectionMono() { - Duration forever = Duration.ofMillis(Long.MAX_VALUE); return Mono.fromCallable(connectionFactory::newConnection) - .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.elastic()); + .retryWhen(Retry.backoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs())).scheduler(Schedulers.elastic())); } } diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index 0c89212..f4703b4 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -49,6 +49,7 @@ import reactor.rabbitmq.Receiver; import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; import reactor.rabbitmq.SenderOptions; +import reactor.util.retry.Retry; public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { @@ -82,7 +83,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { return Mono.fromCallable(connection::openChannel) .map(maybeChannel -> maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels"))) - .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.elastic()) + .retryWhen(Retry.backoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF).scheduler(Schedulers.elastic())) .doOnError(throwable -> LOGGER.error("error when creating new channel", throwable)); } @@ -105,7 +106,6 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { private static final int MAX_CHANNELS_NUMBER = 3; private static final int MAX_BORROW_RETRIES = 3; private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50); - private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); private final Mono<Connection> connectionMono; private final GenericObjectPool<Channel> pool; @@ -146,7 +146,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { private Mono<Channel> borrow() { return tryBorrowFromPool() .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable)) - .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic()) + .retryWhen(Retry.backoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY).scheduler(Schedulers.elastic())) .onErrorMap(this::propagateException) .subscribeOn(Schedulers.elastic()) .doOnNext(borrowedChannels::add); diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java index c8f282b..b174795 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java @@ -33,6 +33,7 @@ import com.rabbitmq.client.Connection; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class SimpleConnectionPool implements AutoCloseable { private final AtomicReference<Connection> connectionReference; @@ -56,9 +57,8 @@ public class SimpleConnectionPool implements AutoCloseable { public Mono<Connection> getResilientConnection() { int numRetries = 10; Duration initialDelay = Duration.ofMillis(100); - Duration forever = Duration.ofMillis(Long.MAX_VALUE); return Mono.defer(this::getOpenConnection) - .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic()); + .retryWhen(Retry.backoff(numRetries, initialDelay).scheduler(Schedulers.elastic())); } private Mono<Connection> getOpenConnection() { diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java index 98bae12..f90a4d4 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java @@ -65,7 +65,6 @@ public class RetryBackoffConfiguration { static final double DEFAULT_JITTER_FACTOR = 0.5; static final int DEFAULT_MAX_RETRIES = 8; static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100); - static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration( DEFAULT_MAX_RETRIES, DEFAULT_FIRST_BACKOFF, diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index d408624..05d3339 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -49,6 +49,7 @@ import com.google.common.base.Preconditions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; public class CassandraMailboxMapper implements MailboxMapper { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMailboxMapper.class); @@ -89,7 +90,7 @@ public class CassandraMailboxMapper implements MailboxMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); deletePath(mailbox) .thenEmpty(mailboxDAO.delete(mailboxId) - .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF)) + .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF))) .block(); } @@ -247,12 +248,12 @@ public class CassandraMailboxMapper implements MailboxMapper { private Mono<Void> persistMailboxEntity(Mailbox cassandraMailbox) { return mailboxDAO.save(cassandraMailbox) - .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF); + .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)); } private Mono<Void> deletePreviousMailboxPathReference(MailboxPath mailboxPath) { return mailboxPathV2DAO.delete(mailboxPath) - .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF); + .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)); } @Override diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index bee2732..25c473c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -55,6 +55,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class CassandraMessageIdMapper implements MessageIdMapper { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class); @@ -158,7 +159,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { return imapUidDAO.insert(composedMessageIdWithMetaData) .thenEmpty(Flux.merge( messageIdDAO.insert(composedMessageIdWithMetaData) - .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF), + .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId)) .then()); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 97c10cd..f5b9d55 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -60,6 +60,7 @@ import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; public class CassandraMessageMapper implements MessageMapper { public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class); @@ -389,7 +390,7 @@ public class CassandraMessageMapper implements MessageMapper { return imapUidDAO.insert(composedMessageIdWithMetaData) .then(Flux.merge( messageIdDAO.insert(composedMessageIdWithMetaData) - .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF), + .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), indexTableHandler.updateIndexOnAdd(message, mailboxId)) .then()); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index 9088471..f3ee258 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -51,6 +51,7 @@ import com.datastax.driver.core.Session; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class CassandraModSeqProvider implements ModSeqProvider { @@ -185,11 +186,10 @@ public class CassandraModSeqProvider implements ModSeqProvider { } private Mono<ModSeq> handleRetries(CassandraId mailboxId) { - Duration forever = Duration.ofMillis(Long.MAX_VALUE); Duration firstBackoff = Duration.ofMillis(10); return tryFindThenUpdateOnce(mailboxId) .single() - .retryBackoff(maxModSeqRetries, firstBackoff, forever, Schedulers.elastic()); + .retryWhen(Retry.backoff(maxModSeqRetries, firstBackoff).scheduler(Schedulers.elastic())); } private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java index aa957be..fea0d5e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java @@ -49,6 +49,7 @@ import com.datastax.driver.core.Session; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class CassandraUidProvider implements UidProvider { private static final String CONDITION = "Condition"; @@ -105,13 +106,12 @@ public class CassandraUidProvider implements UidProvider { Mono<MessageUid> updateUid = findHighestUid(cassandraId) .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid)); - Duration forever = Duration.ofMillis(Long.MAX_VALUE); Duration firstBackoff = Duration.ofMillis(10); return updateUid .switchIfEmpty(tryInsert(cassandraId)) .switchIfEmpty(updateUid) .single() - .retryBackoff(maxUidRetries, firstBackoff, forever, Schedulers.elastic()); + .retryWhen(Retry.backoff(maxUidRetries, firstBackoff).scheduler(Schedulers.elastic())); } @Override diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java index af7fa00..e37be04 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java @@ -22,8 +22,6 @@ package org.apache.james.mailbox.events.delivery; import static org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler.NO_HANDLER; import static org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.NO_RETRYER; -import java.time.Duration; - import org.apache.james.mailbox.events.Event; import org.apache.james.mailbox.events.EventDeadLetters; import org.apache.james.mailbox.events.Group; @@ -34,6 +32,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public interface EventDelivery { @@ -75,7 +74,6 @@ public interface EventDelivery { } private static final Logger LOGGER = LoggerFactory.getLogger(BackoffRetryer.class); - private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); private final RetryBackoffConfiguration retryBackoff; private final MailboxListener mailboxListener; @@ -88,7 +86,7 @@ public interface EventDelivery { @Override public Mono<Void> doRetry(Mono<Void> executionResult, Event event) { return executionResult - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}", mailboxListener.getClass().getCanonicalName(), retryBackoff.getMaxRetries(), diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index 8d1b7aa..7555267 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -26,7 +26,6 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; -import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER; import java.nio.charset.StandardCharsets; import java.util.Objects; @@ -49,6 +48,7 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; +import reactor.util.retry.Retry; class GroupRegistration implements Registration { static class WorkQueueName { @@ -109,7 +109,7 @@ class GroupRegistration implements Registration { .of(createGroupWorkQueue() .then(retryHandler.createRetryExchange(queueName)) .then(Mono.fromCallable(() -> this.consumeWorkQueue())) - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) .block()); return this; } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index c9767b8..98fac7f 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -23,7 +23,6 @@ import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID; -import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -52,6 +51,7 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; +import reactor.util.retry.Retry; class KeyRegistrationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class); @@ -107,7 +107,7 @@ class KeyRegistrationHandler { .autoDelete(AUTO_DELETE) .arguments(QUEUE_ARGUMENTS)) .map(AMQP.Queue.DeclareOk::getQueue) - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor()) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor())) .doOnSuccess(queueName -> { if (!registrationQueueInitialized.get()) { registrationQueue.initialize(queueName); @@ -122,7 +122,7 @@ class KeyRegistrationHandler { .ifPresent(Disposable::dispose); receiver.close(); sender.delete(QueueSpecification.queue(registrationQueue.asString())) - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) .block(); } @@ -130,13 +130,13 @@ class KeyRegistrationHandler { LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener); if (registration.isFirstListener()) { registrationBinder.bind(key) - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) .block(); } return new KeyRegistration(() -> { if (registration.unregister().lastListenerRemoved()) { registrationBinder.unbind(key) - .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) .block(); } }); diff --git a/pom.xml b/pom.xml index fb0b2f0..7b747ff 100644 --- a/pom.xml +++ b/pom.xml @@ -659,7 +659,7 @@ <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> - <version>Dysprosium-RELEASE</version> + <version>Dysprosium-SR6</version> <type>pom</type> <scope>import</scope> </dependency> diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java index 9fb41be..055a56e 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java @@ -45,6 +45,7 @@ import com.google.common.collect.ImmutableMap; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class MailDispatcher { private static final Logger LOGGER = LoggerFactory.getLogger(MailDispatcher.class); @@ -156,7 +157,7 @@ public class MailDispatcher { return Mono.fromRunnable((ThrowingRunnable)() -> mailStore.storeMail(recipient, mail)) .doOnError(error -> LOGGER.error("Error While storing mail.", error)) .subscribeOn(scheduler) - .retryBackoff(RETRIES, FIRST_BACKOFF, MAX_BACKOFF, scheduler) + .retryWhen(Retry.backoff(RETRIES, FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.elastic())) .then(); } diff --git a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java index 5b3d07b..60aeb11 100644 --- a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java +++ b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/WebAdminUtils.java @@ -40,6 +40,7 @@ import io.restassured.config.RestAssuredConfig; import io.restassured.http.ContentType; import io.restassured.specification.RequestSpecification; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; public class WebAdminUtils { private static class ConcurrentSafeWebAdminServer extends WebAdminServer { @@ -54,7 +55,7 @@ public class WebAdminUtils { @Override public WebAdminServer start() { Mono.fromRunnable(super::start) - .retryBackoff(5, Duration.ofMillis(10)) + .retryWhen(Retry.backoff(5, Duration.ofMillis(10))) .block(); return this; } diff --git a/server/queue/queue-activemq/pom.xml b/server/queue/queue-activemq/pom.xml index 9d4b6a7..c37da34 100644 --- a/server/queue/queue-activemq/pom.xml +++ b/server/queue/queue-activemq/pom.xml @@ -37,7 +37,7 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> - <version>4.1.39.Final</version> + <version>4.1.48.Final</version> </dependency> <dependency> <groupId>${james.groupId}</groupId> diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index dbdce52..f1daf1f 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -56,6 +56,7 @@ import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; +import reactor.util.retry.Retry; public class RabbitMQWorkQueue implements WorkQueue { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class); @@ -71,7 +72,6 @@ public class RabbitMQWorkQueue implements WorkQueue { public static final int NUM_RETRIES = 8; public static final Duration FIRST_BACKOFF = Duration.ofMillis(100); - public static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); private final TaskManagerWorker worker; private final JsonTaskSerializer taskSerializer; @@ -106,13 +106,13 @@ public class RabbitMQWorkQueue implements WorkQueue { void declareQueue() { Mono<AMQP.Exchange.DeclareOk> declareExchange = sender .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)) - .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER); + .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF)); Mono<AMQP.Queue.DeclareOk> declareQueue = sender .declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)) - .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER); + .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF)); Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)) - .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER); + .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF)); declareExchange .then(declareQueue) --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org