This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ec2450754a92d7b9d5f5704783757f880da92c51
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu Jan 8 13:56:30 2026 +0100

    JAMES-4159 s/executionRate/maxConcurrency/g
---
 docs/modules/servers/partials/configure/listeners.adoc            | 2 +-
 event-bus/api/src/main/java/org/apache/james/events/EventBus.java | 6 +++---
 .../api/src/test/java/org/apache/james/events/GroupContract.java  | 2 +-
 .../api/src/test/java/org/apache/james/events/KeyContract.java    | 2 +-
 .../src/main/java/org/apache/james/events/EventDispatcher.java    | 4 ++--
 .../src/main/java/org/apache/james/events/GroupRegistration.java  | 6 +++---
 .../java/org/apache/james/events/GroupRegistrationHandler.java    | 6 +++---
 .../main/java/org/apache/james/events/KeyRegistrationHandler.java | 8 ++++----
 .../src/main/java/org/apache/james/events/RabbitMQEventBus.java   | 2 +-
 .../in-vm/src/main/java/org/apache/james/events/InVMEventBus.java | 6 +++---
 .../java/org/apache/james/events/delivery/InVmEventDelivery.java  | 2 +-
 .../java/org/apache/james/modules/mailbox/DefaultEventModule.java | 2 +-
 12 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/docs/modules/servers/partials/configure/listeners.adoc 
b/docs/modules/servers/partials/configure/listeners.adoc
index 39ea610964..f4b9d657ef 100644
--- a/docs/modules/servers/partials/configure/listeners.adoc
+++ b/docs/modules/servers/partials/configure/listeners.adoc
@@ -26,7 +26,7 @@ If *true* the execution will be scheduled in a reactor 
elastic scheduler. If *fa
 
 Already provided additional listeners are documented below.
 
-The <executionRate> property controls the number of events processed in 
parallel. Defaults to 10.
+The <maxConcurrency> property controls the number of events processed in 
parallel. Defaults to 10.
 
 The <executionTimeout> property (duration) controls the timeout for the 
