JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnExists consumers to Reactor (continued)
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/06ee1e43 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/06ee1e43 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/06ee1e43 Branch: refs/heads/master Commit: 06ee1e430335693a13ee7682c98ed093e17bcd2a Parents: 1295a89 Author: Gautier DI FOLCO <gdifo...@linagora.com> Authored: Thu Dec 13 13:57:28 2018 +0100 Committer: Matthieu Baechler <matth...@apache.org> Committed: Mon Jan 28 15:31:00 2019 +0100 ---------------------------------------------------------------------- .../cassandra/utils/CassandraAsyncExecutor.java | 2 +- .../cassandra/utils/CassandraUtils.java | 5 + .../mail/CassandraApplicableFlagDAO.java | 16 +- .../mail/CassandraDeletedMessageDAO.java | 39 ++- .../cassandra/mail/CassandraFirstUnseenDAO.java | 18 +- .../mail/CassandraIndexTableHandler.java | 91 +++--- .../mail/CassandraMailboxCounterDAO.java | 42 ++- .../mail/CassandraMailboxRecentsDAO.java | 23 +- .../mail/CassandraMessageIdMapper.java | 13 +- .../cassandra/mail/CassandraMessageMapper.java | 89 +++--- .../mail/CassandraApplicableFlagDAOTest.java | 46 ++-- .../mail/CassandraDeletedMessageDAOTest.java | 86 +++--- .../mail/CassandraFirstUnseenDAOTest.java | 66 ++--- .../mail/CassandraIndexTableHandlerTest.java | 274 +++++++++---------- .../mail/CassandraMailboxCounterDAOTest.java | 90 +++--- .../mail/CassandraMailboxRecentDAOTest.java | 74 +++-- .../routes/CassandraMailboxMergingRoutes.java | 2 +- 17 files changed, 486 insertions(+), 490 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java ---------------------------------------------------------------------- diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java index 61bd6f9..899635e 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java @@ -88,7 +88,7 @@ public class CassandraAsyncExecutor { .map(resultSet -> Optional.ofNullable(resultSet.one())); } - public Mono<Boolean> executeReturnExistsReactor(Statement statement) { + public Mono<Boolean> executeReturnExists(Statement statement) { return executeSingleRowReactor(statement) .hasElement(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java index fa749e6..6732966 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java @@ -28,6 +28,7 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import reactor.core.publisher.Flux; public class CassandraUtils { @@ -40,6 +41,10 @@ public class CassandraUtils { this.cassandraConfiguration = cassandraConfiguration; } + public Flux<Row> convertToFlux(ResultSet resultSet) { + return Flux.fromIterable(resultSet); + } + public Stream<Row> 
convertToStream(ResultSet resultSet) { return StreamSupport.stream(resultSet.spliterator(), true) .peek(row -> ensureFetchedNextPage(resultSet)); http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java index eed9857..4820e83 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java @@ -29,9 +29,7 @@ import static org.apache.james.mailbox.cassandra.table.CassandraApplicableFlagTa import static org.apache.james.mailbox.cassandra.table.CassandraApplicableFlagTable.TABLE_NAME; import static org.apache.james.mailbox.cassandra.table.Flag.USER_FLAGS; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; import javax.mail.Flags; @@ -43,6 +41,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Update; import com.datastax.driver.core.querybuilder.Update.Assignments; +import reactor.core.publisher.Mono; public class CassandraApplicableFlagDAO { @@ -61,19 +60,18 @@ public class CassandraApplicableFlagDAO { .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); } - public CompletableFuture<Optional<Flags>> retrieveApplicableFlag(CassandraId mailboxId) { - return cassandraAsyncExecutor.executeSingleRow( + public Mono<Flags> retrieveApplicableFlag(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeSingleRowReactor( select.bind() 
.setUUID(MAILBOX_ID, mailboxId.asUuid())) - .thenApply(rowOptional -> - rowOptional.map(row -> new FlagsExtractor(row).getApplicableFlags())); + .map(row -> new FlagsExtractor(row).getApplicableFlags()); } - public CompletableFuture<Void> updateApplicableFlags(CassandraId cassandraId, Set<String> toBeAdded) { + public Mono<Void> updateApplicableFlags(CassandraId cassandraId, Set<String> toBeAdded) { if (toBeAdded.isEmpty()) { - return CompletableFuture.completedFuture(null); + return Mono.empty(); } - return cassandraAsyncExecutor.executeVoid(updateQuery(cassandraId, toBeAdded)); + return cassandraAsyncExecutor.executeVoidReactor(updateQuery(cassandraId, toBeAdded)); } private Update.Where updateQuery(CassandraId cassandraId, Set<String> userFlags) { http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java index 4c6ebd9..4b5da30 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java @@ -30,9 +30,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTa import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.TABLE_NAME; import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.UID; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; - import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -45,6 +42,8 @@ import 
com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraDeletedMessageDAO { private static final String UID_TO = "uid_to"; @@ -118,25 +117,25 @@ public class CassandraDeletedMessageDAO { .value(UID, bindMarker(UID))); } - public CompletableFuture<Void> addDeleted(CassandraId cassandraId, MessageUid uid) { - return cassandraAsyncExecutor.executeVoid( + public Mono<Void> addDeleted(CassandraId cassandraId, MessageUid uid) { + return cassandraAsyncExecutor.executeVoidReactor( addStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid()) .setLong(UID, uid.asLong())); } - public CompletableFuture<Void> removeDeleted(CassandraId cassandraId, MessageUid uid) { - return cassandraAsyncExecutor.executeVoid(deleteStatement.bind() + public Mono<Void> removeDeleted(CassandraId cassandraId, MessageUid uid) { + return cassandraAsyncExecutor.executeVoidReactor(deleteStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid()) .setLong(UID, uid.asLong())); } - public CompletableFuture<Stream<MessageUid>> retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) { + public Flux<MessageUid> retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) { return retrieveResultSetOfDeletedMessage(cassandraId, range) - .thenApply(this::resultSetToStream); + .flatMapMany(this::resultSetToFlux); } - private CompletableFuture<ResultSet> retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) { + private Mono<ResultSet> retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) { switch (range.getType()) { case ALL: return retrieveAllDeleted(cassandraId); @@ -151,35 +150,35 @@ public class CassandraDeletedMessageDAO { throw new UnsupportedOperationException(); } - private Stream<MessageUid> 
resultSetToStream(ResultSet resultSet) { - return cassandraUtils.convertToStream(resultSet) + private Flux<MessageUid> resultSetToFlux(ResultSet resultSet) { + return cassandraUtils.convertToFlux(resultSet) .map(row -> MessageUid.of(row.getLong(UID))); } - private CompletableFuture<ResultSet> retrieveAllDeleted(CassandraId cassandraId) { - return cassandraAsyncExecutor.execute( + private Mono<ResultSet> retrieveAllDeleted(CassandraId cassandraId) { + return cassandraAsyncExecutor.executeReactor( selectAllUidStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid())); } - private CompletableFuture<ResultSet> retrieveOneDeleted(CassandraId cassandraId, MessageUid uid) { - return cassandraAsyncExecutor.execute( + private Mono<ResultSet> retrieveOneDeleted(CassandraId cassandraId, MessageUid uid) { + return cassandraAsyncExecutor.executeReactor( selectOneUidStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid()) .setLong(UID, uid.asLong())); } - private CompletableFuture<ResultSet> retrieveDeletedBetween(CassandraId cassandraId, MessageUid from, MessageUid to) { - return cassandraAsyncExecutor.execute( + private Mono<ResultSet> retrieveDeletedBetween(CassandraId cassandraId, MessageUid from, MessageUid to) { + return cassandraAsyncExecutor.executeReactor( selectBetweenUidStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid()) .setLong(UID_FROM, from.asLong()) .setLong(UID_TO, to.asLong())); } - private CompletableFuture<ResultSet> retrieveDeletedAfter(CassandraId cassandraId, MessageUid from) { - return cassandraAsyncExecutor.execute( + private Mono<ResultSet> retrieveDeletedAfter(CassandraId cassandraId, MessageUid from) { + return cassandraAsyncExecutor.executeReactor( selectFromUidStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid()) .setLong(UID_FROM, from.asLong())); http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java 
---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java index e710e69..50ec510 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java @@ -29,9 +29,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.TABLE_NAME; import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.UID; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -40,6 +37,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import reactor.core.publisher.Mono; public class CassandraFirstUnseenDAO { private final CassandraAsyncExecutor cassandraAsyncExecutor; @@ -76,24 +74,24 @@ public class CassandraFirstUnseenDAO { .value(UID, bindMarker(UID))); } - public CompletableFuture<Void> addUnread(CassandraId cassandraId, MessageUid uid) { - return cassandraAsyncExecutor.executeVoid( + public Mono<Void> addUnread(CassandraId cassandraId, MessageUid uid) { + return cassandraAsyncExecutor.executeVoidReactor( addStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid()) .setLong(UID, uid.asLong())); } - public CompletableFuture<Void> removeUnread(CassandraId cassandraId, MessageUid uid) { - return cassandraAsyncExecutor.executeVoid(deleteStatement.bind() + public Mono<Void> removeUnread(CassandraId cassandraId, MessageUid uid) { + return 
cassandraAsyncExecutor.executeVoidReactor(deleteStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid()) .setLong(UID, uid.asLong())); } - public CompletableFuture<Optional<MessageUid>> retrieveFirstUnread(CassandraId cassandraId) { - return cassandraAsyncExecutor.executeSingleRow( + public Mono<MessageUid> retrieveFirstUnread(CassandraId cassandraId) { + return cassandraAsyncExecutor.executeSingleRowReactor( readStatement.bind() .setUUID(MAILBOX_ID, cassandraId.asUuid())) - .thenApply(optional -> optional.map(row -> MessageUid.of(row.getLong(UID)))); + .map(row -> MessageUid.of(row.getLong(UID))); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java index 3f1485e..6ca3f46 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java @@ -19,8 +19,6 @@ package org.apache.james.mailbox.cassandra.mail; -import java.util.concurrent.CompletableFuture; - import javax.inject.Inject; import javax.mail.Flags; @@ -31,6 +29,8 @@ import org.apache.james.mailbox.model.UpdatedFlags; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraIndexTableHandler { @@ -53,116 +53,121 @@ public class CassandraIndexTableHandler { this.deletedMessageDAO = deletedMessageDAO; } - public CompletableFuture<Void> updateIndexOnDelete(ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData, CassandraId mailboxId) { + public Mono<Void> updateIndexOnDelete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, CassandraId mailboxId) { MessageUid uid = composedMessageIdWithMetaData.getComposedMessageId().getUid(); - return CompletableFuture.allOf( - updateFirstUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageIdWithMetaData.getComposedMessageId().getUid()), - mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()), - mailboxCounterDAO.decrementCount(mailboxId), - deletedMessageDAO.removeDeleted(mailboxId, uid), - decrementUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags())); + + return Flux.merge( + updateFirstUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageIdWithMetaData.getComposedMessageId().getUid()), + mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()), + mailboxCounterDAO.decrementCount(mailboxId), + deletedMessageDAO.removeDeleted(mailboxId, uid), + decrementUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags())) + .then(); } - public CompletableFuture<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) { + public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) { Flags flags = message.createFlags(); - return CompletableFuture.allOf( - checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), - updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), - addRecentOnSave(mailboxId, message), - incrementUnseenOnSave(mailboxId, flags), - mailboxCounterDAO.incrementCount(mailboxId), - applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(flags.getUserFlags()))); + return Flux.merge( + checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), + updateFirstUnseenOnAdd(mailboxId, message.createFlags(), 
message.getUid()), + addRecentOnSave(mailboxId, message), + incrementUnseenOnSave(mailboxId, flags), + mailboxCounterDAO.incrementCount(mailboxId), + applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(flags.getUserFlags()))) + .then(); } - public CompletableFuture<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { - return CompletableFuture.allOf(manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags), - manageRecentOnFlagsUpdate(mailboxId, updatedFlags), - updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags), - applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(updatedFlags.userFlagIterator())), - updateDeletedOnFlagsUpdate(mailboxId, updatedFlags)); + public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { + return Flux.merge( + manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags), + manageRecentOnFlagsUpdate(mailboxId, updatedFlags), + updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags), + applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(updatedFlags.userFlagIterator())), + updateDeletedOnFlagsUpdate(mailboxId, updatedFlags)) + .then(); } - private CompletableFuture<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { + private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) { return deletedMessageDAO.addDeleted(mailboxId, updatedFlags.getUid()); } else if (updatedFlags.isModifiedToUnset(Flags.Flag.DELETED)) { return deletedMessageDAO.removeDeleted(mailboxId, updatedFlags.getUid()); } else { - return CompletableFuture.completedFuture(null); + return Mono.empty(); } } - private CompletableFuture<Void> decrementUnseenOnDelete(CassandraId mailboxId, Flags flags) { + private Mono<Void> decrementUnseenOnDelete(CassandraId mailboxId, Flags flags) { if (flags.contains(Flags.Flag.SEEN)) { 
- return CompletableFuture.completedFuture(null); + return Mono.empty(); } return mailboxCounterDAO.decrementUnseen(mailboxId); } - private CompletableFuture<Void> incrementUnseenOnSave(CassandraId mailboxId, Flags flags) { + private Mono<Void> incrementUnseenOnSave(CassandraId mailboxId, Flags flags) { if (flags.contains(Flags.Flag.SEEN)) { - return CompletableFuture.completedFuture(null); + return Mono.empty(); } return mailboxCounterDAO.incrementUnseen(mailboxId); } - private CompletableFuture<Void> addRecentOnSave(CassandraId mailboxId, MailboxMessage message) { + private Mono<Void> addRecentOnSave(CassandraId mailboxId, MailboxMessage message) { if (message.createFlags().contains(Flags.Flag.RECENT)) { return mailboxRecentDAO.addToRecent(mailboxId, message.getUid()); } - return CompletableFuture.completedFuture(null); + return Mono.empty(); } - private CompletableFuture<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { + private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) { return mailboxCounterDAO.incrementUnseen(mailboxId); } if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) { return mailboxCounterDAO.decrementUnseen(mailboxId); } - return CompletableFuture.completedFuture(null); + return Mono.empty(); } - private CompletableFuture<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { + private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { if (updatedFlags.isModifiedToUnset(Flags.Flag.RECENT)) { return mailboxRecentDAO.removeFromRecent(mailboxId, updatedFlags.getUid()); } if (updatedFlags.isModifiedToSet(Flags.Flag.RECENT)) { return mailboxRecentDAO.addToRecent(mailboxId, updatedFlags.getUid()); } - return CompletableFuture.completedFuture(null); + return Mono.empty(); } - private CompletableFuture<Void> 
updateFirstUnseenOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) { + private Mono<Void> updateFirstUnseenOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) { if (flags.contains(Flags.Flag.SEEN)) { - return CompletableFuture.completedFuture(null); + return Mono.empty(); } return firstUnseenDAO.addUnread(mailboxId, uid); } - private CompletableFuture<Void> checkDeletedOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) { + private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) { if (flags.contains(Flags.Flag.DELETED)) { return deletedMessageDAO.addDeleted(mailboxId, uid); } - return CompletableFuture.completedFuture(null); + return Mono.empty(); } - private CompletableFuture<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Flags flags, MessageUid uid) { + private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Flags flags, MessageUid uid) { if (flags.contains(Flags.Flag.SEEN)) { - return CompletableFuture.completedFuture(null); + return Mono.empty(); } return firstUnseenDAO.removeUnread(mailboxId, uid); } - private CompletableFuture<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { + private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) { return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid()); } if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) { return firstUnseenDAO.removeUnread(mailboxId, updatedFlags.getUid()); } - return CompletableFuture.completedFuture(null); + return Mono.empty(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java index f2e2fce..5f2dec0 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java @@ -26,9 +26,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.incr; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.update; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -42,6 +39,7 @@ import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Assignment; +import reactor.core.publisher.Mono; public class CassandraMailboxCounterDAO { @@ -76,48 +74,48 @@ public class CassandraMailboxCounterDAO { .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, bindMarker(CassandraMailboxCountersTable.MAILBOX_ID)))); } - public CompletableFuture<Optional<MailboxCounters>> retrieveMailboxCounters(Mailbox mailbox) throws MailboxException { + public Mono<MailboxCounters> retrieveMailboxCounters(Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId, readStatement)) - .thenApply(optional -> optional.map(row -> MailboxCounters.builder() + return cassandraAsyncExecutor.executeSingleRowReactor(bindWithMailbox(mailboxId, readStatement)) + .map(row -> MailboxCounters.builder() .count(row.getLong(CassandraMailboxCountersTable.COUNT)) .unseen(row.getLong(CassandraMailboxCountersTable.UNSEEN)) - .build())); + .build()); } - public 
CompletableFuture<Optional<Long>> countMessagesInMailbox(Mailbox mailbox) throws MailboxException { + public Mono<Long> countMessagesInMailbox(Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return countMessagesInMailbox(mailboxId); } - public CompletableFuture<Optional<Long>> countMessagesInMailbox(CassandraId cassandraId) { - return cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(cassandraId, readStatement)) - .thenApply(optional -> optional.map(row -> row.getLong(CassandraMailboxCountersTable.COUNT))); + public Mono<Long> countMessagesInMailbox(CassandraId cassandraId) { + return cassandraAsyncExecutor.executeSingleRowReactor(bindWithMailbox(cassandraId, readStatement)) + .map(row -> row.getLong(CassandraMailboxCountersTable.COUNT)); } - public CompletableFuture<Optional<Long>> countUnseenMessagesInMailbox(Mailbox mailbox) throws MailboxException { + public Mono<Long> countUnseenMessagesInMailbox(Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId, readStatement)) - .thenApply(optional -> optional.map(row -> row.getLong(CassandraMailboxCountersTable.UNSEEN))); + return cassandraAsyncExecutor.executeSingleRowReactor(bindWithMailbox(mailboxId, readStatement)) + .map(row -> row.getLong(CassandraMailboxCountersTable.UNSEEN)); } - public CompletableFuture<Void> decrementCount(CassandraId mailboxId) { - return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, decrementMessageCountStatement)); + public Mono<Void> decrementCount(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, decrementMessageCountStatement)); } - public CompletableFuture<Void> incrementCount(CassandraId mailboxId) { - return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, incrementMessageCountStatement)); + public Mono<Void> 
incrementCount(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, incrementMessageCountStatement)); } - public CompletableFuture<Void> decrementUnseen(CassandraId mailboxId) { - return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, decrementUnseenCountStatement)); + public Mono<Void> decrementUnseen(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, decrementUnseenCountStatement)); } - public CompletableFuture<Void> incrementUnseen(CassandraId mailboxId) { - return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, incrementUnseenCountStatement)); + public Mono<Void> incrementUnseen(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, incrementUnseenCountStatement)); } private BoundStatement bindWithMailbox(CassandraId mailboxId, PreparedStatement statement) { http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java index 2018e0b..9a521d0 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java @@ -25,9 +25,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; - import 
javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -40,6 +37,8 @@ import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraMailboxRecentsDAO { @@ -85,11 +84,11 @@ public class CassandraMailboxRecentsDAO { .value(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, bindMarker(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))); } - public CompletableFuture<Stream<MessageUid>> getRecentMessageUidsInMailbox(CassandraId mailboxId) { - return cassandraAsyncExecutor.execute(bindWithMailbox(mailboxId, readStatement)) - .thenApply(cassandraUtils::convertToStream) - .thenApply(stream -> stream.map(row -> row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))) - .thenApply(stream -> stream.map(MessageUid::of)); + public Flux<MessageUid> getRecentMessageUidsInMailbox(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeReactor(bindWithMailbox(mailboxId, readStatement)) + .flatMapMany(cassandraUtils::convertToFlux) + .map(row -> row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID)) + .map(MessageUid::of); } private BoundStatement bindWithMailbox(CassandraId mailboxId, PreparedStatement statement) { @@ -97,14 +96,14 @@ public class CassandraMailboxRecentsDAO { .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid()); } - public CompletableFuture<Void> removeFromRecent(CassandraId mailboxId, MessageUid messageUid) { - return cassandraAsyncExecutor.executeVoid(deleteStatement.bind() + public Mono<Void> removeFromRecent(CassandraId mailboxId, MessageUid messageUid) { + return cassandraAsyncExecutor.executeVoidReactor(deleteStatement.bind() .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid()) .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, 
messageUid.asLong())); } - public CompletableFuture<Void> addToRecent(CassandraId mailboxId, MessageUid messageUid) { - return cassandraAsyncExecutor.executeVoid(addStatement.bind() + public Mono<Void> addToRecent(CassandraId mailboxId, MessageUid messageUid) { + return cassandraAsyncExecutor.executeVoidReactor(addStatement.bind() .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid()) .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, messageUid.asLong())); } http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 2d9a154..880ac16 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory; import com.github.steveash.guavate.Guavate; import com.google.common.collect.Multimap; +import reactor.core.publisher.Mono; public class CassandraMessageIdMapper implements MessageIdMapper { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class); @@ -150,7 +151,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .thenCompose(voidValue -> CompletableFuture.allOf( imapUidDAO.insert(composedMessageIdWithMetaData), messageIdDAO.insert(composedMessageIdWithMetaData))) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId)) + .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture()) .join(); } @@ -162,7 +163,7 @@ public 
class CassandraMessageIdMapper implements MessageIdMapper { CompletableFuture.allOf( imapUidDAO.insert(composedMessageIdWithMetaData), messageIdDAO.insert(composedMessageIdWithMetaData)) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId)) + .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture()) .join(); } @@ -225,7 +226,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { return CompletableFuture.allOf( imapUidDAO.delete(messageId, mailboxId), messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid())) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(metaData, mailboxId)); + .thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(metaData, mailboxId).toFuture()); } @Override @@ -239,7 +240,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .isPresent()) .flatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId)) .map(this::updateCounts) - .map(CompletableFuture::join) + .map(Mono::block) .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight)); } @@ -264,10 +265,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } } - private CompletableFuture<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, UpdatedFlags> pair) { + private Mono<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, UpdatedFlags> pair) { CassandraId cassandraId = (CassandraId) pair.getLeft(); return indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, pair.getRight()) - .thenApply(voidValue -> pair); + .then(Mono.just(pair)); } private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { 
http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 0133f05..f2b468f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -19,7 +19,6 @@ package org.apache.james.mailbox.cassandra.mail; -import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -118,22 +117,22 @@ public class CassandraMessageMapper implements MessageMapper { @Override public long countMessagesInMailbox(Mailbox mailbox) throws MailboxException { return mailboxCounterDAO.countMessagesInMailbox(mailbox) - .join() - .orElse(0L); + .defaultIfEmpty(0L) + .block(); } @Override public long countUnseenMessagesInMailbox(Mailbox mailbox) throws MailboxException { return mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox) - .join() - .orElse(0L); + .defaultIfEmpty(0L) + .block(); } @Override public MailboxCounters getMailboxCounters(Mailbox mailbox) throws MailboxException { return mailboxCounterDAO.retrieveMailboxCounters(mailbox) - .join() - .orElse(INITIAL_COUNTERS); + .defaultIfEmpty(INITIAL_COUNTERS) + .block(); } @Override @@ -156,8 +155,7 @@ public class CassandraMessageMapper implements MessageMapper { return Flux.merge( Mono.fromCompletionStage(imapUidDAO.delete(messageId, mailboxId)), Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid))) - .concatWith(Mono.fromCompletionStage(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId))) - .last(); + 
.then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId)); } @Override @@ -165,7 +163,8 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max)) .map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage) - .sort(Comparator.comparing(MailboxMessage::getUid)) + .collectSortedList(Comparator.comparing(MailboxMessage::getUid)) + .flatMapMany(Flux::fromIterable) .toIterable() .iterator(); } @@ -189,37 +188,37 @@ public class CassandraMessageMapper implements MessageMapper { public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); } @Override public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return firstUnseenDAO.retrieveFirstUnread(mailboxId) - .join() - .orElse(null); + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + .block() + .orElse(null); } @Override public Map<MessageUid, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return Mono.fromCompletionStage(deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)) - .flatMapMany(Flux::fromStream) - .buffer(cassandraConfiguration.getExpungeChunkSize()) - .flatMap(uidChunk -> expungeUidChunk(mailboxId, uidChunk)) + return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange) + .limitRate(cassandraConfiguration.getExpungeChunkSize()) + .flatMap(messageUid -> expungeOne(mailboxId, messageUid)) 
.collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData)) .block(); } - private Flux<SimpleMailboxMessage> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) { - return Flux.fromStream(uidChunk.stream()) - .flatMap(uid -> retrieveComposedId(mailboxId, uid)) - .doOnNext(this::deleteUsingMailboxId) + private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) { + return retrieveComposedId(mailboxId, messageUid) + .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata)) .flatMap(idWithMetadata -> Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))) - .flatMap(Flux::fromStream) + .flatMapMany(Flux::fromStream) .filter(CassandraMessageDAO.MessageResult::isFound) .map(CassandraMessageDAO.MessageResult::message) .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())); @@ -257,7 +256,7 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); save(mailbox, addUidAndModseq(message, mailboxId)) - .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId)) + .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId)) .block(); return message.metaData(); } @@ -317,9 +316,10 @@ public class CassandraMessageMapper implements MessageMapper { private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) { Mono<Long> newModSeq = computeNewModSeq(mailboxId); return toBeUpdated - .buffer(cassandraConfiguration.getFlagsUpdateChunkSize()) - .flatMap(uidChunk -> newModSeq.flatMap(modSeq -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, modSeq, uidChunk))) - .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge); + 
.limitRate(cassandraConfiguration.getFlagsUpdateChunkSize()) + .flatMapSequential(metadata -> newModSeq.flatMap(modSeq -> tryFlagsUpdate(flagsUpdateCalculator, modSeq, metadata))) + .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge) + .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result)); } private Mono<Long> computeNewModSeq(CassandraId mailboxId) { @@ -327,23 +327,15 @@ public class CassandraMessageMapper implements MessageMapper { .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()))); } - private Mono<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) { - return Flux.fromIterable(uidChunk) - .flatMap(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata)) - .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge) - .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result)); - } - private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) { return Flux.fromIterable(result.getSucceeded()) .flatMap(Throwing - .function((UpdatedFlags updatedFlags) -> Mono.fromCompletionStage(indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))) - .fallbackTo(failedindex -> { - LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedindex.getUid()); - return Mono.just(null); + .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)) + .fallbackTo(failedIndex -> { + LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. 
This will lead to inconsistencies across Cassandra tables", mailboxId, failedIndex.getUid()); + return Mono.empty(); })) - .collectList() - .map(any -> result); + .then(Mono.just(result)); } @Override @@ -366,8 +358,8 @@ public class CassandraMessageMapper implements MessageMapper { public Flags getApplicableFlag(Mailbox mailbox) throws MailboxException { return ApplicableFlagBuilder.builder() .add(applicableFlagDAO.retrieveApplicableFlag((CassandraId) mailbox.getMailboxId()) - .join() - .orElse(new Flags())) + .defaultIfEmpty(new Flags()) + .block()) .build(); } @@ -375,15 +367,15 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); insertIds(addUidAndModseq(message, mailboxId), mailboxId) - .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId)) + .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId)) .block(); return message.metaData(); } private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return messageDAO.save(message) - .flatMap(aVoid -> insertIds(message, mailboxId)); + return Mono.fromFuture(messageDAO.save(message)) + .thenEmpty(insertIds(message, mailboxId)); } private Mono<Void> insertIds(MailboxMessage message, CassandraId mailboxId) { @@ -395,7 +387,7 @@ public class CassandraMessageMapper implements MessageMapper { return Flux.merge( Mono.fromCompletionStage(messageIdDAO.insert(composedMessageIdWithMetaData)), Mono.fromCompletionStage(imapUidDAO.insert(composedMessageIdWithMetaData))) - .last(); + .then(); } @@ -441,9 +433,10 @@ public class CassandraMessageMapper implements MessageMapper { .flatMap(success -> { if (success) { return Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata)) - .map(ignored -> true); + .then(Mono.just(true)); } else { return Mono.just(false); - }}); + } + }); } } 
http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java index e2e4f62..55f7b5c 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java @@ -52,59 +52,59 @@ class CassandraApplicableFlagDAOTest { @Test void updateApplicableFlagsShouldReturnEmptyByDefault() { - assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join()) - .isEmpty(); + assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).hasElement().block()) + .isFalse(); } @Test void updateApplicableFlagsShouldSupportEmptyUserFlags() { - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of()).join(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of()).block(); - assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join()) - .isEmpty(); + assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).hasElement().block()) + .isFalse(); } @Test void updateApplicableFlagsShouldUpdateUserFlag() { - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).join(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).block(); - assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join()) - .contains(new Flags(USER_FLAG)); + assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block()) + .isEqualTo(new Flags(USER_FLAG)); } @Test void updateApplicableFlagsShouldUnionUserFlags() { - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).join(); - 
testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG2)).join(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).block(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG2)).block(); - assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join()) - .contains(FlagsBuilder.builder().add(USER_FLAG, USER_FLAG2).build()); + assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block()) + .isEqualTo(FlagsBuilder.builder().add(USER_FLAG, USER_FLAG2).build()); } @Test void updateApplicableFlagsShouldBeIdempotent() { - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).join(); - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).join(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).block(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).block(); - assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join()) - .contains(new Flags(USER_FLAG)); + assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block()) + .isEqualTo(new Flags(USER_FLAG)); } @Test void updateApplicableFlagsShouldSkipAlreadyStoredFlagsWhenAddingFlag() { - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).join(); - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, USER_FLAG2)).join(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG)).block(); + testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, USER_FLAG2)).block(); - assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join()) - .contains(FlagsBuilder.builder().add(USER_FLAG, USER_FLAG2).build()); + assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block()) + .isEqualTo(FlagsBuilder.builder().add(USER_FLAG, USER_FLAG2).build()); } @Test void updateApplicableFlagsShouldUpdateMultiFlags() { - testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, USER_FLAG2)).join(); + 
testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, USER_FLAG2)).block(); - assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join()) - .contains(FlagsBuilder.builder().add(USER_FLAG, USER_FLAG2).build()); + assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block()) + .isEqualTo(FlagsBuilder.builder().add(USER_FLAG, USER_FLAG2).build()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java index 906f9c7..6197506 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java @@ -34,8 +34,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import com.github.steveash.guavate.Guavate; - class CassandraDeletedMessageDAOTest { private static final CassandraId MAILBOX_ID = CassandraId.of(UUID.fromString("110e8400-e29b-11d4-a716-446655440000")); private static final MessageUid UID_1 = MessageUid.of(1); @@ -59,32 +57,32 @@ class CassandraDeletedMessageDAOTest { void retrieveDeletedMessageShouldReturnEmptyByDefault() { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).isEmpty(); } @Test void addDeletedMessageShouldThenBeReportedAsDeletedMessage() { - testee.addDeleted(MAILBOX_ID, UID_1).join(); - 
testee.addDeleted(MAILBOX_ID, UID_2).join(); + testee.addDeleted(MAILBOX_ID, UID_1).block(); + testee.addDeleted(MAILBOX_ID, UID_2).block(); List<MessageUid> result = testee.retrieveDeletedMessage(MAILBOX_ID, MessageRange.all()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).containsExactly(UID_1, UID_2); } @Test void addDeletedMessageShouldBeIdempotent() { - testee.addDeleted(MAILBOX_ID, UID_1).join(); - testee.addDeleted(MAILBOX_ID, UID_1).join(); + testee.addDeleted(MAILBOX_ID, UID_1).block(); + testee.addDeleted(MAILBOX_ID, UID_1).block(); List<MessageUid> result = testee.retrieveDeletedMessage(MAILBOX_ID, MessageRange.all()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).containsExactly(UID_1); } @@ -92,52 +90,52 @@ class CassandraDeletedMessageDAOTest { @Test void removeUnreadShouldReturnEmptyWhenNoData() { - testee.removeDeleted(MAILBOX_ID, UID_1).join(); + testee.removeDeleted(MAILBOX_ID, UID_1).block(); List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).isEmpty(); } @Test void removeDeletedMessageShouldNotAffectOtherMessage() { - testee.addDeleted(MAILBOX_ID, UID_2).join(); - testee.addDeleted(MAILBOX_ID, UID_1).join(); + testee.addDeleted(MAILBOX_ID, UID_2).block(); + testee.addDeleted(MAILBOX_ID, UID_1).block(); - testee.removeDeleted(MAILBOX_ID, UID_1).join(); + testee.removeDeleted(MAILBOX_ID, UID_1).block(); List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).containsExactly(UID_2); } @Test void removeDeletedShouldRemoveSpecifiedUID() { - testee.addDeleted(MAILBOX_ID, UID_2).join(); + testee.addDeleted(MAILBOX_ID, UID_2).block(); - testee.removeDeleted(MAILBOX_ID, UID_2).join(); + 
testee.removeDeleted(MAILBOX_ID, UID_2).block(); List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).isEmpty(); } private void addMessageForRetrieveTest() { - testee.addDeleted(MAILBOX_ID, UID_1).join(); - testee.addDeleted(MAILBOX_ID, UID_2).join(); - testee.addDeleted(MAILBOX_ID, UID_3).join(); - testee.addDeleted(MAILBOX_ID, UID_4).join(); - testee.addDeleted(MAILBOX_ID, UID_7).join(); - testee.addDeleted(MAILBOX_ID, UID_8).join(); + testee.addDeleted(MAILBOX_ID, UID_1).block(); + testee.addDeleted(MAILBOX_ID, UID_2).block(); + testee.addDeleted(MAILBOX_ID, UID_3).block(); + testee.addDeleted(MAILBOX_ID, UID_4).block(); + testee.addDeleted(MAILBOX_ID, UID_7).block(); + testee.addDeleted(MAILBOX_ID, UID_8).block(); } @Test @@ -146,8 +144,8 @@ class CassandraDeletedMessageDAOTest { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all()) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).containsExactly(UID_1, UID_2, UID_3, UID_4, UID_7, UID_8); } @@ -158,8 +156,8 @@ class CassandraDeletedMessageDAOTest { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.one(UID_1)) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).containsExactly(UID_1); } @@ -170,8 +168,8 @@ class CassandraDeletedMessageDAOTest { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.one(MessageUid.of(42))) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).isEmpty(); } @@ -182,8 +180,8 @@ class CassandraDeletedMessageDAOTest { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.range(MessageUid.of(3), MessageUid.of(7))) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); 
assertThat(result).containsExactly(UID_3, UID_4, UID_7); } @@ -194,8 +192,8 @@ class CassandraDeletedMessageDAOTest { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.range(MessageUid.of(5), MessageUid.of(6))) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).isEmpty(); } @@ -206,8 +204,8 @@ class CassandraDeletedMessageDAOTest { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.from(MessageUid.of(9))) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).isEmpty(); } @@ -218,8 +216,8 @@ class CassandraDeletedMessageDAOTest { List<MessageUid> result = testee .retrieveDeletedMessage(MAILBOX_ID, MessageRange.from(MessageUid.of(4))) - .join() - .collect(Guavate.toImmutableList()); + .collectList() + .block(); assertThat(result).containsExactly(UID_4, UID_7, UID_8); } http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java index 4fae89a..882ed3c 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java @@ -47,85 +47,85 @@ class CassandraFirstUnseenDAOTest { @Test void retrieveFirstUnreadShouldReturnEmptyByDefault() { - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join().isPresent()) + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).hasElement().block()) .isFalse(); } @Test void addUnreadShouldThenBeReportedAsFirstUnseen() { - 
testee.addUnread(MAILBOX_ID, UID_1).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .contains(UID_1); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block()) + .isEqualByComparingTo(UID_1); } @Test void retrieveFirstUnreadShouldReturnLowestUnreadUid() { - testee.addUnread(MAILBOX_ID, UID_1).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); - testee.addUnread(MAILBOX_ID, UID_2).join(); + testee.addUnread(MAILBOX_ID, UID_2).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .contains(UID_1); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block()) + .isEqualByComparingTo(UID_1); } @Test void retrieveFirstUnreadShouldBeOrderIndependent() { - testee.addUnread(MAILBOX_ID, UID_2).join(); + testee.addUnread(MAILBOX_ID, UID_2).block(); - testee.addUnread(MAILBOX_ID, UID_1).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .contains(UID_1); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block()) + .isEqualByComparingTo(UID_1); } @Test void addUnreadShouldBeIdempotent() { - testee.addUnread(MAILBOX_ID, UID_1).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); - testee.addUnread(MAILBOX_ID, UID_1).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .contains(UID_1); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block()) + .isEqualByComparingTo(UID_1); } @Test void removeUnreadShouldReturnWhenNoData() { - testee.removeUnread(MAILBOX_ID, UID_1).join(); + testee.removeUnread(MAILBOX_ID, UID_1).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .isEmpty(); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).hasElement().block()) + .isFalse(); } @Test void removeUnreadShouldRemoveOnlyUnread() { - testee.addUnread(MAILBOX_ID, UID_1).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); - 
testee.removeUnread(MAILBOX_ID, UID_1).join(); + testee.removeUnread(MAILBOX_ID, UID_1).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .isEmpty(); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).hasElement().block()) + .isFalse(); } @Test void removeUnreadShouldRemoveLastUnread() { - testee.addUnread(MAILBOX_ID, UID_1).join(); - testee.addUnread(MAILBOX_ID, UID_2).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); + testee.addUnread(MAILBOX_ID, UID_2).block(); - testee.removeUnread(MAILBOX_ID, UID_2).join(); + testee.removeUnread(MAILBOX_ID, UID_2).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .contains(UID_1); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block()) + .isEqualByComparingTo(UID_1); } @Test void removeUnreadShouldHaveNoEffectWhenNotLast() { - testee.addUnread(MAILBOX_ID, UID_1).join(); - testee.addUnread(MAILBOX_ID, UID_2).join(); + testee.addUnread(MAILBOX_ID, UID_1).block(); + testee.addUnread(MAILBOX_ID, UID_2).block(); - testee.removeUnread(MAILBOX_ID, UID_1).join(); + testee.removeUnread(MAILBOX_ID, UID_1).block(); - assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join()) - .contains(UID_2); + assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block()) + .isEqualByComparingTo(UID_2); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org