JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnExists consumers to 
Reactor contd


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/06ee1e43
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/06ee1e43
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/06ee1e43

Branch: refs/heads/master
Commit: 06ee1e430335693a13ee7682c98ed093e17bcd2a
Parents: 1295a89
Author: Gautier DI FOLCO <gdifo...@linagora.com>
Authored: Thu Dec 13 13:57:28 2018 +0100
Committer: Matthieu Baechler <matth...@apache.org>
Committed: Mon Jan 28 15:31:00 2019 +0100

----------------------------------------------------------------------
 .../cassandra/utils/CassandraAsyncExecutor.java |   2 +-
 .../cassandra/utils/CassandraUtils.java         |   5 +
 .../mail/CassandraApplicableFlagDAO.java        |  16 +-
 .../mail/CassandraDeletedMessageDAO.java        |  39 ++-
 .../cassandra/mail/CassandraFirstUnseenDAO.java |  18 +-
 .../mail/CassandraIndexTableHandler.java        |  91 +++---
 .../mail/CassandraMailboxCounterDAO.java        |  42 ++-
 .../mail/CassandraMailboxRecentsDAO.java        |  23 +-
 .../mail/CassandraMessageIdMapper.java          |  13 +-
 .../cassandra/mail/CassandraMessageMapper.java  |  89 +++---
 .../mail/CassandraApplicableFlagDAOTest.java    |  46 ++--
 .../mail/CassandraDeletedMessageDAOTest.java    |  86 +++---
 .../mail/CassandraFirstUnseenDAOTest.java       |  66 ++---
 .../mail/CassandraIndexTableHandlerTest.java    | 274 +++++++++----------
 .../mail/CassandraMailboxCounterDAOTest.java    |  90 +++---
 .../mail/CassandraMailboxRecentDAOTest.java     |  74 +++--
 .../routes/CassandraMailboxMergingRoutes.java   |   2 +-
 17 files changed, 486 insertions(+), 490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 61bd6f9..899635e 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -88,7 +88,7 @@ public class CassandraAsyncExecutor {
             .map(resultSet -> Optional.ofNullable(resultSet.one()));
     }
 
