This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2d54e04db52cb4df4f340c13050e7a2fd41385da
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Oct 21 11:55:19 2020 +0700

    JAMES-3277 JMAP Rely on MessageManager::setFlags to optimize massive flags 
updates (Draft)
    
    We notice that many slow traces at the JMAP level are flag updates, 
generating a high count of Cassandra queries.
    
    By grouping the update we enable to share modseq allocation, and counter 
updates, reducing drastically query count.
    
    Furthermore parallelism with a flag update stage is unlocked, significantly 
fasting the request up.
    
    Query count before: 10m+9 (6 message => 69 queries, 9 messages => 99 
queries, 12 messages => 129 queries)
    Query count after: 4m+16 (6 message => 40 queries, 9 messages => 52 
queries, 64 messages => 129 queries)
---
 .../java/org/apache/james/util/StreamUtils.java    |  6 ++
 .../methods/integration/SetMessagesMethodTest.java | 72 ++++++++++++++++++++--
 .../draft/methods/SetMessagesUpdateProcessor.java  | 58 ++++++++++++++++-
 .../james/jmap/draft/model/UpdateMessagePatch.java | 23 +++++++
 .../methods/SetMessagesUpdateProcessorTest.java    |  4 ++
 5 files changed, 154 insertions(+), 9 deletions(-)

diff --git 
a/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java 
b/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java
index 73452ad..6dc5db5 100644
--- a/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java
@@ -59,6 +59,12 @@ public class StreamUtils {
         return streams.flatMap(Function.identity());
     }
 
