JAMES-2630 Port CassandraMessageMapper to Reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/1295a89b Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/1295a89b Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/1295a89b Branch: refs/heads/master Commit: 1295a89b3cf63106dadfa7d4e9c859a38c9618eb Parents: 2133f0b Author: Matthieu Baechler <matth...@apache.org> Authored: Sun Nov 25 16:06:06 2018 +0100 Committer: Matthieu Baechler <matth...@apache.org> Committed: Mon Jan 28 15:30:53 2019 +0100 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageMapper.java | 190 ++++++++++--------- 1 file changed, 100 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/1295a89b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 59d9a1e..0133f05 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -51,9 +50,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.FluentFutureStream; import org.apache.james.util.OptionalUtils; -import org.apache.james.util.streams.JamesCollectors; import org.apache.james.util.streams.Limit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +58,8 @@ import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraMessageMapper implements MessageMapper { public static final MailboxCounters INITIAL_COUNTERS = MailboxCounters.builder() @@ -140,33 +139,34 @@ public class CassandraMessageMapper implements MessageMapper { @Override public void delete(Mailbox mailbox, MailboxMessage message) { deleteAsFuture(message) - .join(); + .block(); } - private CompletableFuture<Void> deleteAsFuture(MailboxMessage message) { + private Mono<Void> deleteAsFuture(MailboxMessage message) { ComposedMessageIdWithMetaData composedMessageIdWithMetaData = message.getComposedMessageIdWithMetaData(); return deleteUsingMailboxId(composedMessageIdWithMetaData); } - private CompletableFuture<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { + private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId(); CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId(); CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId(); MessageUid uid = composedMessageId.getUid(); - return CompletableFuture.allOf( - imapUidDAO.delete(messageId, mailboxId), - messageIdDAO.delete(mailboxId, uid) - ).thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId)); + return Flux.merge( + Mono.fromCompletionStage(imapUidDAO.delete(messageId, mailboxId)), + Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid))) + .concatWith(Mono.fromCompletionStage(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId))) + .last(); } @Override public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max)) - .join() .map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage) - .sorted(Comparator.comparing(MailboxMessage::getUid)) + .sort(Comparator.comparing(MailboxMessage::getUid)) + .toIterable() .iterator(); } @@ -176,12 +176,13 @@ public class CassandraMessageMapper implements MessageMapper { .collect(Guavate.toImmutableList()); } - private CompletableFuture<Stream<SimpleMailboxMessage>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { - return messageDAO.retrieveMessages(messageIds, fetchType, limit) - .thenApply(steam -> steam + private Flux<SimpleMailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { + return Mono.fromCompletionStage(messageDAO.retrieveMessages(messageIds, fetchType, limit)) + .map(stream -> stream .filter(CassandraMessageDAO.MessageResult::isFound) .map(CassandraMessageDAO.MessageResult::message)) - .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType)); + .flatMap(stream -> Mono.fromCompletionStage(attachmentLoader.addAttachmentToMessages(stream, fetchType))) + .flatMapMany(Flux::fromStream); } @Override @@ -204,32 +205,31 @@ public class CassandraMessageMapper implements MessageMapper { public Map<MessageUid, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange) - .join() - .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize())) - .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk)) - .flatMap(CompletableFuture::join) - .collect(Guavate.toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData)); - } - - private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) { - return FluentFutureStream.of( - uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, uid)), - FluentFutureStream::unboxOptional) - .performOnAll(this::deleteUsingMailboxId) - .map(idWithMetadata -> FluentFutureStream.of( - messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())), - FluentFutureStream::unboxFluentFuture) + return Mono.fromCompletionStage(deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)) + .flatMapMany(Flux::fromStream) + .buffer(cassandraConfiguration.getExpungeChunkSize()) + .flatMap(uidChunk -> expungeUidChunk(mailboxId, uidChunk)) + .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData)) + .block(); + } + + private Flux<SimpleMailboxMessage> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) { + return Flux.fromStream(uidChunk.stream()) + .flatMap(uid -> retrieveComposedId(mailboxId, uid)) + .doOnNext(this::deleteUsingMailboxId) + .flatMap(idWithMetadata -> + Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))) + .flatMap(Flux::fromStream) .filter(CassandraMessageDAO.MessageResult::isFound) .map(CassandraMessageDAO.MessageResult::message) - .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())) - .completableFuture(); + .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())); } - private CompletableFuture<Optional<ComposedMessageIdWithMetaData>> retrieveComposedId(CassandraId mailboxId, MessageUid uid) { - return messageIdDAO.retrieve(mailboxId, uid) - .thenApply(optional -> OptionalUtils.executeIfEmpty(optional, - () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid))); + private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) { + return Mono.fromCompletionStage(messageIdDAO.retrieve(mailboxId, uid)) + .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional, + () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid))) + .flatMap(Mono::justOrEmpty); } @Override @@ -237,7 +237,7 @@ public class CassandraMessageMapper implements MessageMapper { ComposedMessageIdWithMetaData composedMessageIdWithMetaData = original.getComposedMessageIdWithMetaData(); MessageMetaData messageMetaData = copy(destinationMailbox, original); - deleteUsingMailboxId(composedMessageIdWithMetaData).join(); + deleteUsingMailboxId(composedMessageIdWithMetaData).block(); return messageMetaData; } @@ -257,8 +257,8 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); save(mailbox, addUidAndModseq(message, mailboxId)) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId)) - .join(); + .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId)) + .block(); return message.metaData(); } @@ -279,9 +279,10 @@ public class CassandraMessageMapper implements MessageMapper { public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range).join(); + Flux<ComposedMessageIdWithMetaData> toBeUpdated = Mono.fromCompletionStage(messageIdDAO.retrieveMessages(mailboxId, range)) + .flatMapMany(Flux::fromStream); - FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator); + FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator).block(); FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult); if (finalResult.containsFailedResults()) { LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), mailboxId.asUuid()); @@ -294,50 +295,55 @@ public class CassandraMessageMapper implements MessageMapper { int retryCount = 0; while (retryCount < cassandraConfiguration.getFlagsUpdateMessageMaxRetry() && globalResult.containsFailedResults()) { retryCount++; - FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed()); + FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed()).block(); globalResult = globalResult.keepSucceded().merge(stageResult); } return globalResult; } - private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) { - Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.of( - failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid)), - FluentFutureStream::unboxOptional) - .join(); - - return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator); + private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) { + if (!failed.isEmpty()) { + Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed) + .flatMap(uid -> + Mono.fromCompletionStage(messageIdDAO.retrieve(mailboxId, uid)) + .flatMap(Mono::justOrEmpty) + ); + return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator); + } else { + return Mono.empty(); + } } - private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) { - Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())); - - return toBeUpdated.collect(JamesCollectors.chunker(cassandraConfiguration.getFlagsUpdateChunkSize())) - .map(uidChunk -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, newModSeq, uidChunk)) - .map(CompletableFuture::join) + private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) { + Mono<Long> newModSeq = computeNewModSeq(mailboxId); + return toBeUpdated + .buffer(cassandraConfiguration.getFlagsUpdateChunkSize()) + .flatMap(uidChunk -> newModSeq.flatMap(modSeq -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, modSeq, uidChunk))) .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge); } - private CompletableFuture<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) { - Stream<CompletableFuture<FlagsUpdateStageResult>> updateMetaDataFuture = - uidChunk.stream().map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata)); + private Mono<Long> computeNewModSeq(CassandraId mailboxId) { + return Mono.fromCompletionStage(modSeqProvider.nextModSeq(mailboxId)) + .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()))); + } - return FluentFutureStream.of(updateMetaDataFuture) + private Mono<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) { + return Flux.fromIterable(uidChunk) + .flatMap(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata)) .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge) - .thenCompose(result -> updateIndexesForUpdatesResult(mailboxId, result)); + .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result)); } - private CompletableFuture<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) { - return FluentFutureStream.of( - result.getSucceeded().stream() - .map(Throwing - .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)) - .fallbackTo(failedindex -> { - LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedindex.getUid()); - return CompletableFuture.completedFuture(null); - }))) - .completableFuture() - .thenApply(any -> result); + private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) { + return Flux.fromIterable(result.getSucceeded()) + .flatMap(Throwing + .function((UpdatedFlags updatedFlags) -> Mono.fromCompletionStage(indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))) + .fallbackTo(failedindex -> { + LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedindex.getUid()); + return Mono.just(null); + })) + .collectList() + .map(any -> result); } @Override @@ -369,34 +375,36 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); insertIds(addUidAndModseq(message, mailboxId), mailboxId) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId)) - .join(); + .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId)) + .block(); return message.metaData(); } - private CompletableFuture<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException { + private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return messageDAO.save(message) - .thenCompose(aVoid -> insertIds(message, mailboxId)); + .flatMap(aVoid -> insertIds(message, mailboxId)); } - private CompletableFuture<Void> insertIds(MailboxMessage message, CassandraId mailboxId) { + private Mono<Void> insertIds(MailboxMessage message, CassandraId mailboxId) { ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, message.getMessageId(), message.getUid())) .flags(message.createFlags()) .modSeq(message.getModSeq()) .build(); - return CompletableFuture.allOf(messageIdDAO.insert(composedMessageIdWithMetaData), - imapUidDAO.insert(composedMessageIdWithMetaData)); + return Flux.merge( + Mono.fromCompletionStage(messageIdDAO.insert(composedMessageIdWithMetaData)), + Mono.fromCompletionStage(imapUidDAO.insert(composedMessageIdWithMetaData))) + .last(); } - private CompletableFuture<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) { + private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) { Flags oldFlags = oldMetaData.getFlags(); Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags); if (identicalFlags(oldFlags, newFlags)) { - return CompletableFuture.completedFuture(FlagsUpdateStageResult.success(UpdatedFlags.builder() + return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder() .uid(oldMetaData.getComposedMessageId().getUid()) .modSeq(oldMetaData.getModSeq()) .oldFlags(oldFlags) @@ -405,7 +413,7 @@ public class CassandraMessageMapper implements MessageMapper { } return updateFlags(oldMetaData, newFlags, newModSeq) - .thenApply(success -> { + .map(success -> { if (success) { return FlagsUpdateStageResult.success(UpdatedFlags.builder() .uid(oldMetaData.getComposedMessageId().getUid()) @@ -423,17 +431,19 @@ public class CassandraMessageMapper implements MessageMapper { return oldFlags.equals(newFlags); } - private CompletableFuture<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) { + private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) { ComposedMessageIdWithMetaData newMetadata = ComposedMessageIdWithMetaData.builder() .composedMessageId(oldMetadata.getComposedMessageId()) .modSeq(newModSeq) .flags(newFlags) .build(); - return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq()) - .thenCompose(success -> Optional.of(success) - .filter(b -> b) - .map((Boolean any) -> messageIdDAO.updateMetadata(newMetadata) - .thenApply(v -> success)) - .orElse(CompletableFuture.completedFuture(success))); + return Mono.fromCompletionStage(imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())) + .flatMap(success -> { + if (success) { + return Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata)) + .map(ignored -> true); + } else { + return Mono.just(false); + }}); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org