-    public Mono<Boolean> executeReturnExistsReactor(Statement statement) {
+    public Mono<Boolean> executeReturnExists(Statement statement) {
         return executeSingleRowReactor(statement)
                 .hasElement();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
index fa749e6..6732966 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
@@ -28,6 +28,7 @@ import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import reactor.core.publisher.Flux;
 
 public class CassandraUtils {
 
@@ -40,6 +41,10 @@ public class CassandraUtils {
         this.cassandraConfiguration = cassandraConfiguration;
     }
 
+    public Flux<Row> convertToFlux(ResultSet resultSet) {
+        return Flux.fromIterable(resultSet);
+    }
+
     public Stream<Row> convertToStream(ResultSet resultSet) {
         return StreamSupport.stream(resultSet.spliterator(), true)
             .peek(row -> ensureFetchedNextPage(resultSet));

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
index eed9857..4820e83 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAO.java
@@ -29,9 +29,7 @@ import static 
org.apache.james.mailbox.cassandra.table.CassandraApplicableFlagTa
 import static 
org.apache.james.mailbox.cassandra.table.CassandraApplicableFlagTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.Flag.USER_FLAGS;
 
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 import javax.mail.Flags;
@@ -43,6 +41,7 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.Update;
 import com.datastax.driver.core.querybuilder.Update.Assignments;
+import reactor.core.publisher.Mono;
 
 public class CassandraApplicableFlagDAO {
 
@@ -61,19 +60,18 @@ public class CassandraApplicableFlagDAO {
             .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
     }
 
-    public CompletableFuture<Optional<Flags>> 
retrieveApplicableFlag(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    public Mono<Flags> retrieveApplicableFlag(CassandraId mailboxId) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             select.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
-            .thenApply(rowOptional ->
-                rowOptional.map(row -> new 
FlagsExtractor(row).getApplicableFlags()));
+            .map(row -> new FlagsExtractor(row).getApplicableFlags());
     }
 
-    public CompletableFuture<Void> updateApplicableFlags(CassandraId 
cassandraId, Set<String> toBeAdded) {
+    public Mono<Void> updateApplicableFlags(CassandraId cassandraId, 
Set<String> toBeAdded) {
         if (toBeAdded.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
+            return Mono.empty();
         }
-        return cassandraAsyncExecutor.executeVoid(updateQuery(cassandraId, 
toBeAdded));
+        return 
cassandraAsyncExecutor.executeVoidReactor(updateQuery(cassandraId, toBeAdded));
     }
 
     private Update.Where updateQuery(CassandraId cassandraId, Set<String> 
userFlags) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 4c6ebd9..4b5da30 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -30,9 +30,6 @@ import static 
org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTa
 import static 
org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.TABLE_NAME;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.UID;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -45,6 +42,8 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraDeletedMessageDAO {
     private static final String UID_TO = "uid_to";
@@ -118,25 +117,25 @@ public class CassandraDeletedMessageDAO {
             .value(UID, bindMarker(UID)));
     }
 
-    public CompletableFuture<Void> addDeleted(CassandraId cassandraId, 
MessageUid uid) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> addDeleted(CassandraId cassandraId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             addStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID, uid.asLong()));
     }
 
-    public CompletableFuture<Void> removeDeleted(CassandraId cassandraId, 
MessageUid uid) {
-        return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+    public Mono<Void> removeDeleted(CassandraId cassandraId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeVoidReactor(deleteStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid())
             .setLong(UID, uid.asLong()));
     }
 
-    public CompletableFuture<Stream<MessageUid>> 
retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) {
+    public Flux<MessageUid> retrieveDeletedMessage(CassandraId cassandraId, 
MessageRange range) {
         return retrieveResultSetOfDeletedMessage(cassandraId, range)
-            .thenApply(this::resultSetToStream);
+            .flatMapMany(this::resultSetToFlux);
     }
 
-    private CompletableFuture<ResultSet> 
retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) {
+    private Mono<ResultSet> retrieveResultSetOfDeletedMessage(CassandraId 
cassandraId, MessageRange range) {
         switch (range.getType()) {
             case ALL:
                 return retrieveAllDeleted(cassandraId);
@@ -151,35 +150,35 @@ public class CassandraDeletedMessageDAO {
         throw new UnsupportedOperationException();
     }
 
-    private Stream<MessageUid> resultSetToStream(ResultSet resultSet) {
-        return cassandraUtils.convertToStream(resultSet)
+    private Flux<MessageUid> resultSetToFlux(ResultSet resultSet) {
+        return cassandraUtils.convertToFlux(resultSet)
             .map(row ->
                 MessageUid.of(row.getLong(UID)));
     }
 
-    private CompletableFuture<ResultSet> retrieveAllDeleted(CassandraId 
cassandraId) {
-        return cassandraAsyncExecutor.execute(
+    private Mono<ResultSet> retrieveAllDeleted(CassandraId cassandraId) {
+        return cassandraAsyncExecutor.executeReactor(
             selectAllUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid()));
     }
 
-    private CompletableFuture<ResultSet> retrieveOneDeleted(CassandraId 
cassandraId, MessageUid uid) {
-        return cassandraAsyncExecutor.execute(
+    private Mono<ResultSet> retrieveOneDeleted(CassandraId cassandraId, 
MessageUid uid) {
+        return cassandraAsyncExecutor.executeReactor(
             selectOneUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID, uid.asLong()));
     }
 
-    private CompletableFuture<ResultSet> retrieveDeletedBetween(CassandraId 
cassandraId, MessageUid from, MessageUid to) {
-        return cassandraAsyncExecutor.execute(
+    private Mono<ResultSet> retrieveDeletedBetween(CassandraId cassandraId, 
MessageUid from, MessageUid to) {
+        return cassandraAsyncExecutor.executeReactor(
             selectBetweenUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID_FROM, from.asLong())
                 .setLong(UID_TO, to.asLong()));
     }
 
-    private CompletableFuture<ResultSet> retrieveDeletedAfter(CassandraId 
cassandraId, MessageUid from) {
-        return cassandraAsyncExecutor.execute(
+    private Mono<ResultSet> retrieveDeletedAfter(CassandraId cassandraId, 
MessageUid from) {
+        return cassandraAsyncExecutor.executeReactor(
             selectFromUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID_FROM, from.asLong()));

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index e710e69..50ec510 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -29,9 +29,6 @@ import static 
org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable
 import static 
org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.TABLE_NAME;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.UID;
 
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -40,6 +37,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
 
 public class CassandraFirstUnseenDAO {
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
@@ -76,24 +74,24 @@ public class CassandraFirstUnseenDAO {
             .value(UID, bindMarker(UID)));
     }
 
-    public CompletableFuture<Void> addUnread(CassandraId cassandraId, 
MessageUid uid) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> addUnread(CassandraId cassandraId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             addStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID, uid.asLong()));
     }
 
-    public CompletableFuture<Void> removeUnread(CassandraId cassandraId, 
MessageUid uid) {
-        return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+    public Mono<Void> removeUnread(CassandraId cassandraId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeVoidReactor(deleteStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid())
             .setLong(UID, uid.asLong()));
     }
 
-    public CompletableFuture<Optional<MessageUid>> 
retrieveFirstUnread(CassandraId cassandraId) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    public Mono<MessageUid> retrieveFirstUnread(CassandraId cassandraId) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             readStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid()))
-            .thenApply(optional -> optional.map(row -> 
MessageUid.of(row.getLong(UID))));
+            .map(row -> MessageUid.of(row.getLong(UID)));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 3f1485e..6ca3f46 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import java.util.concurrent.CompletableFuture;
-
 import javax.inject.Inject;
 import javax.mail.Flags;
 
@@ -31,6 +29,8 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 
 import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraIndexTableHandler {
 
@@ -53,116 +53,121 @@ public class CassandraIndexTableHandler {
         this.deletedMessageDAO = deletedMessageDAO;
     }
 
-    public CompletableFuture<Void> 
updateIndexOnDelete(ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData, CassandraId mailboxId) {
+    public Mono<Void> updateIndexOnDelete(ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData, CassandraId mailboxId) {
         MessageUid uid = 
composedMessageIdWithMetaData.getComposedMessageId().getUid();
-        return CompletableFuture.allOf(
-            updateFirstUnseenOnDelete(mailboxId, 
composedMessageIdWithMetaData.getFlags(), 
composedMessageIdWithMetaData.getComposedMessageId().getUid()),
-            mailboxRecentDAO.removeFromRecent(mailboxId, 
composedMessageIdWithMetaData.getComposedMessageId().getUid()),
-            mailboxCounterDAO.decrementCount(mailboxId),
-            deletedMessageDAO.removeDeleted(mailboxId, uid),
-            decrementUnseenOnDelete(mailboxId, 
composedMessageIdWithMetaData.getFlags()));
+
+        return Flux.merge(
+               updateFirstUnseenOnDelete(mailboxId, 
composedMessageIdWithMetaData.getFlags(), 
composedMessageIdWithMetaData.getComposedMessageId().getUid()),
+               mailboxRecentDAO.removeFromRecent(mailboxId, 
composedMessageIdWithMetaData.getComposedMessageId().getUid()),
+               mailboxCounterDAO.decrementCount(mailboxId),
+               deletedMessageDAO.removeDeleted(mailboxId, uid),
+               decrementUnseenOnDelete(mailboxId, 
composedMessageIdWithMetaData.getFlags()))
+                .then();
     }
 
-    public CompletableFuture<Void> updateIndexOnAdd(MailboxMessage message, 
CassandraId mailboxId) {
+    public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId 
mailboxId) {
         Flags flags = message.createFlags();
 
-        return CompletableFuture.allOf(
-            checkDeletedOnAdd(mailboxId, message.createFlags(), 
message.getUid()),
-            updateFirstUnseenOnAdd(mailboxId, message.createFlags(), 
message.getUid()),
-            addRecentOnSave(mailboxId, message),
-            incrementUnseenOnSave(mailboxId, flags),
-            mailboxCounterDAO.incrementCount(mailboxId),
-            applicableFlagDAO.updateApplicableFlags(mailboxId, 
ImmutableSet.copyOf(flags.getUserFlags())));
+        return Flux.merge(
+               checkDeletedOnAdd(mailboxId, message.createFlags(), 
message.getUid()),
+               updateFirstUnseenOnAdd(mailboxId, message.createFlags(), 
message.getUid()),
+               addRecentOnSave(mailboxId, message),
+               incrementUnseenOnSave(mailboxId, flags),
+               mailboxCounterDAO.incrementCount(mailboxId),
+               applicableFlagDAO.updateApplicableFlags(mailboxId, 
ImmutableSet.copyOf(flags.getUserFlags())))
+                .then();
     }
 
-    public CompletableFuture<Void> updateIndexOnFlagsUpdate(CassandraId 
mailboxId, UpdatedFlags updatedFlags) {
-        return 
CompletableFuture.allOf(manageUnseenMessageCountsOnFlagsUpdate(mailboxId, 
updatedFlags),
-                                       manageRecentOnFlagsUpdate(mailboxId, 
updatedFlags),
-                                       
updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
-                                       
applicableFlagDAO.updateApplicableFlags(mailboxId, 
ImmutableSet.copyOf(updatedFlags.userFlagIterator())),
-                                       updateDeletedOnFlagsUpdate(mailboxId, 
updatedFlags));
+    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, 
UpdatedFlags updatedFlags) {
+        return Flux.merge(
+               manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
+               manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
+               updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
+               applicableFlagDAO.updateApplicableFlags(mailboxId, 
ImmutableSet.copyOf(updatedFlags.userFlagIterator())),
+               updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
+                .then();
     }
 
-    private CompletableFuture<Void> updateDeletedOnFlagsUpdate(CassandraId 
mailboxId, UpdatedFlags updatedFlags) {
+    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, 
UpdatedFlags updatedFlags) {
         if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) {
             return deletedMessageDAO.addDeleted(mailboxId, 
updatedFlags.getUid());
         } else if (updatedFlags.isModifiedToUnset(Flags.Flag.DELETED)) {
             return deletedMessageDAO.removeDeleted(mailboxId, 
updatedFlags.getUid());
         } else {
-            return CompletableFuture.completedFuture(null);
+            return Mono.empty();
         }
     }
 
-    private CompletableFuture<Void> decrementUnseenOnDelete(CassandraId 
mailboxId, Flags flags) {
+    private Mono<Void> decrementUnseenOnDelete(CassandraId mailboxId, Flags 
flags) {
         if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
+            return Mono.empty();
         }
         return mailboxCounterDAO.decrementUnseen(mailboxId);
     }
 
-    private CompletableFuture<Void> incrementUnseenOnSave(CassandraId 
mailboxId, Flags flags) {
+    private Mono<Void> incrementUnseenOnSave(CassandraId mailboxId, Flags 
flags) {
         if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
+            return Mono.empty();
         }
         return mailboxCounterDAO.incrementUnseen(mailboxId);
     }
 
-    private CompletableFuture<Void> addRecentOnSave(CassandraId mailboxId, 
MailboxMessage message) {
+    private Mono<Void> addRecentOnSave(CassandraId mailboxId, MailboxMessage 
message) {
         if (message.createFlags().contains(Flags.Flag.RECENT)) {
             return mailboxRecentDAO.addToRecent(mailboxId, message.getUid());
         }
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 
-    private CompletableFuture<Void> 
manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags 
updatedFlags) {
+    private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId 
mailboxId, UpdatedFlags updatedFlags) {
         if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
             return mailboxCounterDAO.incrementUnseen(mailboxId);
         }
         if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
             return mailboxCounterDAO.decrementUnseen(mailboxId);
         }
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 
-    private CompletableFuture<Void> manageRecentOnFlagsUpdate(CassandraId 
mailboxId, UpdatedFlags updatedFlags) {
+    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, 
UpdatedFlags updatedFlags) {
         if (updatedFlags.isModifiedToUnset(Flags.Flag.RECENT)) {
             return mailboxRecentDAO.removeFromRecent(mailboxId, 
updatedFlags.getUid());
         }
         if (updatedFlags.isModifiedToSet(Flags.Flag.RECENT)) {
             return mailboxRecentDAO.addToRecent(mailboxId, 
updatedFlags.getUid());
         }
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 
-    private CompletableFuture<Void> updateFirstUnseenOnAdd(CassandraId 
mailboxId, Flags flags, MessageUid uid) {
+    private Mono<Void> updateFirstUnseenOnAdd(CassandraId mailboxId, Flags 
flags, MessageUid uid) {
         if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
+            return Mono.empty();
         }
         return firstUnseenDAO.addUnread(mailboxId, uid);
     }
 
-    private CompletableFuture<Void> checkDeletedOnAdd(CassandraId mailboxId, 
Flags flags, MessageUid uid) {
+    private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Flags flags, 
MessageUid uid) {
         if (flags.contains(Flags.Flag.DELETED)) {
             return deletedMessageDAO.addDeleted(mailboxId, uid);
         }
 
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 
-    private CompletableFuture<Void> updateFirstUnseenOnDelete(CassandraId 
mailboxId, Flags flags, MessageUid uid) {
+    private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Flags 
flags, MessageUid uid) {
         if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
+            return Mono.empty();
         }
         return firstUnseenDAO.removeUnread(mailboxId, uid);
     }
 
-    private CompletableFuture<Void> updateFirstUnseenOnFlagsUpdate(CassandraId 
mailboxId, UpdatedFlags updatedFlags) {
+    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, 
UpdatedFlags updatedFlags) {
         if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
             return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid());
         }
         if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
             return firstUnseenDAO.removeUnread(mailboxId, 
updatedFlags.getUid());
         }
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
index f2e2fce..5f2dec0 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
@@ -26,9 +26,6 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.incr;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
 
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -42,6 +39,7 @@ import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.Assignment;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailboxCounterDAO {
 
@@ -76,48 +74,48 @@ public class CassandraMailboxCounterDAO {
                 .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, 
bindMarker(CassandraMailboxCountersTable.MAILBOX_ID))));
     }
 
-    public CompletableFuture<Optional<MailboxCounters>> 
retrieveMailboxCounters(Mailbox mailbox) throws MailboxException {
+    public Mono<MailboxCounters> retrieveMailboxCounters(Mailbox mailbox) 
throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return 
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId, 
readStatement))
-            .thenApply(optional -> optional.map(row ->  
MailboxCounters.builder()
+        return 
cassandraAsyncExecutor.executeSingleRowReactor(bindWithMailbox(mailboxId, 
readStatement))
+            .map(row ->  MailboxCounters.builder()
                 .count(row.getLong(CassandraMailboxCountersTable.COUNT))
                 .unseen(row.getLong(CassandraMailboxCountersTable.UNSEEN))
-                .build()));
+                .build());
     }
 
-    public CompletableFuture<Optional<Long>> countMessagesInMailbox(Mailbox 
mailbox) throws MailboxException {
+    public Mono<Long> countMessagesInMailbox(Mailbox mailbox) throws 
MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return countMessagesInMailbox(mailboxId);
     }
 
-    public CompletableFuture<Optional<Long>> 
countMessagesInMailbox(CassandraId cassandraId) {
-        return 
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(cassandraId, 
readStatement))
-            .thenApply(optional -> optional.map(row -> 
row.getLong(CassandraMailboxCountersTable.COUNT)));
+    public Mono<Long> countMessagesInMailbox(CassandraId cassandraId) {
+        return 
cassandraAsyncExecutor.executeSingleRowReactor(bindWithMailbox(cassandraId, 
readStatement))
+            .map(row -> row.getLong(CassandraMailboxCountersTable.COUNT));
     }
 
-    public CompletableFuture<Optional<Long>> 
countUnseenMessagesInMailbox(Mailbox mailbox) throws MailboxException {
+    public Mono<Long> countUnseenMessagesInMailbox(Mailbox mailbox) throws 
MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return 
cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId, 
readStatement))
-            .thenApply(optional -> optional.map(row -> 
row.getLong(CassandraMailboxCountersTable.UNSEEN)));
+        return 
cassandraAsyncExecutor.executeSingleRowReactor(bindWithMailbox(mailboxId, 
readStatement))
+            .map(row -> row.getLong(CassandraMailboxCountersTable.UNSEEN));
     }
 
-    public CompletableFuture<Void> decrementCount(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, 
decrementMessageCountStatement));
+    public Mono<Void> decrementCount(CassandraId mailboxId) {
+        return 
cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, 
decrementMessageCountStatement));
     }
 
