This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 28409578f4918981c53c6d6d7effe9c6c50b14fa
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu Dec 12 14:21:27 2024 +0100

    [PERF] InVMEventBus ability to deliver several events at once
    
     - Events are handled separately upon retries
     - MDC is preserved for a single event; event information is joined
     when multiple events are delivered
---
 .../org/apache/james/events/EventListener.java     |  1 +
 .../java/org/apache/james/events/InVMEventBus.java | 32 ++++++--
 .../james/events/delivery/EventDelivery.java       | 31 +++++++-
 .../james/events/delivery/InVmEventDelivery.java   | 89 ++++++++++++++++------
 4 files changed, 123 insertions(+), 30 deletions(-)

diff --git 
a/event-bus/api/src/main/java/org/apache/james/events/EventListener.java 
b/event-bus/api/src/main/java/org/apache/james/events/EventListener.java
index 9631a21121..b1eddb5c4c 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventListener.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventListener.java
@@ -40,6 +40,7 @@ public interface EventListener {
 
         default Publisher<Void> reactiveEvent(List<Event> event) {
             return Flux.fromIterable(event)
+                .filter(this::isHandling)
                 .concatMap(this::reactiveEvent)
                 .then();
         }
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java 
b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
index 3006afb794..079ad648d5 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
@@ -21,6 +21,7 @@ package org.apache.james.events;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +30,7 @@ import jakarta.inject.Inject;
 
 import org.apache.james.events.delivery.EventDelivery;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
@@ -74,7 +76,25 @@ public class InVMEventBus implements EventBus {
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
         if (!event.isNoop()) {
-            return Flux.merge(groupDeliveries(event), keyDeliveries(event, 
keys))
+            return Flux.merge(groupDeliveries(ImmutableList.of(event)), 
keyDeliveries(event, keys))
+                .then()
+                .onErrorResume(throwable -> Mono.empty());
+        }
+        return Mono.empty();
+    }
+
+    @Override
+    public Mono<Void> dispatch(Collection<EventWithRegistrationKey> events) {
+        ImmutableList<EventWithRegistrationKey> notNoopEvents = events.stream()
+            .filter(e -> !e.event().isNoop())
+            .collect(ImmutableList.toImmutableList());
+        if (!notNoopEvents.isEmpty()) {
+            return Flux.merge(
+                    groupDeliveries(notNoopEvents.stream()
+                        .map(EventWithRegistrationKey::event)
+                        .collect(ImmutableList.toImmutableList())),
+                    Flux.fromIterable(events)
+                        .concatMap(e -> keyDeliveries(e.event(), e.keys())))
                 .then()
                 .onErrorResume(throwable -> Mono.empty());
         }
@@ -84,7 +104,7 @@ public class InVMEventBus implements EventBus {
     @Override
     public Mono<Void> reDeliver(Group group, Event event) {
         if (!event.isNoop()) {
-            return groupDelivery(event, retrieveListenerFromGroup(group), 
group);
+            return groupDelivery(ImmutableList.of(event), 
retrieveListenerFromGroup(group), group);
         }
         return Mono.empty();
     }
@@ -110,16 +130,16 @@ public class InVMEventBus implements EventBus {
             .then();
     }
 
-    private Mono<Void> groupDeliveries(Event event) {
+    private Mono<Void> groupDeliveries(List<Event> events) {
         return Flux.fromIterable(groups.entrySet())
-            .flatMap(entry -> groupDelivery(event, entry.getValue(), 
entry.getKey()), EventBus.EXECUTION_RATE)
+            .flatMap(entry -> groupDelivery(events, entry.getValue(), 
entry.getKey()), EventBus.EXECUTION_RATE)
             .then();
     }
 
-    private Mono<Void> groupDelivery(Event event, 
EventListener.ReactiveEventListener listener, Group group) {
+    private Mono<Void> groupDelivery(List<Event> events, 
EventListener.ReactiveEventListener listener, Group group) {
         return eventDelivery.deliver(
             listener,
-            event,
+            events,
             EventDelivery.DeliveryOption.of(
                 EventDelivery.Retryer.BackoffRetryer.of(retryBackoff, 
listener),
                 
EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(group, 
eventDeadLetters)));
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
index 4ad0feb3f3..b9ad84b3c1 100644
--- 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
+++ 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
@@ -22,6 +22,9 @@ package org.apache.james.events.delivery;
 import static 
org.apache.james.events.delivery.EventDelivery.PermanentFailureHandler.NO_HANDLER;
 import static 
org.apache.james.events.delivery.EventDelivery.Retryer.NO_RETRYER;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.james.events.Event;
 import org.apache.james.events.EventDeadLetters;
 import org.apache.james.events.EventListener;
@@ -65,7 +68,17 @@ public interface EventDelivery {
 
     interface Retryer {
 
-        Retryer NO_RETRYER = (executionResult, event) -> executionResult;
+        Retryer NO_RETRYER = new Retryer() {
+            @Override
+            public Mono<Void> doRetry(Mono<Void> executionResult, Event event) 
{
+                return executionResult;
+            }
+
+            @Override
+            public Mono<Void> doRetry(Mono<Void> executionResult, List<Event> 
events) {
+                return executionResult;
+            }
+        };
 
         class BackoffRetryer implements Retryer {
 
@@ -94,9 +107,23 @@ public interface EventDelivery {
                         throwable))
                     .then();
             }
+
+            @Override
+            public Mono<Void> doRetry(Mono<Void> executionResult, List<Event> 
events) {
+                return executionResult
+                    .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.parallel()))
+                    .doOnError(throwable -> LOGGER.error("listener {} exceeded 
maximum retry({}) to handle event {}",
+                        listener.getClass().getCanonicalName(),
+                        retryBackoff.getMaxRetries(),
+                        events.stream().map(e -> 
e.getClass().getCanonicalName()).collect(Collectors.joining(",")),
+                        throwable))
+                    .then();
+            }
         }
 
         Mono<Void> doRetry(Mono<Void> executionResult, Event event);
+
+        Mono<Void> doRetry(Mono<Void> executionResult, List<Event> events);
     }
 
     interface PermanentFailureHandler {
@@ -128,6 +155,8 @@ public interface EventDelivery {
 
     Mono<Void> deliver(EventListener.ReactiveEventListener listener, Event 
event, DeliveryOption option);
 
+    Mono<Void> deliver(EventListener.ReactiveEventListener listener, 
List<Event> events, DeliveryOption option);
+
     default Mono<Void> deliver(EventListener listener, Event event, 
DeliveryOption option) {
         return deliver(EventListener.wrapReactive(listener), event, option);
     }
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
index de47730533..66fda0526f 100644
--- 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
+++ 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
@@ -22,6 +22,9 @@ package org.apache.james.events.delivery;
 import static org.apache.james.events.EventBus.Metrics.timerName;
 import static org.apache.james.util.ReactorUtils.context;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import jakarta.inject.Inject;
 
 import org.apache.james.events.Event;
@@ -35,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -52,9 +56,17 @@ public class InVmEventDelivery implements EventDelivery {
 
     @Override
     public Mono<Void> deliver(EventListener.ReactiveEventListener listener, 
Event event, DeliveryOption option) {
-        Mono<Void> executionResult = deliverByOption(listener, event, option);
+        return deliver(listener, ImmutableList.of(event), option);
+    }
+
+    @Override
+    public Mono<Void> deliver(EventListener.ReactiveEventListener listener, 
List<Event> events, DeliveryOption option) {
+        if (isHandling(listener, events)) {
+            Mono<Void> executionResult = deliverByOption(listener, events, 
option);
 
-        return waitForResultIfNeeded(listener.getExecutionMode(), 
executionResult);
+            return waitForResultIfNeeded(listener.getExecutionMode(), 
executionResult);
+        }
+        return Mono.empty();
     }
 
     private Mono<Void> waitForResultIfNeeded(EventListener.ExecutionMode 
executionMode, Mono<Void> executionResult) {
@@ -66,39 +78,70 @@ public class InVmEventDelivery implements EventDelivery {
             .onErrorResume(throwable -> Mono.empty());
     }
 
-    private Mono<Void> deliverByOption(EventListener.ReactiveEventListener 
listener, Event event, DeliveryOption deliveryOption) {
-        Mono<Void> deliveryToListener = doDeliverToListener(listener, event)
-            .doOnError(throwable -> structuredLogger(event, listener)
+    private Mono<Void> deliverByOption(EventListener.ReactiveEventListener 
listener, List<Event> events, DeliveryOption deliveryOption) {
+        Mono<Void> deliveryToListener = doDeliverToListener(listener, events)
+            .doOnError(throwable -> structuredLogger(events, listener)
                 .log(logger -> logger.error("Error while processing listener", 
throwable)))
             .then();
 
-        return deliveryOption.getRetrier().doRetry(deliveryToListener, event)
-            .onErrorResume(throwable -> 
deliveryOption.getPermanentFailureHandler().handle(event))
-            .then();
+        return deliveryOption.getRetrier().doRetry(deliveryToListener, events)
+            .then()
+            .onErrorResume(e -> Flux.fromIterable(events)
+                .concatMap(event -> 
deliveryOption.getPermanentFailureHandler().handle(event))
+                .then());
     }
 
-    private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener 
listener, Event event) {
-        if (listener.isHandling(event)) {
-            return Mono.defer(() -> 
Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
-                    listener.reactiveEvent(event))))
-                .contextWrite(context("deliver", buildMDC(listener, event)));
-        }
-        return Mono.empty();
+    private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener 
listener, List<Event> events) {
+        return Mono.defer(() -> 
Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
+                listener.reactiveEvent(events))))
+            .contextWrite(context("deliver", buildMDC(listener, events)));
+    }
+
+    private static boolean isHandling(EventListener.ReactiveEventListener 
listener, List<Event> events) {
+        return events.stream().anyMatch(listener::isHandling);
     }
 
-    private MDCBuilder buildMDC(EventListener listener, Event event) {
+    private MDCBuilder buildMDC(EventListener listener, List<Event> events) {
+        if (events.size() == 1) {
+            return MDCBuilder.create()
+                .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, 
events.getFirst().getEventId().toString())
+                .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, 
events.getFirst().getClass().getCanonicalName())
+                .addToContext(EventBus.StructuredLoggingFields.USER, 
events.getFirst().getUsername().asString())
+                .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName());
+        }
+
         return MDCBuilder.create()
-            .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, 
event.getEventId().toString())
-            .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, 
event.getClass().getCanonicalName())
-            .addToContext(EventBus.StructuredLoggingFields.USER, 
event.getUsername().asString())
+            .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, 
events.stream()
+                .map(e -> e.getEventId().getId().toString())
+                .collect(Collectors.joining(",")))
+            .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, 
events.stream()
+                .map(e -> e.getClass().getCanonicalName())
+                .collect(Collectors.joining(",")))
+            .addToContext(EventBus.StructuredLoggingFields.USER, 
events.stream()
+                .map(e -> e.getUsername().asString())
+                .collect(Collectors.joining(",")))
             .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName());
     }
 
