This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2d54e04db52cb4df4f340c13050e7a2fd41385da Author: Benoit Tellier <[email protected]> AuthorDate: Wed Oct 21 11:55:19 2020 +0700 JAMES-3277 JMAP Rely on MessageManager::setFlags to optimize massive flags updates (Draft) We notice that many slow traces at the JMAP level are flag updates, generating a high count of Cassandra queries. By grouping the updates we are able to share modseq allocation and counter updates, drastically reducing the query count. Furthermore parallelism with a flag update stage is unlocked, significantly speeding the request up. Query count before: 10m+9 (6 messages => 69 queries, 9 messages => 99 queries, 12 messages => 129 queries) Query count after: 4m+16 (6 messages => 40 queries, 9 messages => 52 queries, 12 messages => 64 queries) --- .../java/org/apache/james/util/StreamUtils.java | 6 ++ .../methods/integration/SetMessagesMethodTest.java | 72 ++++++++++++++++++++-- .../draft/methods/SetMessagesUpdateProcessor.java | 58 ++++++++++++++++- .../james/jmap/draft/model/UpdateMessagePatch.java | 23 +++++++ .../methods/SetMessagesUpdateProcessorTest.java | 4 ++ 5 files changed, 154 insertions(+), 9 deletions(-) diff --git a/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java b/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java index 73452ad..6dc5db5 100644 --- a/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java @@ -59,6 +59,12 @@ public class StreamUtils { return streams.flatMap(Function.identity()); } + public static <T> boolean isSingleValued(Stream<T> stream) { + return stream.distinct() + .limit(2) + .count() == 1; + } + @SafeVarargs public static <T> Stream<T> flatten(Stream<T>... 
streams) { return flatten(Arrays.stream(streams)); diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java index 988df1f..2fbed4fff 100644 --- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java +++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java @@ -119,6 +119,7 @@ import org.apache.james.utils.SMTPMessageSender; import org.apache.james.utils.TestIMAPClient; import org.apache.mailet.Mail; import org.apache.mailet.base.test.FakeMail; +import org.assertj.core.api.SoftAssertions; import org.awaitility.Duration; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -144,12 +145,12 @@ import net.javacrumbs.jsonunit.core.internal.Options; public abstract class SetMessagesMethodTest { private static final String FORWARDED = "$Forwarded"; private static final int _1MB = 1024 * 1024; - private static final Username USERNAME = Username.of("username@" + DOMAIN); + protected static final Username USERNAME = Username.of("username@" + DOMAIN); private static final String ALIAS_OF_USERNAME_MAIL = "alias@" + DOMAIN; private static final String GROUP_MAIL = "group@" + DOMAIN; private static final Username ALIAS_OF_USERNAME = Username.of(ALIAS_OF_USERNAME_MAIL); private static final String PASSWORD = "password"; - private static final MailboxPath USER_MAILBOX = MailboxPath.forUser(USERNAME, "mailbox"); + protected static final MailboxPath USER_MAILBOX = MailboxPath.forUser(USERNAME, "mailbox"); private static final String 
NOT_UPDATED = ARGUMENTS + ".notUpdated"; private static final int BIG_MESSAGE_SIZE = 20 * 1024 * 1024; public static final String OCTET_CONTENT_TYPE = "application/octet-stream"; @@ -161,11 +162,11 @@ public abstract class SetMessagesMethodTest { protected abstract MessageId randomMessageId(); - private AccessToken accessToken; - private GuiceJamesServer jmapServer; - private MailboxProbe mailboxProbe; + protected AccessToken accessToken; + protected GuiceJamesServer jmapServer; + protected MailboxProbe mailboxProbe; private DataProbe dataProbe; - private MessageIdProbe messageProbe; + protected MessageIdProbe messageProbe; private ACLProbe aclProbe; @Before @@ -424,6 +425,65 @@ public abstract class SetMessagesMethodTest { } @Test + public void massiveFlagUpdateShouldBeApplied() throws MailboxException { + mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME.asString(), "mailbox"); + + ComposedMessageId message1 = mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX, + new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new Flags()); + ComposedMessageId message2 = mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX, + new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new Flags()); + ComposedMessageId message3 = mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX, + new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new Flags(Flag.SEEN)); + ComposedMessageId message4 = mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX, + new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new Flags()); + ComposedMessageId message5 = mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX, + new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new 
Date(), false, new Flags(Flag.ANSWERED)); + ComposedMessageId message6 = mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX, + new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new Flags()); + + String serializedMessageId1 = message1.getMessageId().serialize(); + String serializedMessageId2 = message2.getMessageId().serialize(); + String serializedMessageId3 = message3.getMessageId().serialize(); + String serializedMessageId4 = message4.getMessageId().serialize(); + String serializedMessageId5 = message5.getMessageId().serialize(); + String serializedMessageId6 = message6.getMessageId().serialize(); + + // When + given() + .header("Authorization", accessToken.asString()) + .body(String.format("[[\"setMessages\", {\"update\": {" + + " \"%s\" : { \"isUnread\" : false }, " + + " \"%s\" : { \"isUnread\" : false }, " + + " \"%s\" : { \"isUnread\" : false }, " + + " \"%s\" : { \"isUnread\" : false }, " + + " \"%s\" : { \"isUnread\" : false }, " + + " \"%s\" : { \"isUnread\" : false } " + + "} }, \"#0\"]]", serializedMessageId1, serializedMessageId2, serializedMessageId3, + serializedMessageId4, serializedMessageId5, serializedMessageId6)) + .when() + .post("/jmap") + // Then + .then() + .log().ifValidationFails().body(ARGUMENTS + ".updated", hasSize(6)); + + Flags flags1 = messageProbe.getMessages(message1.getMessageId(), USERNAME).iterator().next().getFlags(); + Flags flags2 = messageProbe.getMessages(message2.getMessageId(), USERNAME).iterator().next().getFlags(); + Flags flags3 = messageProbe.getMessages(message3.getMessageId(), USERNAME).iterator().next().getFlags(); + Flags flags4 = messageProbe.getMessages(message4.getMessageId(), USERNAME).iterator().next().getFlags(); + Flags flags5 = messageProbe.getMessages(message5.getMessageId(), USERNAME).iterator().next().getFlags(); + Flags flags6 = messageProbe.getMessages(message6.getMessageId(), USERNAME).iterator().next().getFlags(); + 
SoftAssertions.assertSoftly(softly -> { + softly.assertThat(flags1).isEqualTo(new Flags(Flag.SEEN)); + softly.assertThat(flags2).isEqualTo(new Flags(Flag.SEEN)); + softly.assertThat(flags3).isEqualTo(new Flags(Flag.SEEN)); + softly.assertThat(flags4).isEqualTo(new Flags(Flag.SEEN)); + softly.assertThat(flags5).isEqualTo(new Flags(Flag.SEEN)); + softly.assertThat(flags6).isEqualTo(new Flags(Flag.SEEN)); + }); + } + + @Test public void setMessagesWithUpdateShouldReturnAnErrorWhenBothIsFlagAndKeywordsArePassed() throws MailboxException { mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME.asString(), "mailbox"); diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java index 2c74d40..84d31e5 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.inject.Inject; +import javax.mail.Flags; import javax.mail.MessagingException; import javax.mail.Session; import javax.mail.internet.MimeMessage; @@ -47,6 +48,7 @@ import org.apache.james.jmap.draft.model.SetMessagesRequest; import org.apache.james.jmap.draft.model.SetMessagesResponse; import org.apache.james.jmap.draft.model.UpdateMessagePatch; import org.apache.james.jmap.draft.utils.KeywordsCombiner; +import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageIdManager; import org.apache.james.mailbox.MessageManager; @@ -61,11 +63,13 @@ import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxId.Factory; import 
org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MessageMoves; +import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.MessageResult; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.metrics.api.TimeMetric; import org.apache.james.rrt.api.CanSendFrom; import org.apache.james.server.core.MailImpl; +import org.apache.james.util.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +89,7 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { private final UpdateMessagePatchConverter updatePatchConverter; private final MessageIdManager messageIdManager; + private final MailboxManager mailboxManager; private final SystemMailboxesProvider systemMailboxesProvider; private final Factory mailboxIdFactory; private final MetricFactory metricFactory; @@ -97,6 +102,7 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { @VisibleForTesting SetMessagesUpdateProcessor( UpdateMessagePatchConverter updatePatchConverter, MessageIdManager messageIdManager, + MailboxManager mailboxManager, SystemMailboxesProvider systemMailboxesProvider, Factory mailboxIdFactory, MessageSender messageSender, @@ -105,6 +111,7 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { CanSendFrom canSendFrom) { this.updatePatchConverter = updatePatchConverter; this.messageIdManager = messageIdManager; + this.mailboxManager = mailboxManager; this.systemMailboxesProvider = systemMailboxesProvider; this.mailboxIdFactory = mailboxIdFactory; this.metricFactory = metricFactory; @@ -144,14 +151,59 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { .collect(Guavate.toImmutableListMultimap(metaData -> metaData.getComposedMessageId().getMessageId())) .block(); - patches.forEach((id, patch) -> { + if (isAMassiveFlagUpdate(patches, messages)) { + applyRangedFlagUpdate(patches, messages, responseBuilder, 
mailboxSession); + } else { + patches.forEach((id, patch) -> { if (patch.isValid()) { update(outboxes, id, patch, mailboxSession, responseBuilder, messages); } else { handleInvalidRequest(responseBuilder, id, patch.getValidationErrors(), patch); } - } - ); + }); + } + } + + private boolean isAMassiveFlagUpdate(Map<MessageId, UpdateMessagePatch> patches, Multimap<MessageId, ComposedMessageIdWithMetaData> messages) { + // The same patch, that only represents a flag update, is applied to messages within a single mailbox + return StreamUtils.isSingleValued(patches.values().stream()) + && StreamUtils.isSingleValued(messages.values().stream().map(metaData -> metaData.getComposedMessageId().getMailboxId())) + && patches.values().iterator().next().isOnlyAFlagUpdate() + && messages.size() > 3; + } + + private void applyRangedFlagUpdate(Map<MessageId, UpdateMessagePatch> patches, Multimap<MessageId, ComposedMessageIdWithMetaData> messages, SetMessagesResponse.Builder responseBuilder, MailboxSession mailboxSession) { + MailboxId mailboxId = messages.values() + .iterator() + .next() + .getComposedMessageId() + .getMailboxId(); + UpdateMessagePatch patch = patches.values().iterator().next(); + List<MessageRange> uidRanges = MessageRange.toRanges(messages.values().stream().map(metaData -> metaData.getComposedMessageId().getUid()) + .distinct() + .collect(Guavate.toImmutableList())); + + if (patch.isValid()) { + uidRanges.forEach(range -> { + ImmutableList<MessageId> messageIds = messages.entries() + .stream() + .filter(entry -> range.includes(entry.getValue().getComposedMessageId().getUid())) + .map(Map.Entry::getKey) + .distinct() + .collect(Guavate.toImmutableList()); + try { + mailboxManager.getMailbox(mailboxId, mailboxSession) + .setFlags(patch.applyToState(new Flags()), FlagsUpdateMode.REPLACE, range, mailboxSession); + responseBuilder.updated(messageIds); + } catch (MailboxException e) { + messageIds + .forEach(messageId -> handleMessageUpdateException(messageId, 
responseBuilder, e)); + } + }); + } else { + messages.keySet() + .forEach(messageId -> handleInvalidRequest(responseBuilder, messageId, patch.getValidationErrors(), patch)); + } } private void update(Set<MailboxId> outboxes, MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java index 59cb832..4aa3663 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java @@ -21,6 +21,7 @@ package org.apache.james.jmap.draft.model; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -139,6 +140,10 @@ public class UpdateMessagePatch { return !oldKeywords.isPresent() && !keywords.isPresent(); } + public boolean isOnlyAFlagUpdate() { + return !mailboxIds.isPresent() && (oldKeywords.isPresent() || keywords.isPresent()); + } + public ImmutableList<ValidationResult> getValidationErrors() { return validationErrors; } @@ -156,6 +161,24 @@ public class UpdateMessagePatch { } @Override + public final boolean equals(Object o) { + if (o instanceof UpdateMessagePatch) { + UpdateMessagePatch that = (UpdateMessagePatch) o; + + return Objects.equals(this.mailboxIds, that.mailboxIds) + && Objects.equals(this.keywords, that.keywords) + && Objects.equals(this.oldKeywords, that.oldKeywords) + && Objects.equals(this.validationErrors, that.validationErrors); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(mailboxIds, keywords, oldKeywords, validationErrors); + } + + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("mailboxIds", mailboxIds) diff --git 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java index aa01dcd..b33f25c 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java @@ -178,8 +178,10 @@ public class SetMessagesUpdateProcessorTest { referenceUpdater = new ReferenceUpdater(mockMessageIdManager, mockedMailboxManager); UpdateMessagePatchConverter updateMessagePatchConverter = null; + MailboxManager mailboxManager = null; sut = new SetMessagesUpdateProcessor(updateMessagePatchConverter, messageIdManager, + mailboxManager, fakeSystemMailboxesProvider, mockedMailboxIdFactory, messageSender, @@ -234,8 +236,10 @@ public class SetMessagesUpdateProcessorTest { .thenReturn(mockInvalidPatch); + MailboxManager mailboxManager = null; SetMessagesUpdateProcessor sut = new SetMessagesUpdateProcessor(mockConverter, mockMessageIdManager, + mailboxManager, fakeSystemMailboxesProvider, mockedMailboxIdFactory, messageSender, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