execution of each listener. None if omitted.
 
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java 
b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 19f3c98fa2..61723074a8 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -33,11 +33,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface EventBus {
-    record Configuration(int executionRate, Optional<Duration> 
executionTimeout) {
-        public static Configuration DEFAULT = new 
Configuration(EXECUTION_RATE, Optional.empty());
+    record Configuration(int maxConcurrency, Optional<Duration> 
executionTimeout) {
+        public static Configuration DEFAULT = new 
Configuration(DEFAULT_MAX_CONCURRENCY, Optional.empty());
     }
 
-    int EXECUTION_RATE = 10;
+    int DEFAULT_MAX_CONCURRENCY = 10;
 
     interface StructuredLoggingFields {
         String EVENT_ID = "eventId";
diff --git 
a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java 
b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
index 7b6efd5829..2615caa9b3 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
@@ -82,7 +82,7 @@ public interface GroupContract {
 
                 @Override
                 public void event(Event event) throws Exception {
-                    if (inFlight.incrementAndGet() > EventBus.EXECUTION_RATE) {
+                    if (inFlight.incrementAndGet() > 
EventBus.DEFAULT_MAX_CONCURRENCY) {
                         rateExceeded.set(true);
                     }
                     nbCalls.incrementAndGet();
diff --git 
a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java 
b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
index 162bc37cf7..0e0812e8d3 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
@@ -73,7 +73,7 @@ public interface KeyContract extends EventBusContract {
             AtomicBoolean rateExceeded = new AtomicBoolean(false);
 
             Mono.from(eventBus().register(event -> {
-                if (nbCalls.get() - finishedExecutions.get() > 
EventBus.EXECUTION_RATE) {
+                if (nbCalls.get() - finishedExecutions.get() > 
EventBus.DEFAULT_MAX_CONCURRENCY) {
                     rateExceeded.set(true);
                 }
                 nbCalls.incrementAndGet();
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 32624cdb86..f0316ea3e2 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -137,9 +137,9 @@ public class EventDispatcher {
     private Mono<Void> dispatchToLocalListeners(Event event, 
Set<RegistrationKey> keys) {
         return Flux.fromIterable(keys)
             .flatMap(key -> 
Flux.fromIterable(localListenerRegistry.getLocalListeners(key))
-                .map(listener -> Tuples.of(key, listener)), 
EventBus.EXECUTION_RATE)
+                .map(listener -> Tuples.of(key, listener)), 
EventBus.DEFAULT_MAX_CONCURRENCY)
             .filter(pair -> pair.getT2().getExecutionMode() == 
EventListener.ExecutionMode.SYNCHRONOUS)
-            .flatMap(pair -> executeListener(event, pair.getT2(), 
pair.getT1()), EventBus.EXECUTION_RATE)
+            .flatMap(pair -> executeListener(event, pair.getT2(), 
pair.getT1()), EventBus.DEFAULT_MAX_CONCURRENCY)
             .then();
     }
 
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 5dcf4734b0..bd9e72f4a9 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -107,7 +107,7 @@ class GroupRegistration implements Registration {
     }
 
     GroupRegistration start() {
-        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "group-handler");
+        scheduler = 
Schedulers.newBoundedElastic(configurations.eventBusConfiguration().maxConcurrency(),
 ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "group-handler");
         receiverSubscriber = Optional
             .of(createGroupWorkQueue()
                 .then(retryHandler.createRetryExchange(queueName))
@@ -139,11 +139,11 @@ class GroupRegistration implements Registration {
     private Disposable consumeWorkQueue() {
         return Flux.using(
                 receiverProvider::createReceiver,
-                receiver -> receiver.consumeManualAck(queueName.asString(), 
new 
ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())),
+                receiver -> receiver.consumeManualAck(queueName.asString(), 
new 
ConsumeOptions().qos(configurations.eventBusConfiguration().maxConcurrency())),
                 Receiver::close)
             .publishOn(Schedulers.parallel())
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver, 
configurations.eventBusConfiguration().executionRate())
+            .flatMap(this::deliver, 
configurations.eventBusConfiguration().maxConcurrency())
             .subscribeOn(scheduler)
             .subscribe();
     }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index 5b6ecd08c5..6f4b540f53 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -93,7 +93,7 @@ public class GroupRegistrationHandler {
         this.configurations = configurations;
         this.groupRegistrations = new ConcurrentHashMap<>();
         this.queueName = namingStrategy.workQueue(GROUP);
-        this.scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
+        this.scheduler = 
Schedulers.newBoundedElastic(configurations.eventBusConfiguration().maxConcurrency(),
 ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
         this.consumer = Optional.empty();
     }
 
@@ -124,10 +124,10 @@ public class GroupRegistrationHandler {
     private Disposable consumeWorkQueue() {
         return Flux.using(
                 receiverProvider::createReceiver,
-            receiver -> receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(configurations.eventBusConfiguration().executionRate())),
+            receiver -> receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(configurations.eventBusConfiguration().maxConcurrency())),
             Receiver::close)
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver, 
configurations.eventBusConfiguration().executionRate())
+            .flatMap(this::deliver, 
configurations.eventBusConfiguration().maxConcurrency())
             .subscribeOn(scheduler)
             .subscribe();
     }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
index f7d5aad970..ab378fbec6 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
@@ -95,14 +95,14 @@ class KeyRegistrationHandler {
     }
 
     void start() {
-        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "keys-handler");
+        scheduler = 
Schedulers.newBoundedElastic(EventBus.DEFAULT_MAX_CONCURRENCY, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "keys-handler");
         declareQueue();
 
         newSubscription = Flux.using(
             receiverProvider::createReceiver,
-            receiver -> receiver.consumeAutoAck(registrationQueue.asString(), 
new ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
+            receiver -> receiver.consumeAutoAck(registrationQueue.asString(), 
new ConsumeOptions().qos(EventBus.DEFAULT_MAX_CONCURRENCY)),
             Receiver::close)
-            .flatMap(this::handleDelivery, EventBus.EXECUTION_RATE)
+            .flatMap(this::handleDelivery, EventBus.DEFAULT_MAX_CONCURRENCY)
             .subscribeOn(scheduler)
             .subscribe();
         receiverSubscriber = Optional.of(newSubscription);
@@ -194,7 +194,7 @@ class KeyRegistrationHandler {
         List<Event> events = toEvent(delivery);
 
         return Flux.fromIterable(listenersToCall)
-            .flatMap(listener -> executeListener(listener, events, 
registrationKey), EventBus.EXECUTION_RATE)
+            .flatMap(listener -> executeListener(listener, events, 
registrationKey), EventBus.DEFAULT_MAX_CONCURRENCY)
             .then();
     }
 
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index b8261bca63..558866651d 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -68,7 +68,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
     public record Configurations(RabbitMQConfiguration rabbitMQConfiguration, 
RetryBackoffConfiguration retryBackoff, EventBus.Configuration 
eventBusConfiguration) {
         public Configurations(RabbitMQConfiguration rabbitMQConfiguration, 
RetryBackoffConfiguration retryBackoff) {
-            this(rabbitMQConfiguration, retryBackoff, new 
Configuration(EventBus.EXECUTION_RATE, Optional.empty()));
+            this(rabbitMQConfiguration, retryBackoff, new 
Configuration(EventBus.DEFAULT_MAX_CONCURRENCY, Optional.empty()));
         }
     }
 
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java 
b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
index bd7c15a9cd..5ebe475d18 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
@@ -134,19 +134,19 @@ public class InVMEventBus implements EventBus {
 
     private Mono<Void> keyDeliveries(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(registeredListenersByKeys(keys))
-            .flatMap(listener -> eventDelivery.deliver(listener, event, 
EventDelivery.DeliveryOption.none()), EventBus.EXECUTION_RATE)
+            .flatMap(listener -> eventDelivery.deliver(listener, event, 
EventDelivery.DeliveryOption.none()), EventBus.DEFAULT_MAX_CONCURRENCY)
             .then();
     }
 
     private Mono<Void> keyDeliveries(List<Event> events, Set<RegistrationKey> 
keys) {
         return Flux.fromIterable(registeredListenersByKeys(keys))
-            .flatMap(listener -> eventDelivery.deliver(listener, events, 
EventDelivery.DeliveryOption.none()), EventBus.EXECUTION_RATE)
+            .flatMap(listener -> eventDelivery.deliver(listener, events, 
EventDelivery.DeliveryOption.none()), EventBus.DEFAULT_MAX_CONCURRENCY)
             .then();
     }
 
     private Mono<Void> groupDeliveries(List<Event> events) {
         return Flux.fromIterable(groups.entrySet())
-            .flatMap(entry -> groupDelivery(events, entry.getValue(), 
entry.getKey()), EventBus.EXECUTION_RATE)
+            .flatMap(entry -> groupDelivery(events, entry.getValue(), 
entry.getKey()), EventBus.DEFAULT_MAX_CONCURRENCY)
             .then();
     }
 
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
index 0f6d2dbefa..4f124fa67f 100644
--- 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
+++ 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
@@ -53,7 +53,7 @@ public class InVmEventDelivery implements EventDelivery {
     @VisibleForTesting
     public InVmEventDelivery(MetricFactory metricFactory) {
         this.metricFactory = metricFactory;
-        this.configuration = new 
EventBus.Configuration(EventBus.EXECUTION_RATE, Optional.empty());
+        this.configuration = new 
EventBus.Configuration(EventBus.DEFAULT_MAX_CONCURRENCY, Optional.empty());
     }
 
     @Inject
diff --git 
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
 
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 6341002ec4..bfeda1cd68 100644
--- 
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ 
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -92,7 +92,7 @@ public class DefaultEventModule extends AbstractModule {
     EventBus.Configuration providesEventBusConfiguration(ConfigurationProvider 
configurationProvider) throws ConfigurationException {
         HierarchicalConfiguration<ImmutableNode> configuration = 
configurationProvider.getConfiguration("listeners");
 
-        return new 
EventBus.Configuration(configuration.getInt("executionRate", 
EventBus.EXECUTION_RATE),
+        return new 
EventBus.Configuration(configuration.getInt("maxConcurrency", 
EventBus.DEFAULT_MAX_CONCURRENCY),
             Optional.ofNullable(configuration.getString("executionTimeout", 
null)).map(DurationParser::parse));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to