This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 07250a7684737e2d6b8972a6a3e51ff3ba0d5b58 Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 13 16:30:59 2021 +0700 [REFACTORING] StoreMessageIdManager::setInMailboxes should not block --- .../james/mailbox/store/StoreMessageIdManager.java | 82 ++++++++++++---------- 1 file changed, 43 insertions(+), 39 deletions(-) diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index b1ac262..11ca8ff 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -39,9 +39,7 @@ import org.apache.james.events.EventBus; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageIdManager; import org.apache.james.mailbox.MessageManager; -import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.MetadataWithMailboxId; -import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.RightManager; import org.apache.james.mailbox.events.MailboxIdRegistrationKey; import org.apache.james.mailbox.exception.MailboxException; @@ -56,6 +54,7 @@ import org.apache.james.mailbox.model.MailboxACL; import org.apache.james.mailbox.model.MailboxACL.Right; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.MessageMoves; import org.apache.james.mailbox.model.MessageResult; import org.apache.james.mailbox.model.QuotaRoot; @@ -359,8 +358,7 @@ public class StoreMessageIdManager implements MessageIdManager { .collect(Guavate.toImmutableList()); return validateQuota(messageMoves, mailboxMessage.get()) - .then(Mono.fromRunnable(Throwing.runnable(() -> - addMessageToMailboxes(mailboxMessage.get(), messageMoves.addedMailboxes(), mailboxSession)).sneakyThrow())) + .then(addMessageToMailboxes(mailboxMessage.get(), messageMoves.addedMailboxes(), mailboxSession)) .then(removeMessageFromMailboxes(mailboxMessage.get().getMessageId(), messagesToRemove, mailboxSession)) .then(eventBus.dispatch(EventFactory.moved() .session(mailboxSession) @@ -456,45 +454,51 @@ public class StoreMessageIdManager implements MessageIdManager { } } - private void addMessageToMailboxes(MailboxMessage mailboxMessage, Set<Mailbox> mailboxes, MailboxSession mailboxSession) throws MailboxException { + private Mono<Void> addMessageToMailboxes(MailboxMessage mailboxMessage, Set<Mailbox> mailboxes, MailboxSession mailboxSession) { MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); - for (Mailbox mailbox : mailboxes) { - MailboxACL.Rfc4314Rights myRights = rightManager.myRights(mailbox, mailboxSession); - boolean shouldPreserveFlags = myRights.contains(Right.Write); - SimpleMailboxMessage copy = - SimpleMailboxMessage.from(mailboxMessage) - .mailboxId(mailbox.getMailboxId()) - .flags( - FlagsFactory - .builder() - .flags(mailboxMessage.createFlags()) - .filteringFlags( - FlagsFilter.builder() - .systemFlagFilter(f -> shouldPreserveFlags) - .userFlagFilter(f -> shouldPreserveFlags) - .build()) - .build()) - .build(); - save(messageIdMapper, copy, mailbox); - - eventBus.dispatch(EventFactory.added() - .randomEventId() - .mailboxSession(mailboxSession) - .mailbox(mailbox) - .addMetaData(copy.metaData()) - .build(), - new MailboxIdRegistrationKey(mailbox.getMailboxId())) - .block(); - } + return Flux.fromIterable(mailboxes) + .flatMap(Throwing.<Mailbox, Mono<Void>>function(mailbox -> { + MailboxACL.Rfc4314Rights myRights = rightManager.myRights(mailbox, mailboxSession); + boolean shouldPreserveFlags = myRights.contains(Right.Write); + SimpleMailboxMessage copy = + SimpleMailboxMessage.from(mailboxMessage) + .mailboxId(mailbox.getMailboxId()) + .flags( + FlagsFactory + .builder() + .flags(mailboxMessage.createFlags()) + .filteringFlags( + FlagsFilter.builder() + .systemFlagFilter(f -> shouldPreserveFlags) + .userFlagFilter(f -> shouldPreserveFlags) + .build()) + .build()) + .build(); + + return save(messageIdMapper, copy, mailbox) + .flatMap(metadata -> eventBus.dispatch(EventFactory.added() + .randomEventId() + .mailboxSession(mailboxSession) + .mailbox(mailbox) + .addMetaData(metadata) + .build(), + new MailboxIdRegistrationKey(mailbox.getMailboxId()))); + }).sneakyThrow()) + .then(); } - private void save(MessageIdMapper messageIdMapper, MailboxMessage mailboxMessage, Mailbox mailbox) throws MailboxException { - ModSeq modSeq = mailboxSessionMapperFactory.getModSeqProvider().nextModSeq(mailbox.getMailboxId()); - MessageUid uid = mailboxSessionMapperFactory.getUidProvider().nextUid(mailbox.getMailboxId()); - mailboxMessage.setModSeq(modSeq); - mailboxMessage.setUid(uid); - messageIdMapper.copyInMailbox(mailboxMessage, mailbox); + private Mono<MessageMetaData> save(MessageIdMapper messageIdMapper, MailboxMessage mailboxMessage, Mailbox mailbox) { + return Mono.zip( + mailboxSessionMapperFactory.getModSeqProvider().nextModSeqReactive(mailbox.getMailboxId()), + mailboxSessionMapperFactory.getUidProvider().nextUidReactive(mailbox.getMailboxId())) + .flatMap(modSeqAndUid -> { + mailboxMessage.setModSeq(modSeqAndUid.getT1()); + mailboxMessage.setUid(modSeqAndUid.getT2()); + + return messageIdMapper.copyInMailboxReactive(mailboxMessage, mailbox) + .thenReturn(mailboxMessage.metaData()); + }); } private ThrowingFunction<MailboxMessage, MessageResult> messageResultConverter(FetchGroup fetchGroup) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