-    public CompletableFuture<Void> incrementCount(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, 
incrementMessageCountStatement));
+    public Mono<Void> incrementCount(CassandraId mailboxId) {
+        return 
cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, 
incrementMessageCountStatement));
     }
 
-    public CompletableFuture<Void> decrementUnseen(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, 
decrementUnseenCountStatement));
+    public Mono<Void> decrementUnseen(CassandraId mailboxId) {
+        return 
cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, 
decrementUnseenCountStatement));
     }
 
-    public CompletableFuture<Void> incrementUnseen(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, 
incrementUnseenCountStatement));
+    public Mono<Void> incrementUnseen(CassandraId mailboxId) {
+        return 
cassandraAsyncExecutor.executeVoidReactor(bindWithMailbox(mailboxId, 
incrementUnseenCountStatement));
     }
 
     private BoundStatement bindWithMailbox(CassandraId mailboxId, 
PreparedStatement statement) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index 2018e0b..9a521d0 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -25,9 +25,6 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -40,6 +37,8 @@ import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailboxRecentsDAO {
 
@@ -85,11 +84,11 @@ public class CassandraMailboxRecentsDAO {
                 .value(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, 
bindMarker(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID)));
     }
 
-    public CompletableFuture<Stream<MessageUid>> 
getRecentMessageUidsInMailbox(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.execute(bindWithMailbox(mailboxId, 
readStatement))
-            .thenApply(cassandraUtils::convertToStream)
-            .thenApply(stream -> stream.map(row -> 
row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID)))
-            .thenApply(stream -> stream.map(MessageUid::of));
+    public Flux<MessageUid> getRecentMessageUidsInMailbox(CassandraId 
mailboxId) {
+        return 
cassandraAsyncExecutor.executeReactor(bindWithMailbox(mailboxId, readStatement))
+            .flatMapMany(cassandraUtils::convertToFlux)
+            .map(row -> 
row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))
+            .map(MessageUid::of);
     }
 
     private BoundStatement bindWithMailbox(CassandraId mailboxId, 
PreparedStatement statement) {
@@ -97,14 +96,14 @@ public class CassandraMailboxRecentsDAO {
             .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, 
mailboxId.asUuid());
     }
 
