This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e995a086616d3c29722490451700c3c4c89b2392 Author: Benoit Tellier <[email protected]> AuthorDate: Tue Jul 7 14:28:39 2020 +0700 JAMES-3265 Fasten fetching all flags As we disabled buggy CONDSTORE extension upon reconnection, TBird has no choice than to do a full scan of all uid & flags to resync. This operation is costly. On top of the Cassandra backend, 99+% of the time is spent reading the messagev2 table that is not needed for this Fetch request, effectively slowing down the request. By adding a method allowing to retrieve ComposedMessageIdWithMetaDatas for a range in a given mailbox on top of the MessageManager, we allow significant performance enhancement for IMAP on top of the Cassandra backend. --- .../org/apache/james/mailbox/MessageManager.java | 4 + .../apache/james/mailbox/MailboxManagerTest.java | 58 ++++++++++++++ .../cassandra/mail/CassandraMessageMapper.java | 6 ++ .../james/mailbox/store/StoreMessageManager.java | 8 ++ .../james/mailbox/store/mail/MessageMapper.java | 14 ++++ .../apache/james/imap/api/message/FetchData.java | 10 +++ .../james/imap/processor/fetch/FetchProcessor.java | 86 ++++++++++++++------ .../imap/processor/fetch/FetchResponseBuilder.java | 92 ++++++++++++++++------ 8 files changed, 228 insertions(+), 50 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java index 6c21579..88c0476 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java @@ -39,6 +39,7 @@ import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.UnsupportedCriteriaException; import org.apache.james.mailbox.exception.UnsupportedRightException; import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxACL; @@ -53,6 +54,7 @@ import org.apache.james.mailbox.model.SearchQuery; import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mime4j.dom.Message; import org.apache.james.mime4j.message.DefaultMessageWriter; +import org.reactivestreams.Publisher; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -321,6 +323,8 @@ public interface MessageManager { */ MessageResultIterator getMessages(MessageRange set, FetchGroup fetchGroup, MailboxSession mailboxSession) throws MailboxException; + Publisher<ComposedMessageIdWithMetaData> listMessagesMetadata(MessageRange set, MailboxSession session); + /** * Return the underlying {@link Mailbox} */ diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java index 7a8dbe5..c1692e9 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox; import static org.apache.james.mailbox.MailboxManager.RenameOption.RENAME_SUBSCRIPTIONS; +import static org.apache.james.mailbox.MessageManager.FlagsUpdateMode.REPLACE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -63,6 +64,7 @@ import org.apache.james.mailbox.exception.TooLongMailboxNameException; import org.apache.james.mailbox.extension.PreDeletionHook; import org.apache.james.mailbox.mock.DataProvisioner; import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.MailboxACL; import org.apache.james.mailbox.model.MailboxAnnotation; @@ -2665,6 +2667,62 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { } @Test + void listMessagesMetadataShouldReturnEmptyWhenNoMessages() { + assertThat(Flux.from(inboxManager.listMessagesMetadata(MessageRange.all(), session)) + .collectList().block()) + .isEmpty(); + } + + @Test + void listMessagesMetadataShouldReturnAppendedMessage() throws Exception { + Flags flags = new Flags(Flags.Flag.DELETED); + ComposedMessageId composeId = inboxManager.appendMessage(AppendCommand.builder() + .withFlags(flags) + .build(ClassLoaderUtils.getSystemResourceAsSharedStream("eml/twoAttachmentsApi.eml")), session).getId(); + + assertThat(Flux.from(inboxManager.listMessagesMetadata(MessageRange.all(), session)) + .collectList().block()) + .hasSize(1) + .allSatisfy(ids -> SoftAssertions.assertSoftly(softly -> { + softly.assertThat(ids.getComposedMessageId().getMailboxId()).isEqualTo(composeId.getMailboxId()); + softly.assertThat(ids.getComposedMessageId().getUid()).isEqualTo(composeId.getUid()); + softly.assertThat(ids.getFlags()).isEqualTo(flags); + })); + } + + @Test + void listMessagesMetadataShouldReturnUpdatedMessage() throws Exception { + Flags flags = new Flags(Flags.Flag.SEEN); + ComposedMessageId composeId = inboxManager.appendMessage(AppendCommand.builder() + .withFlags(new Flags(Flags.Flag.DELETED)) + .build(ClassLoaderUtils.getSystemResourceAsSharedStream("eml/twoAttachmentsApi.eml")), session).getId(); + + inboxManager.setFlags(flags, REPLACE, MessageRange.all(), session); + + assertThat(Flux.from(inboxManager.listMessagesMetadata(MessageRange.all(), session)) + .collectList().block()) + .hasSize(1) + .allSatisfy(ids -> SoftAssertions.assertSoftly(softly -> { + softly.assertThat(ids.getComposedMessageId().getMailboxId()).isEqualTo(composeId.getMailboxId()); + softly.assertThat(ids.getComposedMessageId().getUid()).isEqualTo(composeId.getUid()); + softly.assertThat(ids.getFlags()).isEqualTo(flags); + })); + } + + @Test + void listMessagesMetadataShouldNotReturnDeletedMessage() throws Exception { + inboxManager.appendMessage(AppendCommand.builder() + .withFlags(new Flags(Flags.Flag.DELETED)) + .build(ClassLoaderUtils.getSystemResourceAsSharedStream("eml/twoAttachmentsApi.eml")), session).getId(); + + inboxManager.expunge(MessageRange.all(), session); + + assertThat(Flux.from(inboxManager.listMessagesMetadata(MessageRange.all(), session)) + .collectList().block()) + .isEmpty(); + } + + @Test void getMessagesShouldIncludeHasAttachmentInformation() throws Exception { ComposedMessageId composeId = inboxManager.appendMessage(AppendCommand.builder() .withFlags(new Flags(Flags.Flag.DELETED)) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index fb94eff..75adad6 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -172,6 +172,12 @@ public class CassandraMessageMapper implements MessageMapper { } @Override + public Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(Mailbox mailbox, MessageRange set) { + CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); + return messageIdDAO.retrieveMessages(mailboxId, set, Limit.unlimited()); + } + + @Override public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int limitAsInt) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index d4a8c6a..53079f4 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -60,6 +60,7 @@ import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.ReadOnlyException; import org.apache.james.mailbox.exception.UnsupportedRightException; import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxACL; @@ -97,6 +98,7 @@ import org.apache.james.util.IteratorWrapper; import org.apache.james.util.io.BodyOffsetInputStream; import org.apache.james.util.io.InputStreamConsummer; import org.apache.james.util.streams.Iterators; +import org.reactivestreams.Publisher; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; @@ -651,6 +653,12 @@ public class StoreMessageManager implements MessageManager { return new StoreMessageResultIterator(messageMapper, mailbox, set, batchSizes, fetchGroup); } + @Override + public Publisher<ComposedMessageIdWithMetaData> listMessagesMetadata(MessageRange set, MailboxSession session) { + MessageMapper messageMapper = mapperFactory.getMessageMapper(session); + return messageMapper.listMessagesMetadata(mailbox, set); + } + /** * Return a List which holds all uids of recent messages and optional reset * the recent flag on the messages for the uids diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java index d92be02..1d4f6f4 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java @@ -31,6 +31,8 @@ import org.apache.james.mailbox.MessageManager.FlagsUpdateMode; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxCounters; import org.apache.james.mailbox.model.MessageMetaData; @@ -52,6 +54,7 @@ import reactor.core.publisher.Mono; * to the end of the request. */ public interface MessageMapper extends Mapper { + int UNLIMITED = -1; /** * Return a {@link Iterator} which holds the messages for the given criterias @@ -65,6 +68,17 @@ public interface MessageMapper extends Mapper { Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange set, FetchType type, int limit) throws MailboxException; + default Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(Mailbox mailbox, MessageRange set) { + return findInMailboxReactive(mailbox, set, FetchType.Metadata, UNLIMITED) + .map(message -> new ComposedMessageIdWithMetaData( + new ComposedMessageId( + message.getMailboxId(), + message.getMessageId(), + message.getUid()), + message.createFlags(), + message.getModSeq())); + } + default Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange set, FetchType type, int limit) { try { return Iterators.toFlux(findInMailbox(mailbox, set, type, limit)); diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/message/FetchData.java b/protocols/imap/src/main/java/org/apache/james/imap/api/message/FetchData.java index 3a4b305..822dfa2 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/api/message/FetchData.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/api/message/FetchData.java @@ -142,6 +142,16 @@ public class FetchData { return vanished; } + public boolean isOnlyFlags() { + return bodyElements.isEmpty() + && itemToFetch.stream() + .filter(item -> item != Item.FLAGS) + .filter(item -> item != Item.UID) + .filter(item -> item != Item.MODSEQ) + .findAny() + .isEmpty(); + } + @Override public final int hashCode() { return Objects.hash(itemToFetch, bodyElements, setSeen, changedSince); diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java index 4fe3a81..53cdff2 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java @@ -21,6 +21,7 @@ package org.apache.james.imap.processor.fetch; import java.io.Closeable; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.james.imap.api.ImapConstants; @@ -41,6 +42,7 @@ import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.MessageManager.MailboxMetaData; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.MessageRangeException; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.MessageResult; @@ -50,6 +52,8 @@ import org.apache.james.util.MDCBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { private static final Logger LOGGER = LoggerFactory.getLogger(FetchProcessor.class); @@ -140,37 +144,71 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { FetchGroup resultToFetch = FetchDataConverter.getFetchGroup(fetch); for (MessageRange range : ranges) { - MessageResultIterator messages = mailbox.getMessages(range, resultToFetch, mailboxSession); - while (messages.hasNext()) { - final MessageResult result = messages.next(); + if (fetch.isOnlyFlags()) { + processMessageRangeForFlags(session, mailbox, fetch, mailboxSession, responder, builder, range); + } else { + processMessageRange(session, mailbox, fetch, useUids, mailboxSession, responder, builder, resultToFetch, range); + } + } - //skip unchanged messages - this should be filtered at the mailbox level to take advantage of indexes - if (fetch.contains(Item.MODSEQ) && result.getModSeq().asLong() <= fetch.getChangedSince()) { - continue; - } + } - try { - final FetchResponse response = builder.build(fetch, result, mailbox, session, useUids); - responder.respond(response); - } catch (MessageRangeException e) { - // we can't for whatever reason find the message so - // just skip it and log it to debug - LOGGER.debug("Unable to find message with uid {}", result.getUid(), e); - } catch (MailboxException e) { - // we can't for whatever reason find parse all requested parts of the message. This may because it was deleted while try to access the parts. - // So we just skip it - // - // See IMAP-347 - LOGGER.error("Unable to fetch message with uid {}, so skip it", result.getUid(), e); - } + private void processMessageRangeForFlags(ImapSession session, MessageManager mailbox, FetchData fetch, MailboxSession mailboxSession, Responder responder, FetchResponseBuilder builder, MessageRange range) { + Iterator<ComposedMessageIdWithMetaData> results = Flux.from(mailbox.listMessagesMetadata(range, mailboxSession)) + .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) + .toStream() + .iterator(); + + while (results.hasNext()) { + ComposedMessageIdWithMetaData result = results.next(); + + try { + final FetchResponse response = builder.build(fetch, result, mailbox, session); + responder.respond(response); + } catch (MessageRangeException e) { + // we can't for whatever reason find the message so + // just skip it and log it to debug + LOGGER.debug("Unable to find message with uid {}", result.getComposedMessageId().getUid(), e); + } catch (MailboxException e) { + // we can't for whatever reason find parse all requested parts of the message. This may because it was deleted while try to access the parts. + // So we just skip it + // + // See IMAP-347 + LOGGER.error("Unable to fetch message with uid {}, so skip it", result.getComposedMessageId().getUid(), e); } + } + } + + private void processMessageRange(ImapSession session, MessageManager mailbox, FetchData fetch, boolean useUids, MailboxSession mailboxSession, Responder responder, FetchResponseBuilder builder, FetchGroup resultToFetch, MessageRange range) throws MailboxException { + MessageResultIterator messages = mailbox.getMessages(range, resultToFetch, mailboxSession); + while (messages.hasNext()) { + final MessageResult result = messages.next(); - // Throw the exception if we received one - if (messages.getException() != null) { - throw messages.getException(); + //skip unchanged messages - this should be filtered at the mailbox level to take advantage of indexes + if (fetch.contains(Item.MODSEQ) && result.getModSeq().asLong() <= fetch.getChangedSince()) { + continue; + } + + try { + final FetchResponse response = builder.build(fetch, result, mailbox, session, useUids); + responder.respond(response); + } catch (MessageRangeException e) { + // we can't for whatever reason find the message so + // just skip it and log it to debug + LOGGER.debug("Unable to find message with uid {}", result.getUid(), e); + } catch (MailboxException e) { + // we can't for whatever reason find parse all requested parts of the message. This may because it was deleted while try to access the parts. + // So we just skip it + // + // See IMAP-347 + LOGGER.error("Unable to fetch message with uid {}, so skip it", result.getUid(), e); } } + // Throw the exception if we received one + if (messages.getException() != null) { + throw messages.getException(); + } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchResponseBuilder.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchResponseBuilder.java index bc978b7..73f18f7 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchResponseBuilder.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchResponseBuilder.java @@ -47,6 +47,7 @@ import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.MessageRangeException; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.Content; import org.apache.james.mailbox.model.Header; import org.apache.james.mailbox.model.MessageRange; @@ -125,21 +126,9 @@ public final class FetchResponseBuilder { // message. If so, update the flags, and ensure that a flags response is // included in the response. final MailboxSession mailboxSession = session.getMailboxSession(); - boolean ensureFlagsResponse = false; - final Flags resultFlags = result.getFlags(); - if (fetch.isSetSeen() && !resultFlags.contains(Flags.Flag.SEEN)) { - mailbox.setFlags(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD, MessageRange.one(resultUid), mailboxSession); - resultFlags.add(Flags.Flag.SEEN); - ensureFlagsResponse = true; - } // FLAGS response - if (fetch.contains(Item.FLAGS) || ensureFlagsResponse) { - if (selected.isRecent(resultUid)) { - resultFlags.add(Flags.Flag.RECENT); - } - setFlags(resultFlags); - } + addFlags(fetch, mailbox, selected, resultUid, mailboxSession, result.getFlags()); // INTERNALDATE response if (fetch.contains(Item.INTERNAL_DATE)) { @@ -183,23 +172,74 @@ public final class FetchResponseBuilder { bodystructure = new MimeDescriptorStructure(true, result.getMimeDescriptor(), envelopeBuilder); } } - // UID response - if (fetch.contains(Item.UID)) { - setUid(resultUid); - } + addUid(fetch, resultUid); - if (fetch.contains(Item.MODSEQ)) { - long changedSince = fetch.getChangedSince(); - if (changedSince != -1) { - // check if the modsequence if higher then the one specified by the CHANGEDSINCE option - if (changedSince < result.getModSeq().asLong()) { - setModSeq(result.getModSeq()); - } - } else { - setModSeq(result.getModSeq()); + addModSeq(fetch, result.getModSeq()); + + return build(); + }); + } + + private void addUid(FetchData fetch, MessageUid resultUid) { + // UID response + if (fetch.contains(Item.UID)) { + setUid(resultUid); + } + } + + private void addModSeq(FetchData fetch, ModSeq modSeq) { + if (fetch.contains(Item.MODSEQ)) { + long changedSince = fetch.getChangedSince(); + if (changedSince != -1) { + // check if the modsequence if higher then the one specified by the CHANGEDSINCE option + if (changedSince < modSeq.asLong()) { + setModSeq(modSeq); } + } else { + setModSeq(modSeq); + } + } + } + + private void addFlags(FetchData fetch, MessageManager mailbox, SelectedMailbox selected, MessageUid resultUid, MailboxSession mailboxSession, Flags flags) throws MailboxException { + boolean ensureFlagsResponse = false; + final Flags resultFlags = flags; + if (fetch.isSetSeen() && !resultFlags.contains(Flags.Flag.SEEN)) { + mailbox.setFlags(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD, MessageRange.one(resultUid), mailboxSession); + resultFlags.add(Flags.Flag.SEEN); + ensureFlagsResponse = true; + } + + if (fetch.contains(Item.FLAGS) || ensureFlagsResponse) { + if (selected.isRecent(resultUid)) { + resultFlags.add(Flags.Flag.RECENT); } + setFlags(resultFlags); + } + } + + public FetchResponse build(FetchData fetch, ComposedMessageIdWithMetaData result, MessageManager mailbox, ImapSession session) throws MessageRangeException, MailboxException { + final SelectedMailbox selected = session.getSelected(); + final MessageUid resultUid = result.getComposedMessageId().getUid(); + return selected.msn(resultUid).fold(() -> { + throw new MessageRangeException("No such message found with uid " + resultUid); + }, msn -> { + + reset(msn); + // setMsn(resultMsn); + + // Check if this fetch will cause the "SEEN" flag to be set on this + // message. If so, update the flags, and ensure that a flags response is + // included in the response. + final MailboxSession mailboxSession = session.getMailboxSession(); + // FLAGS response + addFlags(fetch, mailbox, selected, resultUid, mailboxSession, result.getFlags()); + // UID response + addUid(fetch, resultUid); + + + addModSeq(fetch, result.getModSeq()); return build(); }); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
