This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 37ceb289a7449a4b354c222199c79f71f870f6d0
Author: Benoit TELLIER <[email protected]>
AuthorDate: Wed Jan 7 16:50:30 2026 +0100

    JAMES-4159 EventBus configuration for execution rate and timeout
---
 .../servers/partials/configure/listeners.adoc        |  4 ++++
 .../main/java/org/apache/james/events/EventBus.java  |  6 ++++++
 .../org/apache/james/events/GroupRegistration.java   | 20 ++++++++------------
 .../james/events/GroupRegistrationHandler.java       |  4 ++--
 .../org/apache/james/events/RabbitMQEventBus.java    |  7 +++++--
 .../james/events/delivery/InVmEventDelivery.java     | 15 ++++++++++++---
 .../modules/event/ContentDeletionEventBusModule.java |  5 +++--
 .../james/modules/event/JMAPEventBusModule.java      |  5 +++--
 .../james/modules/event/MailboxEventBusModule.java   |  5 +++--
 .../james/modules/mailbox/DefaultEventModule.java    | 14 ++++++++++++++
 10 files changed, 60 insertions(+), 25 deletions(-)

diff --git a/docs/modules/servers/partials/configure/listeners.adoc 
b/docs/modules/servers/partials/configure/listeners.adoc
index 4b8acb6670..39ea610964 100644
--- a/docs/modules/servers/partials/configure/listeners.adoc
+++ b/docs/modules/servers/partials/configure/listeners.adoc
@@ -26,6 +26,10 @@ If *true* the execution will be scheduled in a reactor 
elastic scheduler. If *fa
 
 Already provided additional listeners are documented below.
 
+The <executionRate> property controls the number of events processed in 
parallel. Defaults to 10.
+
+The <executionTimeout> property (duration) controls the timeout for the 
execution of each listener. None if omitted.
+
 === SpamAssassinListener
 
 Provides per user real-time HAM/SPAM feedback to a SpamAssassin server 
depending on user actions.
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java 
b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index f46ba06e05..1fda2f2ad8 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -19,7 +19,9 @@
 
 package org.apache.james.events;
 
+import java.time.Duration;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Set;
 
 import org.reactivestreams.Publisher;