-    public CompletableFuture<Void> removeFromRecent(CassandraId mailboxId, 
MessageUid messageUid) {
-        return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+    public Mono<Void> removeFromRecent(CassandraId mailboxId, MessageUid 
messageUid) {
+        return cassandraAsyncExecutor.executeVoidReactor(deleteStatement.bind()
             .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, 
mailboxId.asUuid())
             .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, 
messageUid.asLong()));
     }
 
-    public CompletableFuture<Void> addToRecent(CassandraId mailboxId, 
MessageUid messageUid) {
-        return cassandraAsyncExecutor.executeVoid(addStatement.bind()
+    public Mono<Void> addToRecent(CassandraId mailboxId, MessageUid 
messageUid) {
+        return cassandraAsyncExecutor.executeVoidReactor(addStatement.bind()
             .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, 
mailboxId.asUuid())
             .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, 
messageUid.asLong()));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 2d9a154..880ac16 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Multimap;
+import reactor.core.publisher.Mono;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraMessageIdMapper.class);
@@ -150,7 +151,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
             .thenCompose(voidValue -> CompletableFuture.allOf(
                 imapUidDAO.insert(composedMessageIdWithMetaData),
                 messageIdDAO.insert(composedMessageIdWithMetaData)))
-            .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+            .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture())
             .join();
     }
 
