JAMES-2082 Try improving stream future redability

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bd69ab99
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bd69ab99
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bd69ab99

Branch: refs/heads/master
Commit: bd69ab9922b3e6d14dc34cc85a4b425bb3bfd3f8
Parents: 0ecc48a
Author: benwa <btell...@linagora.com>
Authored: Thu Jul 6 17:24:34 2017 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Mon Jul 10 14:23:56 2017 +0200

----------------------------------------------------------------------
 .../mail/CassandraMessageIdMapper.java          | 26 +++++++++-----------
 .../cassandra/mail/CassandraMessageMapper.java  | 21 +++++++---------
 2 files changed, 21 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/bd69ab99/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 95f0dd9..07c97c5 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -51,7 +51,6 @@ import 
org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,14 +100,14 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     }
 
     private Stream<SimpleMailboxMessage> findAsStream(List<MessageId> 
messageIds, FetchType fetchType) {
-        return CompletableFutureUtil.allOf(
+        return FluentFutureStream.ofNestedStreams(
             messageIds.stream()
                 .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) 
messageId, Optional.empty())))
-            .thenApply(stream -> stream.flatMap(Function.identity()))
+            .completableFuture()
             .thenApply(stream -> stream.collect(Guavate.toImmutableList()))
             .thenCompose(composedMessageIds -> retrieveMessages(fetchType, 
composedMessageIds))
             .thenCompose(stream -> 
attachmentLoader.addAttachmentToMessages(stream, fetchType))
-            .thenCompose(this::filterMessagesWIthExistingMailbox)
+            .thenCompose(this::filterMessagesWithExistingMailbox)
             .join()
             .sorted(Comparator.comparing(MailboxMessage::getUid));
     }
@@ -116,19 +115,17 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     private CompletableFuture<Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>>
             retrieveMessages(FetchType fetchType, 
ImmutableList<ComposedMessageIdWithMetaData> composedMessageIds) {
 
-        return messageDAOV2.retrieveMessages(composedMessageIds, fetchType, 
Limit.unlimited())
-            .thenCompose(messageResults -> FluentFutureStream.of(messageResults
-                .map(v1ToV2Migration::moveFromV1toV2))
-                .completableFuture());
+        return 
FluentFutureStream.of(messageDAOV2.retrieveMessages(composedMessageIds, 
fetchType, Limit.unlimited()))
+            .thenComposeOnAll(v1ToV2Migration::moveFromV1toV2)
+            .completableFuture();
     }
 
-    private CompletableFuture<Stream<SimpleMailboxMessage>> 
filterMessagesWIthExistingMailbox(Stream<SimpleMailboxMessage> stream) {
-        return FluentFutureStream.of(stream.map(this::mailboxExists))
-            .flatMap(m -> m)
+    private CompletableFuture<Stream<SimpleMailboxMessage>> 
filterMessagesWithExistingMailbox(Stream<SimpleMailboxMessage> stream) {
+        return 
FluentFutureStream.ofOptionals(stream.map(this::keepMessageIfMailboxExists))
             .completableFuture();
     }
 
-    private CompletableFuture<Stream<SimpleMailboxMessage>> 
mailboxExists(SimpleMailboxMessage message) {
+    private CompletableFuture<Optional<SimpleMailboxMessage>> 
keepMessageIfMailboxExists(SimpleMailboxMessage message) {
         CassandraId cassandraId = (CassandraId) message.getMailboxId();
         return mailboxDAO.retrieveMailbox(cassandraId)
             .thenApply(optional -> {
@@ -136,9 +133,10 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
                     LOGGER.info("Mailbox {} have been deleted but message {} 
is still attached to it.",
                         cassandraId,
                         message.getMailboxId());
-                    return Stream.empty();
+                    return Optional.empty();
                 }
-                return Stream.of(message);
+
+                return Optional.of(message);
             });
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/bd69ab99/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 1daa256..6335389 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -55,8 +55,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalConverter;
 import org.apache.james.util.streams.JamesCollectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -228,22 +228,20 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
     private CompletableFuture<Stream<SimpleMailboxMessage>> 
expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) {
-        return FluentFutureStream.of(uidChunk.stream()
-            .map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
-            .flatMap(OptionalConverter::toStream)
+        return FluentFutureStream.ofOptionals(
+                uidChunk.stream().map(uid -> messageIdDAO.retrieve(mailboxId, 
uid)))
             .performOnAll(this::deleteUsingMailboxId)
-            .thenComposeOnAll(idWithMetadata -> 
retrieveMessagesAndDoMigrationIfNeeded(ImmutableList.of(idWithMetadata), 
FetchType.Metadata, Limit.unlimited()))
-            .flatMap(s -> s)
+            .thenFlatCompose(idWithMetadata -> 
retrieveMessagesAndDoMigrationIfNeeded(ImmutableList.of(idWithMetadata), 
FetchType.Metadata, Limit.unlimited()))
             .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()))
             .completableFuture();
     }
 
     private CompletableFuture<Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>> 
retrieveMessagesAndDoMigrationIfNeeded(
         List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, 
Limit limit) {
-        return messageDAOV2.retrieveMessages(messageIds, fetchType, limit)
-            .thenCompose(messageResults -> FluentFutureStream.of(messageResults
-                .map(v1ToV2Migration::moveFromV1toV2))
-                .completableFuture());
+
+        return FluentFutureStream.of(messageDAOV2.retrieveMessages(messageIds, 
fetchType, limit))
+            .thenComposeOnAll(v1ToV2Migration::moveFromV1toV2)
+            .completableFuture();
     }
 
     @Override
@@ -311,9 +309,8 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
     private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, 
FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
-        Stream<ComposedMessageIdWithMetaData> idsFailed = 
FluentFutureStream.of(
+        Stream<ComposedMessageIdWithMetaData> idsFailed = 
FluentFutureStream.ofOptionals(
             failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
-            .flatMap(OptionalConverter::toStream)
             .join();
 
         return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to