JAMES-2082 Try improving stream future readability
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bd69ab99 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bd69ab99 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bd69ab99 Branch: refs/heads/master Commit: bd69ab9922b3e6d14dc34cc85a4b425bb3bfd3f8 Parents: 0ecc48a Author: benwa <btell...@linagora.com> Authored: Thu Jul 6 17:24:34 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:56 2017 +0200 ---------------------------------------------------------------------- .../mail/CassandraMessageIdMapper.java | 26 +++++++++----------- .../cassandra/mail/CassandraMessageMapper.java | 21 +++++++--------- 2 files changed, 21 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/bd69ab99/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 95f0dd9..07c97c5 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -51,7 +51,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; import org.apache.james.mailbox.store.mail.ModSeqProvider; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -101,14 +100,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } private Stream<SimpleMailboxMessage> findAsStream(List<MessageId> messageIds, FetchType fetchType) { - return CompletableFutureUtil.allOf( + return FluentFutureStream.ofNestedStreams( messageIds.stream() .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))) - .thenApply(stream -> stream.flatMap(Function.identity())) + .completableFuture() .thenApply(stream -> stream.collect(Guavate.toImmutableList())) .thenCompose(composedMessageIds -> retrieveMessages(fetchType, composedMessageIds)) .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType)) - .thenCompose(this::filterMessagesWIthExistingMailbox) + .thenCompose(this::filterMessagesWithExistingMailbox) .join() .sorted(Comparator.comparing(MailboxMessage::getUid)); } @@ -116,19 +115,17 @@ public class CassandraMessageIdMapper implements MessageIdMapper { private CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> retrieveMessages(FetchType fetchType, ImmutableList<ComposedMessageIdWithMetaData> composedMessageIds) { - return messageDAOV2.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()) - .thenCompose(messageResults -> FluentFutureStream.of(messageResults - .map(v1ToV2Migration::moveFromV1toV2)) - .completableFuture()); + return FluentFutureStream.of(messageDAOV2.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) + .thenComposeOnAll(v1ToV2Migration::moveFromV1toV2) + .completableFuture(); } - private CompletableFuture<Stream<SimpleMailboxMessage>> filterMessagesWIthExistingMailbox(Stream<SimpleMailboxMessage> stream) { - return FluentFutureStream.of(stream.map(this::mailboxExists)) - .flatMap(m -> m) + private CompletableFuture<Stream<SimpleMailboxMessage>> filterMessagesWithExistingMailbox(Stream<SimpleMailboxMessage> stream) { + return 
FluentFutureStream.ofOptionals(stream.map(this::keepMessageIfMailboxExists)) .completableFuture(); } - private CompletableFuture<Stream<SimpleMailboxMessage>> mailboxExists(SimpleMailboxMessage message) { + private CompletableFuture<Optional<SimpleMailboxMessage>> keepMessageIfMailboxExists(SimpleMailboxMessage message) { CassandraId cassandraId = (CassandraId) message.getMailboxId(); return mailboxDAO.retrieveMailbox(cassandraId) .thenApply(optional -> { @@ -136,9 +133,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper { LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.", cassandraId, message.getMailboxId()); - return Stream.empty(); + return Optional.empty(); } - return Stream.of(message); + + return Optional.of(message); }); } http://git-wip-us.apache.org/repos/asf/james-project/blob/bd69ab99/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 1daa256..6335389 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -55,8 +55,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; -import org.apache.james.util.OptionalConverter; import org.apache.james.util.streams.JamesCollectors; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -228,22 +228,20 @@ public class CassandraMessageMapper implements MessageMapper { } private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) { - return FluentFutureStream.of(uidChunk.stream() - .map(uid -> messageIdDAO.retrieve(mailboxId, uid))) - .flatMap(OptionalConverter::toStream) + return FluentFutureStream.ofOptionals( + uidChunk.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid))) .performOnAll(this::deleteUsingMailboxId) - .thenComposeOnAll(idWithMetadata -> retrieveMessagesAndDoMigrationIfNeeded(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())) - .flatMap(s -> s) + .thenFlatCompose(idWithMetadata -> retrieveMessagesAndDoMigrationIfNeeded(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())) .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())) .completableFuture(); } private CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> retrieveMessagesAndDoMigrationIfNeeded( List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { - return messageDAOV2.retrieveMessages(messageIds, fetchType, limit) - .thenCompose(messageResults -> FluentFutureStream.of(messageResults - .map(v1ToV2Migration::moveFromV1toV2)) - .completableFuture()); + + return FluentFutureStream.of(messageDAOV2.retrieveMessages(messageIds, fetchType, limit)) + .thenComposeOnAll(v1ToV2Migration::moveFromV1toV2) + .completableFuture(); } @Override @@ -311,9 +309,8 @@ public class CassandraMessageMapper implements MessageMapper { } private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) { - Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.of( + Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.ofOptionals( failed.stream().map(uid -> 
messageIdDAO.retrieve(mailboxId, uid))) - .flatMap(OptionalConverter::toStream) .join(); return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org