This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 28409578f4918981c53c6d6d7effe9c6c50b14fa Author: Benoit TELLIER <[email protected]> AuthorDate: Thu Dec 12 14:21:27 2024 +0100 [PERF] InVMEventBus ability to deliver several events at once - Events are handled separately upon retries - MDC preserved for single event, event infos are joined when multiple events --- .../org/apache/james/events/EventListener.java | 1 + .../java/org/apache/james/events/InVMEventBus.java | 32 ++++++-- .../james/events/delivery/EventDelivery.java | 31 +++++++- .../james/events/delivery/InVmEventDelivery.java | 89 ++++++++++++++++------ 4 files changed, 123 insertions(+), 30 deletions(-) diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventListener.java b/event-bus/api/src/main/java/org/apache/james/events/EventListener.java index 9631a21121..b1eddb5c4c 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventListener.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventListener.java @@ -40,6 +40,7 @@ public interface EventListener { default Publisher<Void> reactiveEvent(List<Event> event) { return Flux.fromIterable(event) + .filter(this::isHandling) .concatMap(this::reactiveEvent) .then(); } diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java index 3006afb794..079ad648d5 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java @@ -21,6 +21,7 @@ package org.apache.james.events; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -29,6 +30,7 @@ import jakarta.inject.Inject; import org.apache.james.events.delivery.EventDelivery; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; @@ -74,7 +76,25 @@ public class InVMEventBus implements EventBus { @Override public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) { if (!event.isNoop()) { - return Flux.merge(groupDeliveries(event), keyDeliveries(event, keys)) + return Flux.merge(groupDeliveries(ImmutableList.of(event)), keyDeliveries(event, keys)) + .then() + .onErrorResume(throwable -> Mono.empty()); + } + return Mono.empty(); + } + + @Override + public Mono<Void> dispatch(Collection<EventWithRegistrationKey> events) { + ImmutableList<EventWithRegistrationKey> notNoopEvents = events.stream() + .filter(e -> !e.event().isNoop()) + .collect(ImmutableList.toImmutableList()); + if (!notNoopEvents.isEmpty()) { + return Flux.merge( + groupDeliveries(notNoopEvents.stream() + .map(EventWithRegistrationKey::event) + .collect(ImmutableList.toImmutableList())), + Flux.fromIterable(events) + .concatMap(e -> keyDeliveries(e.event(), e.keys()))) .then() .onErrorResume(throwable -> Mono.empty()); } @@ -84,7 +104,7 @@ public class InVMEventBus implements EventBus { @Override public Mono<Void> reDeliver(Group group, Event event) { if (!event.isNoop()) { - return groupDelivery(event, retrieveListenerFromGroup(group), group); + return groupDelivery(ImmutableList.of(event), retrieveListenerFromGroup(group), group); } return Mono.empty(); } @@ -110,16 +130,16 @@ public class InVMEventBus implements EventBus { .then(); } - private Mono<Void> groupDeliveries(Event event) { + private Mono<Void> groupDeliveries(List<Event> events) { return Flux.fromIterable(groups.entrySet()) - .flatMap(entry -> groupDelivery(event, entry.getValue(), entry.getKey()), EventBus.EXECUTION_RATE) + .flatMap(entry -> groupDelivery(events, entry.getValue(), entry.getKey()), EventBus.EXECUTION_RATE) .then(); } - private Mono<Void> groupDelivery(Event event, EventListener.ReactiveEventListener listener, Group group) { + private Mono<Void> groupDelivery(List<Event> events, EventListener.ReactiveEventListener listener, Group group) { return eventDelivery.deliver( listener, - event, + events, EventDelivery.DeliveryOption.of( EventDelivery.Retryer.BackoffRetryer.of(retryBackoff, listener), EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(group, eventDeadLetters))); diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java index 4ad0feb3f3..b9ad84b3c1 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java @@ -22,6 +22,9 @@ package org.apache.james.events.delivery; import static org.apache.james.events.delivery.EventDelivery.PermanentFailureHandler.NO_HANDLER; import static org.apache.james.events.delivery.EventDelivery.Retryer.NO_RETRYER; +import java.util.List; +import java.util.stream.Collectors; + import org.apache.james.events.Event; import org.apache.james.events.EventDeadLetters; import org.apache.james.events.EventListener; @@ -65,7 +68,17 @@ public interface EventDelivery { interface Retryer { - Retryer NO_RETRYER = (executionResult, event) -> executionResult; + Retryer NO_RETRYER = new Retryer() { + @Override + public Mono<Void> doRetry(Mono<Void> executionResult, Event event) { + return executionResult; + } + + @Override + public Mono<Void> doRetry(Mono<Void> executionResult, List<Event> events) { + return executionResult; + } + }; class BackoffRetryer implements Retryer { @@ -94,9 +107,23 @@ public interface EventDelivery { throwable)) .then(); } + + @Override + public Mono<Void> doRetry(Mono<Void> executionResult, List<Event> events) { + return executionResult + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.parallel())) + .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}", + listener.getClass().getCanonicalName(), + retryBackoff.getMaxRetries(), + events.stream().map(e -> e.getClass().getCanonicalName()).collect(Collectors.joining(",")), + throwable)) + .then(); + } } Mono<Void> doRetry(Mono<Void> executionResult, Event event); + + Mono<Void> doRetry(Mono<Void> executionResult, List<Event> events); } interface PermanentFailureHandler { @@ -128,6 +155,8 @@ public interface EventDelivery { Mono<Void> deliver(EventListener.ReactiveEventListener listener, Event event, DeliveryOption option); + Mono<Void> deliver(EventListener.ReactiveEventListener listener, List<Event> events, DeliveryOption option); + default Mono<Void> deliver(EventListener listener, Event event, DeliveryOption option) { return deliver(EventListener.wrapReactive(listener), event, option); } diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java index de47730533..66fda0526f 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java @@ -22,6 +22,9 @@ package org.apache.james.events.delivery; import static org.apache.james.events.EventBus.Metrics.timerName; import static org.apache.james.util.ReactorUtils.context; +import java.util.List; +import java.util.stream.Collectors; + import jakarta.inject.Inject; import org.apache.james.events.Event; @@ -35,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -52,9 +56,17 @@ public class InVmEventDelivery implements EventDelivery { @Override public Mono<Void> deliver(EventListener.ReactiveEventListener listener, Event event, DeliveryOption option) { - Mono<Void> executionResult = deliverByOption(listener, event, option); + return deliver(listener, ImmutableList.of(event), option); + } + + @Override + public Mono<Void> deliver(EventListener.ReactiveEventListener listener, List<Event> events, DeliveryOption option) { + if (isHandling(listener, events)) { + Mono<Void> executionResult = deliverByOption(listener, events, option); - return waitForResultIfNeeded(listener.getExecutionMode(), executionResult); + return waitForResultIfNeeded(listener.getExecutionMode(), executionResult); + } + return Mono.empty(); } private Mono<Void> waitForResultIfNeeded(EventListener.ExecutionMode executionMode, Mono<Void> executionResult) { @@ -66,39 +78,70 @@ public class InVmEventDelivery implements EventDelivery { .onErrorResume(throwable -> Mono.empty()); } - private Mono<Void> deliverByOption(EventListener.ReactiveEventListener listener, Event event, DeliveryOption deliveryOption) { - Mono<Void> deliveryToListener = doDeliverToListener(listener, event) - .doOnError(throwable -> structuredLogger(event, listener) + private Mono<Void> deliverByOption(EventListener.ReactiveEventListener listener, List<Event> events, DeliveryOption deliveryOption) { + Mono<Void> deliveryToListener = doDeliverToListener(listener, events) + .doOnError(throwable -> structuredLogger(events, listener) .log(logger -> logger.error("Error while processing listener", throwable))) .then(); - return deliveryOption.getRetrier().doRetry(deliveryToListener, event) - .onErrorResume(throwable -> deliveryOption.getPermanentFailureHandler().handle(event)) - .then(); + return deliveryOption.getRetrier().doRetry(deliveryToListener, events) + .then() + .onErrorResume(e -> Flux.fromIterable(events) + .concatMap(event -> deliveryOption.getPermanentFailureHandler().handle(event)) + .then()); } - private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener listener, Event event) { - if (listener.isHandling(event)) { - return Mono.defer(() -> Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener), - listener.reactiveEvent(event)))) - .contextWrite(context("deliver", buildMDC(listener, event))); - } - return Mono.empty(); + private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener listener, List<Event> events) { + return Mono.defer(() -> Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener), + listener.reactiveEvent(events)))) + .contextWrite(context("deliver", buildMDC(listener, events))); + } + + private static boolean isHandling(EventListener.ReactiveEventListener listener, List<Event> events) { + return events.stream().anyMatch(listener::isHandling); } - private MDCBuilder buildMDC(EventListener listener, Event event) { + private MDCBuilder buildMDC(EventListener listener, List<Event> events) { + if (events.size() == 1) { + return MDCBuilder.create() + .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, events.getFirst().getEventId().toString()) + .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, events.getFirst().getClass().getCanonicalName()) + .addToContext(EventBus.StructuredLoggingFields.USER, events.getFirst().getUsername().asString()) + .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass().getCanonicalName()); + } + return MDCBuilder.create() - .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId().toString()) - .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass().getCanonicalName()) - .addToContext(EventBus.StructuredLoggingFields.USER, event.getUsername().asString()) + .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, events.stream() + .map(e -> e.getEventId().getId().toString()) + .collect(Collectors.joining(","))) + .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, events.stream() + .map(e -> e.getClass().getCanonicalName()) + .collect(Collectors.joining(","))) + .addToContext(EventBus.StructuredLoggingFields.USER, events.stream() + .map(e -> e.getUsername().asString()) + .collect(Collectors.joining(","))) .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass().getCanonicalName()); } - private StructuredLogger structuredLogger(Event event, EventListener listener) { + private StructuredLogger structuredLogger(List<Event> events, EventListener listener) { + if (events.size() == 1) { + return MDCStructuredLogger.forLogger(LOGGER) + .field(EventBus.StructuredLoggingFields.EVENT_ID, events.getFirst().getEventId().toString()) + .field(EventBus.StructuredLoggingFields.EVENT_CLASS, events.getFirst().getClass().getCanonicalName()) + .field(EventBus.StructuredLoggingFields.USER, events.getFirst().getUsername().asString()) + .field(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass().getCanonicalName()); + } return MDCStructuredLogger.forLogger(LOGGER) - .field(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId().getId().toString()) - .field(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass().getCanonicalName()) - .field(EventBus.StructuredLoggingFields.USER, event.getUsername().asString()) + .field(EventBus.StructuredLoggingFields.EVENT_ID, events.stream() + .map(e -> e.getEventId().getId().toString()) + .collect(Collectors.joining(","))) + .field(EventBus.StructuredLoggingFields.EVENT_CLASS, events.stream() + .map(e -> e.getClass().getCanonicalName()) + .collect(Collectors.joining(","))) + .field(EventBus.StructuredLoggingFields.USER, events.stream() + .map(e -> e.getUsername().asString()) + .collect(Collectors.joining(","))) .field(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass().getCanonicalName()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