@@ -162,7 +163,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         CompletableFuture.allOf(
                         imapUidDAO.insert(composedMessageIdWithMetaData),
                         messageIdDAO.insert(composedMessageIdWithMetaData))
-                .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+                .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture())
                 .join();
     }
 
@@ -225,7 +226,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         return CompletableFuture.allOf(
             imapUidDAO.delete(messageId, mailboxId),
             messageIdDAO.delete(mailboxId, 
metaData.getComposedMessageId().getUid()))
-            .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnDelete(metaData, mailboxId));
+            .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnDelete(metaData, mailboxId).toFuture());
     }
 
     @Override
@@ -239,7 +240,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
                 .isPresent())
             .flatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, 
mailboxId, messageId))
             .map(this::updateCounts)
-            .map(CompletableFuture::join)
+            .map(Mono::block)
             .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight));
     }
 
@@ -264,10 +265,10 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         }
     }
 
-    private CompletableFuture<Pair<MailboxId, UpdatedFlags>> 
updateCounts(Pair<MailboxId, UpdatedFlags> pair) {
+    private Mono<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, 
UpdatedFlags> pair) {
         CassandraId cassandraId = (CassandraId) pair.getLeft();
         return indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, 
pair.getRight())
-            .thenApply(voidValue -> pair);
+            .then(Mono.just(pair));
     }
 
     private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> 
tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, 
MailboxId mailboxId, MessageId messageId) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 0133f05..f2b468f 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -118,22 +117,22 @@ public class CassandraMessageMapper implements 
MessageMapper {
     @Override
     public long countMessagesInMailbox(Mailbox mailbox) throws 
MailboxException {
         return mailboxCounterDAO.countMessagesInMailbox(mailbox)
-            .join()
-            .orElse(0L);
+                .defaultIfEmpty(0L)
+                .block();
     }
 
     @Override
     public long countUnseenMessagesInMailbox(Mailbox mailbox) throws 
MailboxException {
         return mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox)
-            .join()
-            .orElse(0L);
+                .defaultIfEmpty(0L)
+                .block();
     }
 
     @Override
     public MailboxCounters getMailboxCounters(Mailbox mailbox) throws 
MailboxException {
         return mailboxCounterDAO.retrieveMailboxCounters(mailbox)
-            .join()
-            .orElse(INITIAL_COUNTERS);
+                .defaultIfEmpty(INITIAL_COUNTERS)
+                .block();
     }
 
     @Override
@@ -156,8 +155,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         return Flux.merge(
                 Mono.fromCompletionStage(imapUidDAO.delete(messageId, 
mailboxId)),
                 Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid)))
-            
.concatWith(Mono.fromCompletionStage(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData,
 mailboxId)))
-            .last();
+            
.then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, 
mailboxId));
     }
 
     @Override
@@ -165,7 +163,8 @@ public class CassandraMessageMapper implements 
MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), 
ftype, Limit.from(max))
             .map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage)
-            .sort(Comparator.comparing(MailboxMessage::getUid))
+            .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
+            .flatMapMany(Flux::fromIterable)
             .toIterable()
             .iterator();
     }
@@ -189,37 +188,37 @@ public class CassandraMessageMapper implements 
MessageMapper {
     public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) 
throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
     }
 
     @Override
     public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) throws 
MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return firstUnseenDAO.retrieveFirstUnread(mailboxId)
-            .join()
-            .orElse(null);
+                .map(Optional::of)
+                .defaultIfEmpty(Optional.empty())
+                .block()
+                .orElse(null);
     }
 
     @Override
     public Map<MessageUid, MessageMetaData> 
expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) 
throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return 
Mono.fromCompletionStage(deletedMessageDAO.retrieveDeletedMessage(mailboxId, 
messageRange))
-            .flatMapMany(Flux::fromStream)
-            .buffer(cassandraConfiguration.getExpungeChunkSize())
-            .flatMap(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
+        return deletedMessageDAO.retrieveDeletedMessage(mailboxId, 
messageRange)
+            .limitRate(cassandraConfiguration.getExpungeChunkSize())
+            .flatMap(messageUid -> expungeOne(mailboxId, messageUid))
             .collect(Guavate.<SimpleMailboxMessage, MessageUid, 
MessageMetaData>toImmutableMap(MailboxMessage::getUid, 
MailboxMessage::metaData))
             .block();
     }
 
