This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 542e3567cf9c1d8553cd58d2d0c1b62591a7bfb1 Author: Benoit TELLIER <[email protected]> AuthorDate: Wed Jan 7 14:52:25 2026 +0100 JAMES-4159 Mutualize RetryBackoffSpec logic --- .../java/org/apache/james/events/RetryBackoffConfiguration.java | 7 +++++++ .../src/main/java/org/apache/james/events/GroupRegistration.java | 3 +-- .../java/org/apache/james/events/GroupRegistrationHandler.java | 3 +-- .../main/java/org/apache/james/events/delivery/EventDelivery.java | 5 ++--- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java b/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java index aafeb2cd06..ba853db9c3 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java +++ b/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java @@ -25,6 +25,9 @@ import java.util.Objects; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; + public class RetryBackoffConfiguration { @FunctionalInterface @@ -97,6 +100,10 @@ public class RetryBackoffConfiguration { return jitterFactor; } + public RetryBackoffSpec asReactorRetry() { + return Retry.backoff(maxRetries, firstBackoff).jitter(jitterFactor); + } + @Override public final boolean equals(Object o) { if (o instanceof RetryBackoffConfiguration) { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index 76cd3d9b3c..fe7a1e317d 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -52,7 +52,6 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; -import reactor.util.retry.Retry; class GroupRegistration implements Registration { @@ -117,7 +116,7 @@ class GroupRegistration implements Registration { .of(createGroupWorkQueue() .then(retryHandler.createRetryExchange(queueName)) .then(Mono.fromCallable(this::consumeWorkQueue)) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic())) + .retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.boundedElastic())) .block()); return this; } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index e19319e636..09c0bd9d86 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -58,7 +58,6 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; -import reactor.util.retry.Retry; public class GroupRegistrationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class); @@ -119,7 +118,7 @@ public class GroupRegistrationHandler { .exchange(namingStrategy.exchange()) .queue(queueName.asString()) .routingKey(EMPTY_ROUTING_KEY)) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic())) + .retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.boundedElastic())) .block(); this.consumer = Optional.of(consumeWorkQueue()); diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java index b9ad84b3c1..91ed00a8a1 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import reactor.util.retry.Retry; public interface EventDelivery { @@ -99,7 +98,7 @@ public interface EventDelivery { @Override public Mono<Void> doRetry(Mono<Void> executionResult, Event event) { return executionResult - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.parallel())) + .retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.parallel())) .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}", listener.getClass().getCanonicalName(), retryBackoff.getMaxRetries(), @@ -111,7 +110,7 @@ public interface EventDelivery { @Override public Mono<Void> doRetry(Mono<Void> executionResult, List<Event> events) { return executionResult - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.parallel())) + .retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.parallel())) .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}", listener.getClass().getCanonicalName(), retryBackoff.getMaxRetries(), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
