This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit df8c6e7adc7d870cbd2518275a2f4dc6edc4b7fa
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 15 12:41:55 2021 +0700

    [REFACTORING] SetMessagesUpdateProcessor: Reactify updates
---
 .../org/apache/james/mailbox/MessageIdManager.java |   2 +-
 .../inmemory/mail/InMemoryMessageIdMapper.java     |   6 +-
 .../james/mailbox/store/mail/MessageIdMapper.java  |   4 +-
 .../draft/methods/SetMessagesUpdateProcessor.java  | 130 ++++++++++-----------
 .../jmap/draft/methods/SetMailboxesMethodTest.java |  12 +-
 .../methods/SetMessagesUpdateProcessorTest.java    |   4 +-
 6 files changed, 81 insertions(+), 77 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 272d94c..4bb6060 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -52,7 +52,7 @@ public interface MessageIdManager {
 
     void setFlags(Flags newState, FlagsUpdateMode replace, MessageId 
messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws 
MailboxException;
 
-    Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, 
MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) 
throws MailboxException;
+    Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, 
MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession);
 
     List<MessageResult> getMessages(Collection<MessageId> messageIds, 
FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException;
 
diff --git 
a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
 
b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index 8be1835..569b1ae 100644
--- 
a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ 
b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -53,6 +53,7 @@ import com.google.common.collect.Multimap;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class InMemoryMessageIdMapper implements MessageIdMapper {
     private final MailboxMapper mailboxMapper;
@@ -135,14 +136,15 @@ public class InMemoryMessageIdMapper implements 
MessageIdMapper {
     @Override
     public Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId 
messageId, List<MailboxId> mailboxIds,
                                                             Flags newState, 
FlagsUpdateMode updateMode) {
-        return Mono.fromCallable(() -> find(ImmutableList.of(messageId), 
MessageMapper.FetchType.Metadata)
+        return Mono.<Multimap<MailboxId, UpdatedFlags>>fromCallable(() -> 
find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
             .stream()
             .filter(message -> mailboxIds.contains(message.getMailboxId()))
             .map(updateMessage(newState, updateMode))
             .distinct()
             .collect(Guavate.toImmutableListMultimap(
                 Pair::getKey,
-                Pair::getValue)));
+                Pair::getValue)))
+            .subscribeOn(Schedulers.elastic());
     }
 
     private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> 
updateMessage(Flags newState, FlagsUpdateMode updateMode) {
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 02740cd..4af711b 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Multimap;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface MessageIdMapper {
 
@@ -58,7 +59,8 @@ public interface MessageIdMapper {
     void copyInMailbox(MailboxMessage mailboxMessage, Mailbox mailbox) throws 
MailboxException;
 
     default Mono<Void> copyInMailboxReactive(MailboxMessage mailboxMessage, 
Mailbox mailbox) {
-        return Mono.fromRunnable(Throwing.runnable(() -> 
copyInMailbox(mailboxMessage, mailbox)).sneakyThrow());
+        return Mono.<Void>fromRunnable(Throwing.runnable(() -> 
copyInMailbox(mailboxMessage, mailbox)).sneakyThrow())
+            .subscribeOn(Schedulers.elastic());
     }
 
     void delete(MessageId messageId);
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index fea0e2a..5884cab 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -221,7 +221,7 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
                         .reduce(SetMessagesResponse.Builder::mergeWith)
                         .orElse(SetMessagesResponse.builder());
                 }
-            }) .reduce(SetMessagesResponse.Builder::mergeWith)
+            }).reduce(SetMessagesResponse.Builder::mergeWith)
                 .orElse(SetMessagesResponse.builder());
         } else {
             return messages.keySet().stream()
@@ -270,7 +270,7 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
                         .reduce(SetMessagesResponse.Builder::mergeWith)
                         .orElse(SetMessagesResponse.builder());
                 }
-            }) .reduce(SetMessagesResponse.Builder::mergeWith)
+            }).reduce(SetMessagesResponse.Builder::mergeWith)
                 .orElse(SetMessagesResponse.builder());
         } else {
             return messages.keySet().stream()
@@ -280,74 +280,73 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
         }
     }
 