-    private Flux<SimpleMailboxMessage> expungeUidChunk(CassandraId mailboxId, 
Collection<MessageUid> uidChunk) {
-        return Flux.fromStream(uidChunk.stream())
-            .flatMap(uid -> retrieveComposedId(mailboxId, uid))
-            .doOnNext(this::deleteUsingMailboxId)
+    private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, 
MessageUid messageUid) {
+        return retrieveComposedId(mailboxId, messageUid)
+            .flatMap(idWithMetadata -> 
deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
             .flatMap(idWithMetadata ->
                 
Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata),
 FetchType.Metadata, Limit.unlimited())))
-            .flatMap(Flux::fromStream)
+            .flatMapMany(Flux::fromStream)
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
             .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
@@ -257,7 +256,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         save(mailbox, addUidAndModseq(message, mailboxId))
-            .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, 
mailboxId))
+            .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId))
             .block();
         return message.metaData();
     }
@@ -317,9 +316,10 @@ public class CassandraMessageMapper implements 
MessageMapper {
     private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, 
Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator 
flagsUpdateCalculator) {
         Mono<Long> newModSeq = computeNewModSeq(mailboxId);
         return toBeUpdated
-            .buffer(cassandraConfiguration.getFlagsUpdateChunkSize())
-            .flatMap(uidChunk -> newModSeq.flatMap(modSeq -> 
performUpdatesForChunk(mailboxId, flagsUpdateCalculator, modSeq, uidChunk)))
-            .reduce(FlagsUpdateStageResult.none(), 
FlagsUpdateStageResult::merge);
+            .limitRate(cassandraConfiguration.getFlagsUpdateChunkSize())
+            .flatMapSequential(metadata -> newModSeq.flatMap(modSeq -> 
tryFlagsUpdate(flagsUpdateCalculator, modSeq, metadata)))
+            .reduce(FlagsUpdateStageResult.none(), 
FlagsUpdateStageResult::merge)
+            .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, 
result));
     }
 
     private Mono<Long> computeNewModSeq(CassandraId mailboxId) {
@@ -327,23 +327,15 @@ public class CassandraMessageMapper implements 
MessageMapper {
             .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq 
generation failed for mailbox " + mailboxId.asUuid())));
     }
 
-    private Mono<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId 
mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, 
Collection<ComposedMessageIdWithMetaData> uidChunk) {
-        return Flux.fromIterable(uidChunk)
-            .flatMap(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, 
newModSeq, oldMetadata))
-            .reduce(FlagsUpdateStageResult.none(), 
FlagsUpdateStageResult::merge)
-            .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, 
result));
-    }
-
     private Mono<FlagsUpdateStageResult> 
updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult 
result) {
         return Flux.fromIterable(result.getSucceeded())
             .flatMap(Throwing
-                .function((UpdatedFlags updatedFlags) -> 
Mono.fromCompletionStage(indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, 
updatedFlags)))
-                .fallbackTo(failedindex -> {
-                    LOGGER.error("Could not update flag indexes for mailboxId 
{} UID {}. This will lead to inconsistencies across Cassandra tables", 
mailboxId, failedindex.getUid());
-                    return Mono.just(null);
+                .function((UpdatedFlags updatedFlags) -> 
indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))
+                .fallbackTo(failedIndex -> {
+                    LOGGER.error("Could not update flag indexes for mailboxId 
{} UID {}. This will lead to inconsistencies across Cassandra tables", 
mailboxId, failedIndex.getUid());
+                    return Mono.empty();
                 }))
-            .collectList()
-            .map(any -> result);
+            .then(Mono.just(result));
     }
 
     @Override
@@ -366,8 +358,8 @@ public class CassandraMessageMapper implements 
MessageMapper {
     public Flags getApplicableFlag(Mailbox mailbox) throws MailboxException {
         return ApplicableFlagBuilder.builder()
             .add(applicableFlagDAO.retrieveApplicableFlag((CassandraId) 
mailbox.getMailboxId())
-                .join()
-                .orElse(new Flags()))
+                .defaultIfEmpty(new Flags())
+                .block())
             .build();
     }
 
@@ -375,15 +367,15 @@ public class CassandraMessageMapper implements 
MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         insertIds(addUidAndModseq(message, mailboxId), mailboxId)
-                .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, 
mailboxId))
+                .thenEmpty(indexTableHandler.updateIndexOnAdd(message, 
mailboxId))
                 .block();
         return message.metaData();
     }
 
     private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws 
MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return messageDAO.save(message)
-            .flatMap(aVoid -> insertIds(message, mailboxId));
+        return Mono.fromFuture(messageDAO.save(message))
+            .thenEmpty(insertIds(message, mailboxId));
     }
 
     private Mono<Void> insertIds(MailboxMessage message, CassandraId 
mailboxId) {
@@ -395,7 +387,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         return Flux.merge(
             
Mono.fromCompletionStage(messageIdDAO.insert(composedMessageIdWithMetaData)),
             
Mono.fromCompletionStage(imapUidDAO.insert(composedMessageIdWithMetaData)))
-            .last();
+            .then();
     }
 
 
@@ -441,9 +433,10 @@ public class CassandraMessageMapper implements 
MessageMapper {
             .flatMap(success -> {
                 if (success) {
                     return 
Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata))
-                        .map(ignored -> true);
+                        .then(Mono.just(true));
                 } else {
                     return Mono.just(false);
-                }});
+                }
+            });
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java
index e2e4f62..55f7b5c 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraApplicableFlagDAOTest.java
@@ -52,59 +52,59 @@ class CassandraApplicableFlagDAOTest {
 
     @Test
     void updateApplicableFlagsShouldReturnEmptyByDefault() {
-        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join())
-            .isEmpty();
+        
assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).hasElement().block())
+            .isFalse();
     }
 
     @Test
     void updateApplicableFlagsShouldSupportEmptyUserFlags() {
-        testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of()).join();
+        testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of()).block();
 
