This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 37ceb289a7449a4b354c222199c79f71f870f6d0 Author: Benoit TELLIER <[email protected]> AuthorDate: Wed Jan 7 16:50:30 2026 +0100 JAMES-4159 EventBus configuration for execution rate and timeout --- .../servers/partials/configure/listeners.adoc | 4 ++++ .../main/java/org/apache/james/events/EventBus.java | 6 ++++++ .../org/apache/james/events/GroupRegistration.java | 20 ++++++++------------ .../james/events/GroupRegistrationHandler.java | 4 ++-- .../org/apache/james/events/RabbitMQEventBus.java | 7 +++++-- .../james/events/delivery/InVmEventDelivery.java | 15 ++++++++++++--- .../modules/event/ContentDeletionEventBusModule.java | 5 +++-- .../james/modules/event/JMAPEventBusModule.java | 5 +++-- .../james/modules/event/MailboxEventBusModule.java | 5 +++-- .../james/modules/mailbox/DefaultEventModule.java | 14 ++++++++++++++ 10 files changed, 60 insertions(+), 25 deletions(-) diff --git a/docs/modules/servers/partials/configure/listeners.adoc b/docs/modules/servers/partials/configure/listeners.adoc index 4b8acb6670..39ea610964 100644 --- a/docs/modules/servers/partials/configure/listeners.adoc +++ b/docs/modules/servers/partials/configure/listeners.adoc @@ -26,6 +26,10 @@ If *true* the execution will be scheduled in a reactor elastic scheduler. If *fa Already provided additional listeners are documented below. +The <executionRate> property controls the number of events processed in parallel. Defaults to 10. + +The <executionTimeout> property (duration) controls the timeout for the execution of each listener. None if omitted. + === SpamAssassinListener Provides per user real-time HAM/SPAM feedback to a SpamAssassin server depending on user actions. diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java index f46ba06e05..1fda2f2ad8 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java @@ -19,7 +19,9 @@ package org.apache.james.events; +import java.time.Duration; import java.util.Collection; +import java.util.Optional; import java.util.Set; import org.reactivestreams.Publisher; @@ -31,6 +33,10 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface EventBus { + record Configuration(int executionRate, Optional<Duration> executionTimeout) { + + } + int EXECUTION_RATE = 10; interface StructuredLoggingFields { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index dc8be9a9ae..5dcf4734b0 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -139,11 +139,11 @@ class GroupRegistration implements Registration { private Disposable consumeWorkQueue() { return Flux.using( receiverProvider::createReceiver, - receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)), + receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())), Receiver::close) .publishOn(Schedulers.parallel()) .filter(delivery -> Objects.nonNull(delivery.getBody())) - .flatMap(this::deliver, EventBus.EXECUTION_RATE) + .flatMap(this::deliver, configurations.eventBusConfiguration().executionRate()) .subscribeOn(scheduler) .subscribe(); } @@ -186,19 +186,15 @@ class GroupRegistration implements Registration { } private Mono<Void> runListener(Event event) { - return listenerExecutor.execute( - listener, - MDCBuilder.create() - .addToContext(EventBus.StructuredLoggingFields.GROUP, group.asString()), - event); + MDCBuilder mdc = MDCBuilder.create().addToContext(EventBus.StructuredLoggingFields.GROUP, group.asString()); + Mono<Void> result = listenerExecutor.execute(listener, mdc, event); + return configurations.eventBusConfiguration().executionTimeout().map(result::timeout).orElse(result); } private Mono<Void> runListener(List<Event> events) { - return listenerExecutor.execute( - listener, - MDCBuilder.create() - .addToContext(EventBus.StructuredLoggingFields.GROUP, group.asString()), - events); + MDCBuilder mdc = MDCBuilder.create().addToContext(EventBus.StructuredLoggingFields.GROUP, group.asString()); + Mono<Void> result = listenerExecutor.execute(listener, mdc, events); + return configurations.eventBusConfiguration().executionTimeout().map(result::timeout).orElse(result); } private int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index d20734429a..5b6ecd08c5 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -124,10 +124,10 @@ public class GroupRegistrationHandler { private Disposable consumeWorkQueue() { return Flux.using( receiverProvider::createReceiver, - receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)), + receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())), Receiver::close) .filter(delivery -> Objects.nonNull(delivery.getBody())) - .flatMap(this::deliver, EventBus.EXECUTION_RATE) + .flatMap(this::deliver, configurations.eventBusConfiguration().executionRate()) .subscribeOn(scheduler) .subscribe(); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index 481844e565..b8261bca63 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -20,6 +20,7 @@ package org.apache.james.events; import java.util.Collection; +import java.util.Optional; import java.util.Set; import jakarta.annotation.PreDestroy; @@ -65,8 +66,10 @@ public class RabbitMQEventBus implements EventBus, Startable { private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running"; static final String EVENT_BUS_ID = "eventBusId"; - public record Configurations(RabbitMQConfiguration rabbitMQConfiguration, RetryBackoffConfiguration retryBackoff) { - + public record Configurations(RabbitMQConfiguration rabbitMQConfiguration, RetryBackoffConfiguration retryBackoff, EventBus.Configuration eventBusConfiguration) { + public Configurations(RabbitMQConfiguration rabbitMQConfiguration, RetryBackoffConfiguration retryBackoff) { + this(rabbitMQConfiguration, retryBackoff, new Configuration(EventBus.EXECUTION_RATE, Optional.empty())); + } } private final NamingStrategy namingStrategy; diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java index 7140f5e7fa..0f6d2dbefa 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java @@ -23,6 +23,7 @@ import static org.apache.james.events.EventBus.Metrics.timerName; import static org.apache.james.util.ReactorUtils.context; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import jakarta.inject.Inject; @@ -47,11 +48,18 @@ public class InVmEventDelivery implements EventDelivery { private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class); private final MetricFactory metricFactory; + private final EventBus.Configuration configuration; - @Inject @VisibleForTesting public InVmEventDelivery(MetricFactory metricFactory) { this.metricFactory = metricFactory; + this.configuration = new EventBus.Configuration(EventBus.EXECUTION_RATE, Optional.empty()); + } + + @Inject + public InVmEventDelivery(MetricFactory metricFactory, EventBus.Configuration configuration) { + this.metricFactory = metricFactory; + this.configuration = configuration; } @Override @@ -95,8 +103,9 @@ public class InVmEventDelivery implements EventDelivery { if (events.stream().noneMatch(listener::isHandling)) { return Mono.empty(); } - return Mono.defer(() -> Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener), - listener.reactiveEvent(events)))) + Mono<Void> result = Mono.defer(() -> Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener), + listener.reactiveEvent(events)))); + return configuration.executionTimeout().map(result::timeout).orElse(result) .contextWrite(context("deliver", buildMDC(listener, events))); } diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java index 02c2a4baf1..409dc08f40 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java @@ -103,8 +103,9 @@ public class ContentDeletionEventBusModule extends AbstractModule { MailboxEventSerializer eventSerializer, RetryBackoffConfiguration retryBackoffConfiguration, @Named(CONTENT_DELETION) EventBusId eventBusId, - RabbitMQConfiguration configuration) { - return eventBusFactory.create(eventBusId, CONTENT_DELETION_NAMING_STRATEGY, new RoutingKeyConverter(ImmutableSet.of(new Factory())), eventSerializer, new RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration)); + RabbitMQConfiguration configuration, + EventBus.Configuration eventBusConfiguration) { + return eventBusFactory.create(eventBusId, CONTENT_DELETION_NAMING_STRATEGY, new RoutingKeyConverter(ImmutableSet.of(new Factory())), eventSerializer, new RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration, eventBusConfiguration)); } @Provides diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java index a4e24239b0..ecb6bc46b9 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java @@ -97,9 +97,10 @@ public class JMAPEventBusModule extends AbstractModule { JmapEventSerializer eventSerializer, RetryBackoffConfiguration retryBackoffConfiguration, @Named(InjectionKeys.JMAP) EventBusId eventBusId, - RabbitMQConfiguration configuration) { + RabbitMQConfiguration configuration, + EventBus.Configuration eventBusConfiguration) { return eventBusFactory.create(eventBusId, - JMAP_NAMING_STRATEGY, new RoutingKeyConverter(ImmutableSet.of(new Factory())), eventSerializer, new RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration)); + JMAP_NAMING_STRATEGY, new RoutingKeyConverter(ImmutableSet.of(new Factory())), eventSerializer, new RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration, eventBusConfiguration)); } @Provides diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java index d98720cd86..bf7dbf63cc 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java @@ -92,8 +92,9 @@ public class MailboxEventBusModule extends AbstractModule { RabbitMQEventBus provideRabbitMQEventBus(RabbitMQEventBus.Factory eventBusFactory, NamingStrategy namingStrategy, MailboxEventSerializer eventSerializer, RetryBackoffConfiguration retryBackoff, RoutingKeyConverter routingKeyConverter, EventBusId eventBusId, - RabbitMQConfiguration configuration) { - return eventBusFactory.create(eventBusId, namingStrategy, routingKeyConverter, eventSerializer, new RabbitMQEventBus.Configurations(configuration, retryBackoff)); + RabbitMQConfiguration configuration, + EventBus.Configuration eventBusConfiguration) { + return eventBusFactory.create(eventBusId, namingStrategy, routingKeyConverter, eventSerializer, new RabbitMQEventBus.Configurations(configuration, retryBackoff, eventBusConfiguration)); } @Provides diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java index e68106e868..6341002ec4 100644 --- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java +++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java @@ -19,7 +19,11 @@ package org.apache.james.modules.mailbox; +import java.util.Optional; + +import org.apache.commons.configuration2.HierarchicalConfiguration; import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.commons.configuration2.tree.ImmutableNode; import org.apache.james.event.json.MailboxEventSerializer; import org.apache.james.events.EventBus; import org.apache.james.events.EventListener; @@ -31,6 +35,7 @@ import org.apache.james.events.delivery.EventDelivery; import org.apache.james.events.delivery.InVmEventDelivery; import org.apache.james.modules.EventDeadLettersProbe; import org.apache.james.server.core.configuration.ConfigurationProvider; +import org.apache.james.util.DurationParser; import org.apache.james.utils.GuiceProbe; import org.apache.james.utils.InitializationOperation; import org.apache.james.utils.InitilizationOperationBuilder; @@ -81,4 +86,13 @@ public class DefaultEventModule extends AbstractModule { .forClass(MailboxListenersLoaderImpl.class) .init(() -> listeners.configure(configuration)); } + + @Provides + @Singleton + EventBus.Configuration providesEventBusConfiguration(ConfigurationProvider configurationProvider) throws ConfigurationException { + HierarchicalConfiguration<ImmutableNode> configuration = configurationProvider.getConfiguration("listeners"); + + return new EventBus.Configuration(configuration.getInt("executionRate", EventBus.EXECUTION_RATE), + Optional.ofNullable(configuration.getString("executionTimeout", null)).map(DurationParser::parse)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