-    private SetMessagesResponse.Builder update(Set<MailboxId> outboxes, 
MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession 
mailboxSession, Multimap<MessageId, ComposedMessageIdWithMetaData> metadata) {
+    private Mono<SetMessagesResponse.Builder> update(Set<MailboxId> outboxes, 
MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession 
mailboxSession, Multimap<MessageId, ComposedMessageIdWithMetaData> metadata) {
         try {
-            SetMessagesResponse.Builder builder = 
SetMessagesResponse.builder();
             List<ComposedMessageIdWithMetaData> messages = 
Optional.ofNullable(metadata.get(messageId))
                 .map(ImmutableList::copyOf)
                 .orElse(ImmutableList.of());
             assertValidUpdate(messages, updateMessagePatch, outboxes);
 
             if (messages.isEmpty()) {
-                builder.mergeWith(addMessageIdNotFoundToResponse(messageId));
+                return 
Mono.just(SetMessagesResponse.builder().mergeWith(addMessageIdNotFoundToResponse(messageId)));
             } else {
-                setInMailboxes(messageId, updateMessagePatch, mailboxSession);
-                Optional<MailboxException> updateError = messages.stream()
-                    .flatMap(message -> updateFlags(messageId, 
updateMessagePatch, mailboxSession, message))
-                    .findAny();
-                if (updateError.isPresent()) {
-                    builder.mergeWith(handleMessageUpdateException(messageId, 
updateError.get()));
-                } else {
-                    builder.updated(ImmutableList.of(messageId));
-                }
-                
builder.mergeWith(sendMessageWhenOutboxInTargetMailboxIds(outboxes, messageId, 
updateMessagePatch, mailboxSession));
+                return setInMailboxes(messageId, updateMessagePatch, 
mailboxSession)
+                    .then(Flux.fromIterable(messages)
+                        .flatMap(message -> updateFlags(messageId, 
updateMessagePatch, mailboxSession, message))
+                        .then())
+                    
.then(Mono.just(SetMessagesResponse.builder().updated(ImmutableList.of(messageId))))
+                    .flatMap(builder -> 
sendMessageWhenOutboxInTargetMailboxIds(outboxes, messageId, 
updateMessagePatch, mailboxSession)
+                        .map(builder::mergeWith))
+                    .onErrorResume(OverQuotaException.class, e -> 
Mono.just(SetMessagesResponse.builder().notUpdated(messageId,
+                        SetError.builder()
+                            .type(SetError.Type.MAX_QUOTA_REACHED)
+                            .description(e.getMessage())
+                            .build())))
+                    .onErrorResume(MailboxException.class, e -> 
Mono.just(handleMessageUpdateException(messageId, e)))
+                    .onErrorResume(IOException.class, e -> 
Mono.just(handleMessageUpdateException(messageId, e)))
+                    .onErrorResume(MessagingException.class, e -> 
Mono.just(handleMessageUpdateException(messageId, e)))
+                    .onErrorResume(IllegalArgumentException.class, e -> {
+                        ValidationResult invalidPropertyKeywords = 
ValidationResult.builder()
+                            
.property(MessageProperties.MessageProperty.keywords.asFieldName())
+                            .message(e.getMessage())
+                            .build();
+
+                        return Mono.just(handleInvalidRequest(messageId, 
ImmutableList.of(invalidPropertyKeywords), updateMessagePatch));
+                    });
             }
-            return builder;
         } catch (InvalidOutboxMoveException e) {
             ValidationResult invalidPropertyMailboxIds = 
ValidationResult.builder()
                 
.property(MessageProperties.MessageProperty.mailboxIds.asFieldName())
                 .message(e.getMessage())
                 .build();
 
-            return handleInvalidRequest(messageId, 
ImmutableList.of(invalidPropertyMailboxIds), updateMessagePatch);
-        } catch (OverQuotaException e) {
-            return SetMessagesResponse.builder().notUpdated(messageId,
-                SetError.builder()
-                    .type(SetError.Type.MAX_QUOTA_REACHED)
-                    .description(e.getMessage())
-                    .build());
-        } catch (MailboxException | IOException | MessagingException e) {
-            return handleMessageUpdateException(messageId, e);
-        } catch (IllegalArgumentException e) {
-            ValidationResult invalidPropertyKeywords = 
ValidationResult.builder()
-                    
.property(MessageProperties.MessageProperty.keywords.asFieldName())
-                    .message(e.getMessage())
-                    .build();
-
-            return handleInvalidRequest(messageId, 
ImmutableList.of(invalidPropertyKeywords), updateMessagePatch);
+            return Mono.just(handleInvalidRequest(messageId, 
ImmutableList.of(invalidPropertyMailboxIds), updateMessagePatch));
         }
     }
 
-    private SetMessagesResponse.Builder 
sendMessageWhenOutboxInTargetMailboxIds(Set<MailboxId> outboxes, MessageId 
messageId, UpdateMessagePatch updateMessagePatch, MailboxSession 
mailboxSession) throws MailboxException, MessagingException, IOException {
+    private Mono<SetMessagesResponse.Builder> 
sendMessageWhenOutboxInTargetMailboxIds(Set<MailboxId> outboxes, MessageId 
messageId, UpdateMessagePatch updateMessagePatch, MailboxSession 
mailboxSession) {
         if (isTargetingOutbox(outboxes, 
listTargetMailboxIds(updateMessagePatch))) {
-            Optional<MessageResult> maybeMessageToSend =
-                messageIdManager.getMessage(messageId, 
FetchGroup.FULL_CONTENT, mailboxSession)
-                    .stream()
-                    .findFirst();
-            if (maybeMessageToSend.isPresent()) {
-                MessageResult messageToSend = maybeMessageToSend.get();
-                MailImpl mail = buildMailFromMessage(messageToSend);
-                Optional<Username> fromUser = mail.getMaybeSender()
-                    .asOptional()
-                    .map(Username::fromMailAddress);
-                assertUserCanSendFrom(mailboxSession.getUser(), fromUser);
-                messageSender.sendMessage(messageId, mail, mailboxSession);
-                referenceUpdater.updateReferences(messageToSend.getHeaders(), 
mailboxSession);
-            } else {
-                return addMessageIdNotFoundToResponse(messageId);
-            }
+            return Mono.fromCallable(() -> {
+                Optional<MessageResult> maybeMessageToSend =
+                    messageIdManager.getMessage(messageId, 
FetchGroup.FULL_CONTENT, mailboxSession)
+                        .stream()
+                        .findFirst();
+                if (maybeMessageToSend.isPresent()) {
+                    MessageResult messageToSend = maybeMessageToSend.get();
+                    MailImpl mail = buildMailFromMessage(messageToSend);
+                    Optional<Username> fromUser = mail.getMaybeSender()
+                        .asOptional()
+                        .map(Username::fromMailAddress);
+                    assertUserCanSendFrom(mailboxSession.getUser(), fromUser);
+                    messageSender.sendMessage(messageId, mail, mailboxSession);
+                    
referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession);
+                    return SetMessagesResponse.builder();
+                } else {
+                    return addMessageIdNotFoundToResponse(messageId);
+                }
+            }).subscribeOn(Schedulers.elastic());
         }
-        return SetMessagesResponse.builder();
+        return Mono.just(SetMessagesResponse.builder());
     }
 
     @VisibleForTesting
@@ -430,20 +429,16 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
             .collect(Guavate.toImmutableSet());
     }
 