-        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join())
-            .isEmpty();
+        
assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).hasElement().block())
+            .isFalse();
     }
 
     @Test
     void updateApplicableFlagsShouldUpdateUserFlag() {
-        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).join();
+        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).block();
 
-        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join())
-            .contains(new Flags(USER_FLAG));
+        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block())
+            .isEqualTo(new Flags(USER_FLAG));
     }
 
     @Test
     void updateApplicableFlagsShouldUnionUserFlags() {
-        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).join();
-        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG2)).join();
+        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).block();
+        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG2)).block();
 
-        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join())
-            .contains(FlagsBuilder.builder().add(USER_FLAG, 
USER_FLAG2).build());
+        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block())
+            .isEqualTo(FlagsBuilder.builder().add(USER_FLAG, 
USER_FLAG2).build());
     }
 
     @Test
     void updateApplicableFlagsShouldBeIdempotent() {
-        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).join();
-        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).join();
+        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).block();
+        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).block();
 
-        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join())
-            .contains(new Flags(USER_FLAG));
+        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block())
+            .isEqualTo(new Flags(USER_FLAG));
     }
 
     @Test
     void updateApplicableFlagsShouldSkipAlreadyStoredFlagsWhenAddingFlag() {
-        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).join();
-        testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, 
USER_FLAG2)).join();
+        testee.updateApplicableFlags(CASSANDRA_ID, 
ImmutableSet.of(USER_FLAG)).block();
+        testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, 
USER_FLAG2)).block();
 
-        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join())
-            .contains(FlagsBuilder.builder().add(USER_FLAG, 
USER_FLAG2).build());
+        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block())
+            .isEqualTo(FlagsBuilder.builder().add(USER_FLAG, 
USER_FLAG2).build());
     }
 
     @Test
     void updateApplicableFlagsShouldUpdateMultiFlags() {
-        testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, 
USER_FLAG2)).join();
+        testee.updateApplicableFlags(CASSANDRA_ID, ImmutableSet.of(USER_FLAG, 
USER_FLAG2)).block();
 
-        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).join())
-            .contains(FlagsBuilder.builder().add(USER_FLAG, 
USER_FLAG2).build());
+        assertThat(testee.retrieveApplicableFlag(CASSANDRA_ID).block())
+            .isEqualTo(FlagsBuilder.builder().add(USER_FLAG, 
USER_FLAG2).build());
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
index 906f9c7..6197506 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import com.github.steveash.guavate.Guavate;
-
 class CassandraDeletedMessageDAOTest {
     private static final CassandraId MAILBOX_ID = 
CassandraId.of(UUID.fromString("110e8400-e29b-11d4-a716-446655440000"));
     private static final MessageUid UID_1 = MessageUid.of(1);
@@ -59,32 +57,32 @@ class CassandraDeletedMessageDAOTest {
     void retrieveDeletedMessageShouldReturnEmptyByDefault() {
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
-            .join()
-            .collect(Guavate.toImmutableList());
+                .collectList()
+                .block();
 
         assertThat(result).isEmpty();
     }
 
     @Test
     void addDeletedMessageShouldThenBeReportedAsDeletedMessage() {
-        testee.addDeleted(MAILBOX_ID, UID_1).join();
-        testee.addDeleted(MAILBOX_ID, UID_2).join();
+        testee.addDeleted(MAILBOX_ID, UID_1).block();
+        testee.addDeleted(MAILBOX_ID, UID_2).block();
 
         List<MessageUid> result = testee.retrieveDeletedMessage(MAILBOX_ID, 
MessageRange.all())
-            .join()
-            .collect(Guavate.toImmutableList());
+                .collectList()
+                .block();
 
         assertThat(result).containsExactly(UID_1, UID_2);
     }
 
     @Test
     void addDeletedMessageShouldBeIdempotent() {
-        testee.addDeleted(MAILBOX_ID, UID_1).join();
-        testee.addDeleted(MAILBOX_ID, UID_1).join();
+        testee.addDeleted(MAILBOX_ID, UID_1).block();
+        testee.addDeleted(MAILBOX_ID, UID_1).block();
 
         List<MessageUid> result = testee.retrieveDeletedMessage(MAILBOX_ID, 
MessageRange.all())
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).containsExactly(UID_1);
     }
@@ -92,52 +90,52 @@ class CassandraDeletedMessageDAOTest {
 
     @Test
     void removeUnreadShouldReturnEmptyWhenNoData() {
-        testee.removeDeleted(MAILBOX_ID, UID_1).join();
+        testee.removeDeleted(MAILBOX_ID, UID_1).block();
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).isEmpty();
     }
 
     @Test
     void removeDeletedMessageShouldNotAffectOtherMessage() {
-        testee.addDeleted(MAILBOX_ID, UID_2).join();
-        testee.addDeleted(MAILBOX_ID, UID_1).join();
+        testee.addDeleted(MAILBOX_ID, UID_2).block();
+        testee.addDeleted(MAILBOX_ID, UID_1).block();
 
-        testee.removeDeleted(MAILBOX_ID, UID_1).join();
+        testee.removeDeleted(MAILBOX_ID, UID_1).block();
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).containsExactly(UID_2);
     }
 
     @Test
     void removeDeletedShouldRemoveSpecifiedUID() {
-        testee.addDeleted(MAILBOX_ID, UID_2).join();
+        testee.addDeleted(MAILBOX_ID, UID_2).block();
 
-        testee.removeDeleted(MAILBOX_ID, UID_2).join();
+        testee.removeDeleted(MAILBOX_ID, UID_2).block();
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).isEmpty();
     }
 
     private void addMessageForRetrieveTest() {
-        testee.addDeleted(MAILBOX_ID, UID_1).join();
-        testee.addDeleted(MAILBOX_ID, UID_2).join();
-        testee.addDeleted(MAILBOX_ID, UID_3).join();
-        testee.addDeleted(MAILBOX_ID, UID_4).join();
-        testee.addDeleted(MAILBOX_ID, UID_7).join();
-        testee.addDeleted(MAILBOX_ID, UID_8).join();
+        testee.addDeleted(MAILBOX_ID, UID_1).block();
+        testee.addDeleted(MAILBOX_ID, UID_2).block();
+        testee.addDeleted(MAILBOX_ID, UID_3).block();
+        testee.addDeleted(MAILBOX_ID, UID_4).block();
+        testee.addDeleted(MAILBOX_ID, UID_7).block();
+        testee.addDeleted(MAILBOX_ID, UID_8).block();
     }
 
     @Test
