MAILBOX-377 Slightly improve InVmEventDelivery
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3f8b8f6a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3f8b8f6a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3f8b8f6a Branch: refs/heads/master Commit: 3f8b8f6a57fdcf15bdeb5a17056147212602fe22 Parents: 897a206 Author: Benoit Tellier <btell...@linagora.com> Authored: Tue Jan 15 11:00:00 2019 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Thu Jan 17 13:49:09 2019 +0700 ---------------------------------------------------------------------- .../events/delivery/InVmEventDelivery.java | 26 ++++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/3f8b8f6a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java index 63edafb..c59a8b8 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java @@ -21,6 +21,7 @@ package org.apache.james.mailbox.events.delivery; import java.time.Duration; import java.util.Collection; +import java.util.stream.Stream; import javax.inject.Inject; @@ -31,12 +32,11 @@ import org.apache.james.metrics.api.TimeMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; -import 
com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Schedulers; public class InVmEventDelivery implements EventDelivery { @@ -56,27 +56,21 @@ public class InVmEventDelivery implements EventDelivery { @Override public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) { - Mono<Void> synchronousListeners = doDeliver( - filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event) - .cache(); - Mono<Void> asyncListener = doDeliver( - filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event) - .cache(); - - synchronousListeners.subscribe(); - asyncListener.subscribe(); + Mono<Void> synchronousListeners = doDeliver(filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event) + .subscribeWith(MonoProcessor.create()); + Mono<Void> asyncListener = doDeliver(filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event) + .subscribeWith(MonoProcessor.create()); return new ExecutionStages(synchronousListeners, asyncListener); } - private ImmutableList<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) { + private Stream<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) { return mailboxListeners.stream() - .filter(listener -> listener.getExecutionMode() == executionMode) - .collect(Guavate.toImmutableList()); + .filter(listener -> listener.getExecutionMode() == executionMode); } - private Mono<Void> doDeliver(Collection<MailboxListener> mailboxListeners, Event event) { - return Flux.fromIterable(mailboxListeners) + private Mono<Void> doDeliver(Stream<MailboxListener> mailboxListeners, Event event) { + return 
Flux.fromStream(mailboxListeners) .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener)) .then() .subscribeOn(Schedulers.elastic()); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org