JAMES-2641 EventBus caller should block to await dispatching to be done
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/24fe28f0 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/24fe28f0 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/24fe28f0 Branch: refs/heads/master Commit: 24fe28f024c2989cd7ce5584359a5b93c537872b Parents: 4c14a75 Author: Benoit Tellier <btell...@linagora.com> Authored: Thu Jan 10 16:29:53 2019 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Thu Jan 17 10:23:41 2019 +0700 ---------------------------------------------------------------------- .../QuotaThresholdListenersTestSystem.java | 2 +- .../mailbox/store/StoreMailboxManager.java | 12 ++- .../mailbox/store/StoreMessageIdManager.java | 36 +++++--- .../mailbox/store/StoreMessageManager.java | 93 +++++++++++--------- .../james/mailbox/store/StoreRightManager.java | 6 +- .../quota/ListeningCurrentQuotaUpdater.java | 6 +- 6 files changed, 91 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java ---------------------------------------------------------------------- diff --git a/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java b/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java index c399025..ae4e936 100644 --- a/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java +++ b/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java @@ -53,6 +53,6 @@ class QuotaThresholdListenersTestSystem { } void event(Event event) { - eventBus.dispatch(event, NO_KEYS); + eventBus.dispatch(event, NO_KEYS).block(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index df08dee..7e46527 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -348,7 +348,8 @@ public class StoreMailboxManager implements MailboxManager { .mailboxSession(mailboxSession) .mailbox(m) .build(), - new MailboxIdRegistrationKey(m.getMailboxId())); + new MailboxIdRegistrationKey(m.getMailboxId())) + .block(); } catch (MailboxExistsException e) { LOGGER.info("{} mailbox was created concurrently", m.generateAssociatedPath()); } @@ -400,7 +401,8 @@ public class StoreMailboxManager implements MailboxManager { .quotaCount(QuotaCount.count(messageCount)) .quotaSize(QuotaSize.size(totalSize)) .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .block(); return m; }); @@ -446,7 +448,8 @@ public class StoreMailboxManager implements MailboxManager { .oldPath(from) .newPath(to) .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .block(); // rename submailboxes MailboxPath children = new MailboxPath(from.getNamespace(), from.getUser(), from.getName() + getDelimiter() + "%"); @@ -465,7 +468,8 @@ public class StoreMailboxManager implements MailboxManager { .oldPath(fromPath) .newPath(sub.generateAssociatedPath()) .build(), - new MailboxIdRegistrationKey(sub.getMailboxId())); + new MailboxIdRegistrationKey(sub.getMailboxId())) + .block(); LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName); } http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 04478cd..55a82fe 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -75,6 +75,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class StoreMessageIdManager implements MessageIdManager { private static class MetadataWithMailboxId { @@ -218,15 +221,18 @@ public class StoreMessageIdManager implements MessageIdManager { MailboxMessage::getMailboxId))); MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession); - for (MetadataWithMailboxId metadataWithMailboxId : metadataWithMailbox) { - eventBus.dispatch(EventFactory.expunged() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(mailboxMapper.findMailboxById(metadataWithMailboxId.mailboxId)) - .addMetaData(metadataWithMailboxId.messageMetaData) - .build(), - new MailboxIdRegistrationKey(metadataWithMailboxId.mailboxId)); - } + Flux.fromIterable(metadataWithMailbox) + .flatMap(Throwing.<StoreMessageIdManager.MetadataWithMailboxId, Mono<Void>>function( + metadataWithMailboxId -> eventBus.dispatch(EventFactory.expunged() + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(mailboxMapper.findMailboxById(metadataWithMailboxId.mailboxId)) + .addMetaData(metadataWithMailboxId.messageMetaData) + .build(), + new MailboxIdRegistrationKey(metadataWithMailboxId.mailboxId))) + .sneakyThrow()) + .then() + .block(); } @Override @@ -295,7 +301,8 @@ public class StoreMessageIdManager implements MessageIdManager { .build(), messageMoves.impactedMailboxIds() .map(MailboxIdRegistrationKey::new) - .collect(Guavate.toImmutableSet())); + .collect(Guavate.toImmutableSet())) + .block(); } private void removeMessageFromMailboxes(MailboxMessage message, Set<MailboxId> mailboxesToRemove, MailboxSession mailboxSession) throws MailboxException { @@ -311,7 +318,8 @@ public class StoreMessageIdManager implements MessageIdManager { .mailbox(mailboxMapper.findMailboxById(mailboxId)) .addMetaData(eventPayload) .build(), - new MailboxIdRegistrationKey(mailboxId)); + new MailboxIdRegistrationKey(mailboxId)) + .block(); } } @@ -329,7 +337,8 @@ public class StoreMessageIdManager implements MessageIdManager { .mailbox(mailbox) .updatedFlag(updatedFlags) .build(), - new MailboxIdRegistrationKey(mailboxId)); + new MailboxIdRegistrationKey(mailboxId)) + .block(); } } @@ -397,7 +406,8 @@ public class StoreMessageIdManager implements MessageIdManager { .mailbox(mailboxMapper.findMailboxById(mailboxId)) .addMessage(copy) .build(), - new MailboxIdRegistrationKey(mailboxId)); + new MailboxIdRegistrationKey(mailboxId)) + .block(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index e38a291..1b4b20e 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -96,6 +96,8 @@ import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedMap; +import reactor.core.publisher.Flux; + /** * Base class for {@link org.apache.james.mailbox.MessageManager} * implementations. @@ -264,7 +266,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana .mailbox(getMailboxEntity()) .metaData(ImmutableSortedMap.copyOf(uids)) .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .block(); return uids.keySet().iterator(); } @@ -412,7 +415,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana .mailbox(mailbox) .addMessage(copy) .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .block(); return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid()); }, true); } @@ -575,7 +579,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana .mailbox(getMailboxEntity()) .updatedFlags(updatedFlags) .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .block(); return updatedFlags.stream().collect(Guavate.toImmutableMap( UpdatedFlags::getUid, @@ -734,22 +739,24 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana messageIds.add(message.getMessageId()); } - eventBus.dispatch(EventFactory.added() - .randomEventId() - .mailboxSession(session) - .mailbox(to.getMailboxEntity()) - .metaData(copiedUids) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); - eventBus.dispatch(EventFactory.moved() - .session(session) - .messageMoves(MessageMoves.builder() - .previousMailboxIds(getMailboxEntity().getMailboxId()) - .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId()) - .build()) - .messageId(messageIds.build()) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + Flux.merge( + eventBus.dispatch(EventFactory.added() + .randomEventId() + .mailboxSession(session) + .mailbox(to.getMailboxEntity()) + .metaData(copiedUids) + .build(), + new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())), + eventBus.dispatch(EventFactory.moved() + .session(session) + .messageMoves(MessageMoves.builder() + .previousMailboxIds(getMailboxEntity().getMailboxId()) + .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId()) + .build()) + .messageId(messageIds.build()) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId()))) + .then().block(); return copiedUids; } @@ -765,29 +772,31 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana messageIds.add(message.getMessageId()); } - eventBus.dispatch(EventFactory.added() - .randomEventId() - .mailboxSession(session) - .mailbox(to.getMailboxEntity()) - .metaData(moveUids) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); - eventBus.dispatch(EventFactory.expunged() - .randomEventId() - .mailboxSession(session) - .mailbox(getMailboxEntity()) - .addMetaData(moveResult.getOriginalMessages()) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); - eventBus.dispatch(EventFactory.moved() - .messageMoves(MessageMoves.builder() - .previousMailboxIds(getMailboxEntity().getMailboxId()) - .targetMailboxIds(to.getMailboxEntity().getMailboxId()) - .build()) - .messageId(messageIds.build()) - .session(session) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + Flux.merge( + eventBus.dispatch(EventFactory.added() + .randomEventId() + .mailboxSession(session) + .mailbox(to.getMailboxEntity()) + .metaData(moveUids) + .build(), + new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())), + eventBus.dispatch(EventFactory.expunged() + .randomEventId() + .mailboxSession(session) + .mailbox(getMailboxEntity()) + .addMetaData(moveResult.getOriginalMessages()) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId())), + eventBus.dispatch(EventFactory.moved() + .messageMoves(MessageMoves.builder() + .previousMailboxIds(getMailboxEntity().getMailboxId()) + .targetMailboxIds(to.getMailboxEntity().getMailboxId()) + .build()) + .messageId(messageIds.build()) + .session(session) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId()))) + .then().block(); return moveUids; } http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java index 25a8bdc..6cb00f4 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java @@ -146,7 +146,8 @@ public class StoreRightManager implements RightManager { .mailbox(mailbox) .aclDiff(aclDiff) .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .block(); } private void assertSharesBelongsToUserDomain(String user, ACLCommand mailboxACLCommand) throws DifferentDomainException { @@ -230,7 +231,8 @@ public class StoreRightManager implements RightManager { .mailbox(mailbox) .aclDiff(aclDiff) .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())); + new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .block(); } /** http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java index 272ced8..26fea63 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java @@ -105,7 +105,8 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo .quotaSize(quotaManager.getStorageQuota(quotaRoot)) .instant(Instant.now()) .build(), - NO_REGISTRATION_KEYS); + NO_REGISTRATION_KEYS) + .block(); } private void handleAddedEvent(Added added, QuotaRoot quotaRoot) throws MailboxException { @@ -128,7 +129,8 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo .quotaSize(quotaManager.getStorageQuota(quotaRoot)) .instant(Instant.now()) .build(), - NO_REGISTRATION_KEYS); + NO_REGISTRATION_KEYS) + .block(); } private void handleMailboxDeletionEvent(MailboxDeletion mailboxDeletionEvent) throws MailboxException { --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org