@@ -146,8 +144,8 @@ class CassandraDeletedMessageDAOTest {
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).containsExactly(UID_1, UID_2, UID_3, UID_4, UID_7, 
UID_8);
     }
@@ -158,8 +156,8 @@ class CassandraDeletedMessageDAOTest {
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, MessageRange.one(UID_1))
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).containsExactly(UID_1);
     }
@@ -170,8 +168,8 @@ class CassandraDeletedMessageDAOTest {
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, 
MessageRange.one(MessageUid.of(42)))
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).isEmpty();
     }
@@ -182,8 +180,8 @@ class CassandraDeletedMessageDAOTest {
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, 
MessageRange.range(MessageUid.of(3), MessageUid.of(7)))
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).containsExactly(UID_3, UID_4, UID_7);
     }
@@ -194,8 +192,8 @@ class CassandraDeletedMessageDAOTest {
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, 
MessageRange.range(MessageUid.of(5), MessageUid.of(6)))
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).isEmpty();
     }
@@ -206,8 +204,8 @@ class CassandraDeletedMessageDAOTest {
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, 
MessageRange.from(MessageUid.of(9)))
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).isEmpty();
     }
@@ -218,8 +216,8 @@ class CassandraDeletedMessageDAOTest {
 
         List<MessageUid> result = testee
             .retrieveDeletedMessage(MAILBOX_ID, 
MessageRange.from(MessageUid.of(4)))
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(result).containsExactly(UID_4, UID_7, UID_8);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/06ee1e43/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
index 4fae89a..882ed3c 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
@@ -47,85 +47,85 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void retrieveFirstUnreadShouldReturnEmptyByDefault() {
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join().isPresent())
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).hasElement().block())
             .isFalse();
     }
 
     @Test
     void addUnreadShouldThenBeReportedAsFirstUnseen() {
-        testee.addUnread(MAILBOX_ID, UID_1).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .contains(UID_1);
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
+            .isEqualByComparingTo(UID_1);
     }
 
     @Test
     void retrieveFirstUnreadShouldReturnLowestUnreadUid() {
-        testee.addUnread(MAILBOX_ID, UID_1).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
 
-        testee.addUnread(MAILBOX_ID, UID_2).join();
+        testee.addUnread(MAILBOX_ID, UID_2).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .contains(UID_1);
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
+            .isEqualByComparingTo(UID_1);
     }
 
     @Test
     void retrieveFirstUnreadShouldBeOrderIndependent() {
-        testee.addUnread(MAILBOX_ID, UID_2).join();
+        testee.addUnread(MAILBOX_ID, UID_2).block();
 
-        testee.addUnread(MAILBOX_ID, UID_1).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .contains(UID_1);
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
+            .isEqualByComparingTo(UID_1);
     }
 
     @Test
     void addUnreadShouldBeIdempotent() {
-        testee.addUnread(MAILBOX_ID, UID_1).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
 
-        testee.addUnread(MAILBOX_ID, UID_1).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .contains(UID_1);
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
+            .isEqualByComparingTo(UID_1);
     }
 
     @Test
     void removeUnreadShouldReturnWhenNoData() {
-        testee.removeUnread(MAILBOX_ID, UID_1).join();
+        testee.removeUnread(MAILBOX_ID, UID_1).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .isEmpty();
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).hasElement().block())
+            .isFalse();
     }
 
     @Test
     void removeUnreadShouldRemoveOnlyUnread() {
-        testee.addUnread(MAILBOX_ID, UID_1).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
 
-        testee.removeUnread(MAILBOX_ID, UID_1).join();
+        testee.removeUnread(MAILBOX_ID, UID_1).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .isEmpty();
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).hasElement().block())
+            .isFalse();
     }
 
     @Test
     void removeUnreadShouldRemoveLastUnread() {
-        testee.addUnread(MAILBOX_ID, UID_1).join();
-        testee.addUnread(MAILBOX_ID, UID_2).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
+        testee.addUnread(MAILBOX_ID, UID_2).block();
 
-        testee.removeUnread(MAILBOX_ID, UID_2).join();
+        testee.removeUnread(MAILBOX_ID, UID_2).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .contains(UID_1);
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
+            .isEqualByComparingTo(UID_1);
     }
 
     @Test
     void removeUnreadShouldHaveNoEffectWhenNotLast() {
-        testee.addUnread(MAILBOX_ID, UID_1).join();
-        testee.addUnread(MAILBOX_ID, UID_2).join();
+        testee.addUnread(MAILBOX_ID, UID_1).block();
+        testee.addUnread(MAILBOX_ID, UID_2).block();
 
-        testee.removeUnread(MAILBOX_ID, UID_1).join();
+        testee.removeUnread(MAILBOX_ID, UID_1).block();
 
-        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).join())
-            .contains(UID_2);
+        assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
+            .isEqualByComparingTo(UID_2);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to