@@ -31,6 +33,10 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface EventBus {
+    record Configuration(int executionRate, Optional<Duration> 
executionTimeout) {
+
+    }
+
     int EXECUTION_RATE = 10;
 
     interface StructuredLoggingFields {
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index dc8be9a9ae..5dcf4734b0 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -139,11 +139,11 @@ class GroupRegistration implements Registration {
     private Disposable consumeWorkQueue() {
         return Flux.using(
                 receiverProvider::createReceiver,
-                receiver -> receiver.consumeManualAck(queueName.asString(), 
new ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
+                receiver -> receiver.consumeManualAck(queueName.asString(), 
new 
ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())),
                 Receiver::close)
             .publishOn(Schedulers.parallel())
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+            .flatMap(this::deliver, 
configurations.eventBusConfiguration().executionRate())
             .subscribeOn(scheduler)
             .subscribe();
     }
@@ -186,19 +186,15 @@ class GroupRegistration implements Registration {
     }
 
     private Mono<Void> runListener(Event event) {
-        return listenerExecutor.execute(
-            listener,
-            MDCBuilder.create()
-                .addToContext(EventBus.StructuredLoggingFields.GROUP, 
group.asString()),
-            event);
+        MDCBuilder mdc = 
MDCBuilder.create().addToContext(EventBus.StructuredLoggingFields.GROUP, 
group.asString());
+        Mono<Void> result = listenerExecutor.execute(listener, mdc, event);
+        return 
configurations.eventBusConfiguration().executionTimeout().map(result::timeout).orElse(result);
     }
 
     private Mono<Void> runListener(List<Event> events) {
-        return listenerExecutor.execute(
-            listener,
-            MDCBuilder.create()
-                .addToContext(EventBus.StructuredLoggingFields.GROUP, 
group.asString()),
-            events);
+        MDCBuilder mdc = 
MDCBuilder.create().addToContext(EventBus.StructuredLoggingFields.GROUP, 
group.asString());
+        Mono<Void> result = listenerExecutor.execute(listener, mdc, events);
+        return 
configurations.eventBusConfiguration().executionTimeout().map(result::timeout).orElse(result);
     }
 
     private int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) {
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index d20734429a..5b6ecd08c5 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -124,10 +124,10 @@ public class GroupRegistrationHandler {
     private Disposable consumeWorkQueue() {
         return Flux.using(
                 receiverProvider::createReceiver,
-            receiver -> receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
+            receiver -> receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())),
             Receiver::close)
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+            .flatMap(this::deliver, 
configurations.eventBusConfiguration().executionRate())
             .subscribeOn(scheduler)
             .subscribe();
     }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index 481844e565..b8261bca63 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -20,6 +20,7 @@
 package org.apache.james.events;
 
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Set;
 
 import jakarta.annotation.PreDestroy;
@@ -65,8 +66,10 @@ public class RabbitMQEventBus implements EventBus, Startable 
{
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not 
running";
     static final String EVENT_BUS_ID = "eventBusId";
 
-    public record Configurations(RabbitMQConfiguration rabbitMQConfiguration, 
RetryBackoffConfiguration retryBackoff) {
-
+    public record Configurations(RabbitMQConfiguration rabbitMQConfiguration, 
RetryBackoffConfiguration retryBackoff, EventBus.Configuration 
eventBusConfiguration) {
+        public Configurations(RabbitMQConfiguration rabbitMQConfiguration, 
RetryBackoffConfiguration retryBackoff) {
+            this(rabbitMQConfiguration, retryBackoff, new 
Configuration(EventBus.EXECUTION_RATE, Optional.empty()));
+        }
     }
 
     private final NamingStrategy namingStrategy;
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
index 7140f5e7fa..0f6d2dbefa 100644
--- 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
+++ 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
@@ -23,6 +23,7 @@ import static 
org.apache.james.events.EventBus.Metrics.timerName;
 import static org.apache.james.util.ReactorUtils.context;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import jakarta.inject.Inject;
@@ -47,11 +48,18 @@ public class InVmEventDelivery implements EventDelivery {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InVmEventDelivery.class);
 
     private final MetricFactory metricFactory;
+    private final EventBus.Configuration configuration;
 
-    @Inject
     @VisibleForTesting
     public InVmEventDelivery(MetricFactory metricFactory) {
         this.metricFactory = metricFactory;
+        this.configuration = new 
EventBus.Configuration(EventBus.EXECUTION_RATE, Optional.empty());
+    }
+
+    @Inject
+    public InVmEventDelivery(MetricFactory metricFactory, 
EventBus.Configuration configuration) {
+        this.metricFactory = metricFactory;
+        this.configuration = configuration;
     }
 
     @Override
@@ -95,8 +103,9 @@ public class InVmEventDelivery implements EventDelivery {
         if (events.stream().noneMatch(listener::isHandling)) {
             return Mono.empty();
         }
-        return Mono.defer(() -> 
Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
-                listener.reactiveEvent(events))))
+        Mono<Void> result = Mono.defer(() -> 
Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
+            listener.reactiveEvent(events))));
+        return 
configuration.executionTimeout().map(result::timeout).orElse(result)
             .contextWrite(context("deliver", buildMDC(listener, events)));
     }
 
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
index 02c2a4baf1..409dc08f40 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java
@@ -103,8 +103,9 @@ public class ContentDeletionEventBusModule extends 
AbstractModule {
                                                     MailboxEventSerializer 
eventSerializer,
                                                     RetryBackoffConfiguration 
retryBackoffConfiguration,
                                                     @Named(CONTENT_DELETION) 
EventBusId eventBusId,
-                                                    RabbitMQConfiguration 
configuration) {
-        return eventBusFactory.create(eventBusId, 
CONTENT_DELETION_NAMING_STRATEGY,  new RoutingKeyConverter(ImmutableSet.of(new 
Factory())), eventSerializer, new 
RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration));
+                                                    RabbitMQConfiguration 
configuration,
+                                                    EventBus.Configuration 
eventBusConfiguration) {
+        return eventBusFactory.create(eventBusId, 
CONTENT_DELETION_NAMING_STRATEGY,  new RoutingKeyConverter(ImmutableSet.of(new 
Factory())), eventSerializer, new 
RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration, 
eventBusConfiguration));
     }
 
     @Provides
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
index a4e24239b0..ecb6bc46b9 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -97,9 +97,10 @@ public class JMAPEventBusModule extends AbstractModule {
                                          JmapEventSerializer eventSerializer,
                                          RetryBackoffConfiguration 
retryBackoffConfiguration,
                                          @Named(InjectionKeys.JMAP) EventBusId 
eventBusId,
-                                         RabbitMQConfiguration configuration) {
+                                         RabbitMQConfiguration configuration,
+                                         EventBus.Configuration 
eventBusConfiguration) {
         return eventBusFactory.create(eventBusId,
-            JMAP_NAMING_STRATEGY, new RoutingKeyConverter(ImmutableSet.of(new 
Factory())), eventSerializer, new 
RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration));
+            JMAP_NAMING_STRATEGY, new RoutingKeyConverter(ImmutableSet.of(new 
Factory())), eventSerializer, new 
RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration, 
eventBusConfiguration));
     }
 
     @Provides
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
index d98720cd86..bf7dbf63cc 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
@@ -92,8 +92,9 @@ public class MailboxEventBusModule extends AbstractModule {
     RabbitMQEventBus provideRabbitMQEventBus(RabbitMQEventBus.Factory 
eventBusFactory, NamingStrategy namingStrategy,
                                              MailboxEventSerializer 
eventSerializer, RetryBackoffConfiguration retryBackoff,
                                              RoutingKeyConverter 
routingKeyConverter, EventBusId eventBusId,
-                                             RabbitMQConfiguration 
configuration) {
-        return eventBusFactory.create(eventBusId, namingStrategy, 
routingKeyConverter, eventSerializer, new 
RabbitMQEventBus.Configurations(configuration, retryBackoff));
+                                             RabbitMQConfiguration 
configuration,
+                                             EventBus.Configuration 
eventBusConfiguration) {
+        return eventBusFactory.create(eventBusId, namingStrategy, 
routingKeyConverter, eventSerializer, new 
RabbitMQEventBus.Configurations(configuration, retryBackoff, 
eventBusConfiguration));
     }
 
     @Provides
diff --git 
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
 
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index e68106e868..6341002ec4 100644
--- 
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ 
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -19,7 +19,11 @@
 
 package org.apache.james.modules.mailbox;
 
+import java.util.Optional;
+
+import org.apache.commons.configuration2.HierarchicalConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.commons.configuration2.tree.ImmutableNode;
 import org.apache.james.event.json.MailboxEventSerializer;
 import org.apache.james.events.EventBus;
 import org.apache.james.events.EventListener;
@@ -31,6 +35,7 @@ import org.apache.james.events.delivery.EventDelivery;
 import org.apache.james.events.delivery.InVmEventDelivery;
 import org.apache.james.modules.EventDeadLettersProbe;
 import org.apache.james.server.core.configuration.ConfigurationProvider;
+import org.apache.james.util.DurationParser;
 import org.apache.james.utils.GuiceProbe;
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
@@ -81,4 +86,13 @@ public class DefaultEventModule extends AbstractModule {
             .forClass(MailboxListenersLoaderImpl.class)
             .init(() -> listeners.configure(configuration));
     }
+
+    @Provides
+    @Singleton
+    EventBus.Configuration providesEventBusConfiguration(ConfigurationProvider 
configurationProvider) throws ConfigurationException {
+        HierarchicalConfiguration<ImmutableNode> configuration = 
configurationProvider.getConfiguration("listeners");
+
+        return new 
EventBus.Configuration(configuration.getInt("executionRate", 
EventBus.EXECUTION_RATE),
+            Optional.ofNullable(configuration.getString("executionTimeout", 
null)).map(DurationParser::parse));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to