-    private Stream<MailboxException> updateFlags(MessageId messageId, 
UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, 
ComposedMessageIdWithMetaData message) {
-        try {
-            if (!updateMessagePatch.isFlagsIdentity()) {
-                messageIdManager.setFlags(
-                    updateMessagePatch.applyToState(message.getFlags()),
-                    FlagsUpdateMode.REPLACE, messageId, 
ImmutableList.of(message.getComposedMessageId().getMailboxId()), 
mailboxSession);
-            }
-            return Stream.of();
-        } catch (MailboxException e) {
-            return Stream.of(e);
+    private Mono<Void> updateFlags(MessageId messageId, UpdateMessagePatch 
updateMessagePatch, MailboxSession mailboxSession, 
ComposedMessageIdWithMetaData message) {
+        if (!updateMessagePatch.isFlagsIdentity()) {
+            return Mono.from(messageIdManager.setFlagsReactive(
+                updateMessagePatch.applyToState(message.getFlags()),
+                FlagsUpdateMode.REPLACE, messageId, 
ImmutableList.of(message.getComposedMessageId().getMailboxId()), 
mailboxSession));
         }
+        return Mono.empty();
     }
 
-    private void setInMailboxes(MessageId messageId, UpdateMessagePatch 
updateMessagePatch, MailboxSession mailboxSession) throws MailboxException {
+    private Mono<Void> setInMailboxes(MessageId messageId, UpdateMessagePatch 
updateMessagePatch, MailboxSession mailboxSession) {
         Optional<List<String>> serializedMailboxIds = 
updateMessagePatch.getMailboxIds();
         if (serializedMailboxIds.isPresent()) {
             List<MailboxId> mailboxIds = serializedMailboxIds.get()
@@ -451,17 +446,18 @@ public class SetMessagesUpdateProcessor implements 
SetMessagesProcessor {
                 .map(mailboxIdFactory::fromString)
                 .collect(Guavate.toImmutableList());
 
-            messageIdManager.setInMailboxes(messageId, mailboxIds, 
mailboxSession);
+            return 
Mono.from(messageIdManager.setInMailboxesReactive(messageId, mailboxIds, 
mailboxSession));
         }
+        return Mono.empty();
     }
 
     private SetMessagesResponse.Builder 
addMessageIdNotFoundToResponse(MessageId messageId) {
         return 
SetMessagesResponse.builder().notUpdated(ImmutableMap.of(messageId,
-                SetError.builder()
-                        .type(SetError.Type.NOT_FOUND)
-                        
.properties(ImmutableSet.of(MessageProperties.MessageProperty.id))
-                        .description("message not found")
-                        .build()));
+            SetError.builder()
+                .type(SetError.Type.NOT_FOUND)
+                
.properties(ImmutableSet.of(MessageProperties.MessageProperty.id))
+                .description("message not found")
+                .build()));
     }
 
     private SetMessagesResponse.Builder handleMessageUpdateException(MessageId 
messageId,
diff --git 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java
 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java
index ea0ce18..23f2f21 100644
--- 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java
+++ 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java
@@ -42,6 +42,8 @@ import org.junit.Test;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class SetMailboxesMethodTest {
 
     private static final ImmutableSet<SetMailboxesProcessor> NO_PROCESSOR = 
ImmutableSet.of();
@@ -106,11 +108,12 @@ public class SetMailboxesMethodTest {
 
         MailboxSession session = mock(MailboxSession.class);
         SetMailboxesProcessor creatorProcessor = 
mock(SetMailboxesProcessor.class);
-        when(creatorProcessor.process(creationRequest, 
session)).thenReturn(creationResponse);
+        when(creatorProcessor.processReactive(creationRequest, 
session)).thenReturn(Mono.just(creationResponse));
 
         Stream<JmapResponse> actual =
             new SetMailboxesMethod(ImmutableSet.of(creatorProcessor), 
TIME_METRIC_FACTORY)
-                    .processToStream(creationRequest, 
MethodCallId.of("methodCallId"), session);
+                    .process(creationRequest, MethodCallId.of("methodCallId"), 
session)
+            .toStream();
 
         assertThat(actual).contains(jmapResponse);
     }
@@ -129,11 +132,12 @@ public class SetMailboxesMethodTest {
 
         MailboxSession session = mock(MailboxSession.class);
         SetMailboxesProcessor destructorProcessor = 
mock(SetMailboxesProcessor.class);
-        when(destructorProcessor.process(destructionRequest, 
session)).thenReturn(destructionResponse);
+        when(destructorProcessor.processReactive(destructionRequest, 
session)).thenReturn(Mono.just(destructionResponse));
 
         Stream<JmapResponse> actual =
             new SetMailboxesMethod(ImmutableSet.of(destructorProcessor), 
TIME_METRIC_FACTORY)
-                    .processToStream(destructionRequest, 
MethodCallId.of("methodCallId"), session);
+                    .process(destructionRequest, 
MethodCallId.of("methodCallId"), session)
+            .toStream();
 
         assertThat(actual).contains(jmapResponse);
     }
diff --git 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
index 5ce3a4b..9c643cd 100644
--- 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
+++ 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java
@@ -215,7 +215,7 @@ public class SetMessagesUpdateProcessorTest {
     public void processShouldReturnEmptyUpdatedWhenRequestHasEmptyUpdate() {
         SetMessagesRequest requestWithEmptyUpdate = 
SetMessagesRequest.builder().build();
 
-        SetMessagesResponse result = sut.process(requestWithEmptyUpdate, null);
+        SetMessagesResponse result = sut.process(requestWithEmptyUpdate, 
session);
 
         assertThat(result.getUpdated()).isEmpty();
         assertThat(result.getNotUpdated()).isEmpty();
@@ -253,7 +253,7 @@ public class SetMessagesUpdateProcessorTest {
                 .build();
 
         // When
-        SetMessagesResponse result = sut.process(requestWithInvalidUpdate, 
null);
+        SetMessagesResponse result = sut.process(requestWithInvalidUpdate, 
session);
 
         // Then
         assertThat(result.getNotUpdated()).describedAs("NotUpdated should not 
be empty").isNotEmpty();

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to