This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 66dc43c67c463be0c26a771fae80cd321c2c84c4
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 15 10:12:09 2021 +0700

    [REFACTORING] JMAP draft setMessages update: do not block for outbox reading
---
 .../james/mailbox/SystemMailboxesProvider.java     |  2 +-
 .../draft/methods/SetMessagesUpdateProcessor.java  | 37 +++++++++++-----------
 2 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java b/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java
index 58c0f57..53a9410 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java
@@ -27,7 +27,7 @@ import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 
 public interface SystemMailboxesProvider {
-    Publisher<MessageManager> getMailboxByRole(Role aRole, Username username) throws MailboxException;
+    Publisher<MessageManager> getMailboxByRole(Role aRole, Username username);
 
     default MessageManager findMailbox(Role role, Username username) throws MailboxException {
         return Flux.from(getMailboxByRole(role, username))
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index 9f4c176..82c8f57 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -66,7 +66,6 @@ import org.apache.james.mailbox.model.MessageMoves;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.MessageResult;
 import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
 import org.apache.james.rrt.api.CanSendFrom;
 import org.apache.james.server.core.MailImpl;
 import org.apache.james.util.StreamUtils;
@@ -80,8 +79,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 
-import io.vavr.control.Try;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
 
@@ -121,19 +121,21 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
     }
 
     @Override
-    public SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) {
-        TimeMetric timeMetric = metricFactory.timer(JMAP_PREFIX + "SetMessagesUpdateProcessor");
-        SetMessagesResponse.Builder responseBuilder = SetMessagesResponse.builder();
-        Try.ofCallable(() -> listMailboxIdsForRole(mailboxSession, Role.OUTBOX))
-            .map(outboxes -> {
-                prepareResponse(request, mailboxSession, responseBuilder, outboxes);
-                return null;
-            })
-            .onFailure(e -> request.buildUpdatePatches(updatePatchConverter)
-                .forEach((id, patch) -> prepareResponseIfCantReadOutboxes(responseBuilder, e, id, patch)));
-
-        timeMetric.stopAndPublish();
-        return responseBuilder.build();
+    public Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) {
+        return Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + "SetMessagesUpdateProcessor",
+            listMailboxIdsForRole(mailboxSession, Role.OUTBOX)
+                .flatMap(outboxIds -> Mono.fromCallable(() -> {
+                    SetMessagesResponse.Builder responseBuilder = SetMessagesResponse.builder();
+                    prepareResponse(request, mailboxSession, responseBuilder, outboxIds);
+                    return responseBuilder.build();
+                }).subscribeOn(Schedulers.elastic()))
+                .onErrorResume(e ->
+                    Mono.fromCallable(() -> {
+                        SetMessagesResponse.Builder responseBuilder = SetMessagesResponse.builder();
+                        request.buildUpdatePatches(updatePatchConverter)
+                            .forEach((id, patch) -> prepareResponseIfCantReadOutboxes(responseBuilder, e, id, patch));
+                        return responseBuilder.build();
+                    }).subscribeOn(Schedulers.elastic()))));
     }
 
     private void prepareResponseIfCantReadOutboxes(SetMessagesResponse.Builder responseBuilder, Throwable e, MessageId id, UpdateMessagePatch patch) {
@@ -406,13 +408,12 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
             .collect(Guavate.toImmutableSet());
     }
 
-    private boolean isTargetingOutbox(Set<MailboxId> outboxes, Set<MailboxId> targetMailboxIds) throws MailboxException {
+    private boolean isTargetingOutbox(Set<MailboxId> outboxes, Set<MailboxId> targetMailboxIds) {
         return targetMailboxIds.stream().anyMatch(outboxes::contains);
     }
 
-    private Set<MailboxId> listMailboxIdsForRole(MailboxSession session, Role role) throws MailboxException {
+    private Mono<Set<MailboxId>> listMailboxIdsForRole(MailboxSession session, Role role) {
         return Flux.from(systemMailboxesProvider.getMailboxByRole(role, session.getUser()))
-            .toStream()
             .map(MessageManager::getId)
             .collect(Guavate.toImmutableSet());
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to