This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 542e3567cf9c1d8553cd58d2d0c1b62591a7bfb1
Author: Benoit TELLIER <[email protected]>
AuthorDate: Wed Jan 7 14:52:25 2026 +0100

    JAMES-4159 Mutualize RetryBackoffSpec logic
---
 .../java/org/apache/james/events/RetryBackoffConfiguration.java    | 7 +++++++
 .../src/main/java/org/apache/james/events/GroupRegistration.java   | 3 +--
 .../java/org/apache/james/events/GroupRegistrationHandler.java     | 3 +--
 .../main/java/org/apache/james/events/delivery/EventDelivery.java  | 5 ++---
 4 files changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java
 
b/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java
index aafeb2cd06..ba853db9c3 100644
--- 
a/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java
+++ 
b/event-bus/api/src/main/java/org/apache/james/events/RetryBackoffConfiguration.java
@@ -25,6 +25,9 @@ import java.util.Objects;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
+
 public class RetryBackoffConfiguration {
 
     @FunctionalInterface
@@ -97,6 +100,10 @@ public class RetryBackoffConfiguration {
         return jitterFactor;
     }
 
+    public RetryBackoffSpec asReactorRetry() {
+        return Retry.backoff(maxRetries, firstBackoff).jitter(jitterFactor);
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof RetryBackoffConfiguration) {
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 76cd3d9b3c..fe7a1e317d 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -52,7 +52,6 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
-import reactor.util.retry.Retry;
 
 class GroupRegistration implements Registration {
 
@@ -117,7 +116,7 @@ class GroupRegistration implements Registration {
             .of(createGroupWorkQueue()
                 .then(retryHandler.createRetryExchange(queueName))
                 .then(Mono.fromCallable(this::consumeWorkQueue))
-                .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
+                
.retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.boundedElastic()))
                 .block());
         return this;
     }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index e19319e636..09c0bd9d86 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -58,7 +58,6 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
-import reactor.util.retry.Retry;
 
 public class GroupRegistrationHandler {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(GroupRegistrationHandler.class);
@@ -119,7 +118,7 @@ public class GroupRegistrationHandler {
                 .exchange(namingStrategy.exchange())
                 .queue(queueName.asString())
                 .routingKey(EMPTY_ROUTING_KEY))
-            .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
+            
.retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.boundedElastic()))
             .block();
 
         this.consumer = Optional.of(consumeWorkQueue());
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
index b9ad84b3c1..91ed00a8a1 100644
--- 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
+++ 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.retry.Retry;
 
 public interface EventDelivery {
 
@@ -99,7 +98,7 @@ public interface EventDelivery {
             @Override
             public Mono<Void> doRetry(Mono<Void> executionResult, Event event) 
{
                 return executionResult
-                    .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.parallel()))
+                    
.retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.parallel()))
                     .doOnError(throwable -> LOGGER.error("listener {} exceeded 
maximum retry({}) to handle event {}",
                         listener.getClass().getCanonicalName(),
                         retryBackoff.getMaxRetries(),
@@ -111,7 +110,7 @@ public interface EventDelivery {
             @Override
             public Mono<Void> doRetry(Mono<Void> executionResult, List<Event> 
events) {
                 return executionResult
-                    .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.parallel()))
+                    
.retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.parallel()))
                     .doOnError(throwable -> LOGGER.error("listener {} exceeded 
maximum retry({}) to handle event {}",
                         listener.getClass().getCanonicalName(),
                         retryBackoff.getMaxRetries(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to