-    private StructuredLogger structuredLogger(Event event, EventListener 
listener) {
+    private StructuredLogger structuredLogger(List<Event> events, 
EventListener listener) {
+        if (events.size() == 1) {
+            return MDCStructuredLogger.forLogger(LOGGER)
+                .field(EventBus.StructuredLoggingFields.EVENT_ID, 
events.getFirst().getEventId().toString())
+                .field(EventBus.StructuredLoggingFields.EVENT_CLASS, 
events.getFirst().getClass().getCanonicalName())
+                .field(EventBus.StructuredLoggingFields.USER, 
events.getFirst().getUsername().asString())
+                .field(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName());
+        }
         return MDCStructuredLogger.forLogger(LOGGER)
-            .field(EventBus.StructuredLoggingFields.EVENT_ID, 
event.getEventId().getId().toString())
-            .field(EventBus.StructuredLoggingFields.EVENT_CLASS, 
event.getClass().getCanonicalName())
-            .field(EventBus.StructuredLoggingFields.USER, 
event.getUsername().asString())
+            .field(EventBus.StructuredLoggingFields.EVENT_ID, events.stream()
+                .map(e -> e.getEventId().getId().toString())
+                .collect(Collectors.joining(",")))
+            .field(EventBus.StructuredLoggingFields.EVENT_CLASS, 
events.stream()
+                .map(e -> e.getClass().getCanonicalName())
+                .collect(Collectors.joining(",")))
+            .field(EventBus.StructuredLoggingFields.USER, events.stream()
+                .map(e -> e.getUsername().asString())
+                .collect(Collectors.joining(",")))
             .field(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName());
+
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to