+    public static <T> boolean isSingleValued(Stream<T> stream) {
+        return stream.distinct()
+            .limit(2)
+            .count() == 1;
+    }
+
     @SafeVarargs
     public static <T> Stream<T> flatten(Stream<T>... streams) {
         return flatten(Arrays.stream(streams));
diff --git 
a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
 
b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
index 988df1f..2fbed4fff 100644
--- 
a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
+++ 
b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
@@ -119,6 +119,7 @@ import org.apache.james.utils.SMTPMessageSender;
 import org.apache.james.utils.TestIMAPClient;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.test.FakeMail;
+import org.assertj.core.api.SoftAssertions;
 import org.awaitility.Duration;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -144,12 +145,12 @@ import net.javacrumbs.jsonunit.core.internal.Options;
 public abstract class SetMessagesMethodTest {
     private static final String FORWARDED = "$Forwarded";
     private static final int _1MB = 1024 * 1024;
-    private static final Username USERNAME = Username.of("username@" + DOMAIN);
+    protected static final Username USERNAME = Username.of("username@" + 
DOMAIN);
     private static final String ALIAS_OF_USERNAME_MAIL = "alias@" + DOMAIN;
     private static final String GROUP_MAIL = "group@" + DOMAIN;
     private static final Username ALIAS_OF_USERNAME = 
Username.of(ALIAS_OF_USERNAME_MAIL);
     private static final String PASSWORD = "password";
-    private static final MailboxPath USER_MAILBOX = 
MailboxPath.forUser(USERNAME, "mailbox");
+    protected static final MailboxPath USER_MAILBOX = 
MailboxPath.forUser(USERNAME, "mailbox");
     private static final String NOT_UPDATED = ARGUMENTS + ".notUpdated";
     private static final int BIG_MESSAGE_SIZE = 20 * 1024 * 1024;
     public static final String OCTET_CONTENT_TYPE = "application/octet-stream";
@@ -161,11 +162,11 @@ public abstract class SetMessagesMethodTest {
 
     protected abstract MessageId randomMessageId();
 
-    private AccessToken accessToken;
-    private GuiceJamesServer jmapServer;
-    private MailboxProbe mailboxProbe;
+    protected AccessToken accessToken;
+    protected GuiceJamesServer jmapServer;
+    protected MailboxProbe mailboxProbe;
     private DataProbe dataProbe;
-    private MessageIdProbe messageProbe;
+    protected MessageIdProbe messageProbe;
     private ACLProbe aclProbe;
 
     @Before
@@ -424,6 +425,65 @@ public abstract class SetMessagesMethodTest {
     }
 
     @Test
+    public void massiveFlagUpdateShouldBeApplied() throws MailboxException {
+        mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, 
USERNAME.asString(), "mailbox");
+
+        ComposedMessageId message1 = 
mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX,
+            new ByteArrayInputStream("Subject: 
test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new 
Flags());
+        ComposedMessageId message2 = 
mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX,
+            new ByteArrayInputStream("Subject: 
test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new 
Flags());
+        ComposedMessageId message3 = 
mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX,
+            new ByteArrayInputStream("Subject: 
test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new 
Flags(Flag.SEEN));
+        ComposedMessageId message4 = 
mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX,
+            new ByteArrayInputStream("Subject: 
test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new 
Flags());
+        ComposedMessageId message5 = 
mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX,
+            new ByteArrayInputStream("Subject: 
test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new 
Flags(Flag.ANSWERED));
+        ComposedMessageId message6 = 
mailboxProbe.appendMessage(USERNAME.asString(), USER_MAILBOX,
+            new ByteArrayInputStream("Subject: 
test\r\n\r\ntestmail".getBytes(StandardCharsets.UTF_8)), new Date(), false, new 
Flags());
+
+        String serializedMessageId1 = message1.getMessageId().serialize();
+        String serializedMessageId2 = message2.getMessageId().serialize();
+        String serializedMessageId3 = message3.getMessageId().serialize();
+        String serializedMessageId4 = message4.getMessageId().serialize();
+        String serializedMessageId5 = message5.getMessageId().serialize();
+        String serializedMessageId6 = message6.getMessageId().serialize();
+
+        // When
+        given()
+            .header("Authorization", accessToken.asString())
+            .body(String.format("[[\"setMessages\", {\"update\": {" +
+                "  \"%s\" : { \"isUnread\" : false }, " +
+                "  \"%s\" : { \"isUnread\" : false }, " +
+                "  \"%s\" : { \"isUnread\" : false }, " +
+                "  \"%s\" : { \"isUnread\" : false }, " +
+                "  \"%s\" : { \"isUnread\" : false }, " +
+                "  \"%s\" : { \"isUnread\" : false } " +
+                "} }, \"#0\"]]", serializedMessageId1, serializedMessageId2, 
serializedMessageId3,
+                serializedMessageId4, serializedMessageId5, 
serializedMessageId6))
+        .when()
+            .post("/jmap")
+        // Then
+        .then()
+            .log().ifValidationFails().body(ARGUMENTS + ".updated", 
hasSize(6));
+
+        Flags flags1 = messageProbe.getMessages(message1.getMessageId(), 
USERNAME).iterator().next().getFlags();
+        Flags flags2 = messageProbe.getMessages(message1.getMessageId(), 
USERNAME).iterator().next().getFlags();
+        Flags flags3 = messageProbe.getMessages(message1.getMessageId(), 
USERNAME).iterator().next().getFlags();
+        Flags flags4 = messageProbe.getMessages(message1.getMessageId(), 
USERNAME).iterator().next().getFlags();
+        Flags flags5 = messageProbe.getMessages(message1.getMessageId(), 
USERNAME).iterator().next().getFlags();
+        Flags flags6 = messageProbe.getMessages(message1.getMessageId(), 
USERNAME).iterator().next().getFlags();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(flags1).isEqualTo(new Flags(Flag.SEEN));
+            softly.assertThat(flags2).isEqualTo(new Flags(Flag.SEEN));
+            softly.assertThat(flags3).isEqualTo(new Flags(Flag.SEEN));
+            softly.assertThat(flags4).isEqualTo(new Flags(Flag.SEEN));
+            softly.assertThat(flags5).isEqualTo(new Flags(Flag.SEEN));
+            softly.assertThat(flags6).isEqualTo(new Flags(Flag.SEEN));
+        });
+    }
+
+    @Test
     public void 
setMessagesWithUpdateShouldReturnAnErrorWhenBothIsFlagAndKeywordsArePassed() 
throws MailboxException {
         mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, 
USERNAME.asString(), "mailbox");
 
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index 2c74d40..84d31e5 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
+import javax.mail.Flags;
 import javax.mail.MessagingException;
 import javax.mail.Session;
 import javax.mail.internet.MimeMessage;
@@ -47,6 +48,7 @@ import org.apache.james.jmap.draft.model.SetMessagesRequest;
 import org.apache.james.jmap.draft.model.SetMessagesResponse;
 import org.apache.james.jmap.draft.model.UpdateMessagePatch;
 import org.apache.james.jmap.draft.utils.KeywordsCombiner;
+import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.MessageManager;
@@ -61,11 +63,13 @@ import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxId.Factory;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageMoves;
+import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.MessageResult;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.TimeMetric;
 import org.apache.james.rrt.api.CanSendFrom;
 import org.apache.james.server.core.MailImpl;
+import org.apache.james.util.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,6 +89,7 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
 
     private final UpdateMessagePatchConverter updatePatchConverter;
     private final MessageIdManager messageIdManager;
+    private final MailboxManager mailboxManager;
     private final SystemMailboxesProvider systemMailboxesProvider;
     private final Factory mailboxIdFactory;
     private final MetricFactory metricFactory;
@@ -97,6 +102,7 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
     @VisibleForTesting SetMessagesUpdateProcessor(
             UpdateMessagePatchConverter updatePatchConverter,
             MessageIdManager messageIdManager,
+            MailboxManager mailboxManager,
             SystemMailboxesProvider systemMailboxesProvider,
             Factory mailboxIdFactory,
             MessageSender messageSender,
@@ -105,6 +111,7 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
             CanSendFrom canSendFrom) {
         this.updatePatchConverter = updatePatchConverter;
         this.messageIdManager = messageIdManager;
+        this.mailboxManager = mailboxManager;
         this.systemMailboxesProvider = systemMailboxesProvider;
         this.mailboxIdFactory = mailboxIdFactory;
         this.metricFactory = metricFactory;
@@ -144,14 +151,59 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
             .collect(Guavate.toImmutableListMultimap(metaData -> 
metaData.getComposedMessageId().getMessageId()))
             .block();
 
-        patches.forEach((id, patch) -> {
+        if (isAMassiveFlagUpdate(patches, messages)) {
+            applyRangedFlagUpdate(patches, messages, responseBuilder, 
mailboxSession);
+        } else {
+            patches.forEach((id, patch) -> {
                 if (patch.isValid()) {
                     update(outboxes, id, patch, mailboxSession, 
responseBuilder, messages);
                 } else {
                     handleInvalidRequest(responseBuilder, id, 
patch.getValidationErrors(), patch);
                 }
-            }
-        );
+            });
+        }
+    }
+
+    private boolean isAMassiveFlagUpdate(Map<MessageId, UpdateMessagePatch> 
patches, Multimap<MessageId, ComposedMessageIdWithMetaData> messages) {
+        // The same patch, that only represents a flag update, is applied to 
messages within a single mailbox
+        return StreamUtils.isSingleValued(patches.values().stream())
+            && 
StreamUtils.isSingleValued(messages.values().stream().map(metaData -> 
metaData.getComposedMessageId().getMailboxId()))
+            && patches.values().iterator().next().isOnlyAFlagUpdate()
+            && messages.size() > 3;
+    }
+
+    private void applyRangedFlagUpdate(Map<MessageId, UpdateMessagePatch> 
patches, Multimap<MessageId, ComposedMessageIdWithMetaData> messages, 
SetMessagesResponse.Builder responseBuilder, MailboxSession mailboxSession) {
+        MailboxId mailboxId = messages.values()
+            .iterator()
+            .next()
+            .getComposedMessageId()
+            .getMailboxId();
+        UpdateMessagePatch patch = patches.values().iterator().next();
+        List<MessageRange> uidRanges = 
MessageRange.toRanges(messages.values().stream().map(metaData -> 
metaData.getComposedMessageId().getUid())
+            .distinct()
+            .collect(Guavate.toImmutableList()));
+
+        if (patch.isValid()) {
+            uidRanges.forEach(range -> {
+                ImmutableList<MessageId> messageIds = messages.entries()
+                    .stream()
+                    .filter(entry -> 
range.includes(entry.getValue().getComposedMessageId().getUid()))
+                    .map(Map.Entry::getKey)
+                    .distinct()
+                    .collect(Guavate.toImmutableList());
+                try {
+                    mailboxManager.getMailbox(mailboxId, mailboxSession)
+                        .setFlags(patch.applyToState(new Flags()), 
FlagsUpdateMode.REPLACE, range, mailboxSession);
+                    responseBuilder.updated(messageIds);
+                } catch (MailboxException e) {
+                    messageIds
+                        .forEach(messageId -> 
handleMessageUpdateException(messageId, responseBuilder, e));
+                }
+            });
+        } else {
+            messages.keySet()
+                .forEach(messageId -> handleInvalidRequest(responseBuilder, 
messageId, patch.getValidationErrors(), patch));
+        }
     }
 
     private void update(Set<MailboxId> outboxes, MessageId messageId, 
UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession,
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java
index 59cb832..4aa3663 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/UpdateMessagePatch.java
@@ -21,6 +21,7 @@ package org.apache.james.jmap.draft.model;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
@@ -139,6 +140,10 @@ public class UpdateMessagePatch {
         return !oldKeywords.isPresent() && !keywords.isPresent();
     }
 
+    public boolean isOnlyAFlagUpdate() {
+        return !mailboxIds.isPresent() && (oldKeywords.isPresent() || 
keywords.isPresent());
+    }
+
     public ImmutableList<ValidationResult> getValidationErrors() {
         return validationErrors;
     }
@@ -156,6 +161,24 @@ public class UpdateMessagePatch {
     }
 
     @Override
+    public final boolean equals(Object o) {
+        if (o instanceof UpdateMessagePatch) {
+            UpdateMessagePatch that = (UpdateMessagePatch) o;
+
+            return Objects.equals(this.mailboxIds, that.mailboxIds)
+                && Objects.equals(this.keywords, that.keywords)
+                && Objects.equals(this.oldKeywords, that.oldKeywords)
+                && Objects.equals(this.validationErrors, 
that.validationErrors);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(mailboxIds, keywords, oldKeywords, 
validationErrors);
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("mailboxIds", mailboxIds)
diff --git 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
index aa01dcd..b33f25c 100644
--- 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
+++ 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
@@ -178,8 +178,10 @@ public class SetMessagesUpdateProcessorTest {
         referenceUpdater = new ReferenceUpdater(mockMessageIdManager, 
mockedMailboxManager);
 
         UpdateMessagePatchConverter updateMessagePatchConverter = null;
+        MailboxManager mailboxManager = null;
         sut = new SetMessagesUpdateProcessor(updateMessagePatchConverter,
             messageIdManager,
+            mailboxManager,
             fakeSystemMailboxesProvider,
             mockedMailboxIdFactory,
             messageSender,
@@ -234,8 +236,10 @@ public class SetMessagesUpdateProcessorTest {
                 .thenReturn(mockInvalidPatch);
 
 
+        MailboxManager mailboxManager = null;
         SetMessagesUpdateProcessor sut = new 
SetMessagesUpdateProcessor(mockConverter,
             mockMessageIdManager,
+            mailboxManager,
             fakeSystemMailboxesProvider,
             mockedMailboxIdFactory,
             messageSender,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to