Repository: james-project
Updated Branches:
  refs/heads/master ab4171cfa -> 7a02725b4


JAMES-2630 propagate reactor objects following blobs-api changes


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7a02725b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7a02725b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7a02725b

Branch: refs/heads/master
Commit: 7a02725b4ba94bc14f29ce94d7d727fd950d620e
Parents: ab4171c
Author: Matthieu Baechler <[email protected]>
Authored: Mon Jan 28 17:55:39 2019 +0100
Committer: Matthieu Baechler <[email protected]>
Committed: Wed Feb 6 10:00:31 2019 +0100

----------------------------------------------------------------------
 .../mail/CassandraAttachmentMapper.java         |  4 +-
 .../cassandra/mail/CassandraMessageDAO.java     | 22 ++++---
 .../cassandra/mail/CassandraMessageIdDAO.java   |  4 +-
 .../mail/CassandraMessageIdMapper.java          | 18 +++--
 .../mail/CassandraMessageIdToImapUidDAO.java    |  5 +-
 .../cassandra/mail/CassandraMessageMapper.java  |  6 +-
 .../mail/migration/AttachmentV2Migration.java   | 14 ++--
 .../cassandra/mail/CassandraMessageDAOTest.java | 27 ++++----
 .../mail/CassandraMessageIdDAOTest.java         | 69 ++++++++++----------
 .../CassandraMessageIdToImapUidDAOTest.java     | 38 +++++------
 .../AttachmentMessageIdCreationTest.java        |  6 +-
 .../cassandra/CassandraMailRepository.java      | 11 ++--
 .../CassandraMailRepositoryMailDAO.java         | 35 +++++-----
 .../CassandraMailRepositoryMailDaoAPI.java      |  4 +-
 .../CassandraMailRepositoryMailDaoV2.java       | 34 +++++-----
 .../MergingCassandraMailRepositoryMailDao.java  |  3 +-
 .../CassandraMailRepositoryMailDAOTest.java     | 20 +++---
 ...ilRepositoryWithFakeImplementationsTest.java | 12 ++--
 .../apache/james/queue/rabbitmq/Enqueuer.java   | 14 ++--
 19 files changed, 169 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index 8606b06..71afb7d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -78,8 +78,8 @@ public class CassandraAttachmentMapper implements 
AttachmentMapper {
     }
 
     private Mono<Attachment> retrievePayload(DAOAttachment daoAttachment) {
-        return 
Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId()).toFuture()
-            .thenApply(daoAttachment::toAttachment));
+        return blobStore.readBytes(daoAttachment.getBlobId())
+            .map(daoAttachment::toAttachment);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index ab06089..2a277d3 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -94,6 +94,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Bytes;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
 public class CassandraMessageDAO {
     public static final long DEFAULT_LONG_VALUE = 0L;
@@ -174,26 +175,27 @@ public class CassandraMessageDAO {
             .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
     }
 
-    public CompletableFuture<Void> save(MailboxMessage message) throws 
MailboxException {
-        return saveContent(message).thenCompose(pair ->
-            cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, 
pair)));
+    public Mono<Void> save(MailboxMessage message) throws MailboxException {
+        return saveContent(message)
+            .flatMap(pair -> 
cassandraAsyncExecutor.executeVoidReactor(boundWriteStatement(message, pair)))
+            .then();
     }
 
-    private CompletableFuture<Pair<BlobId, BlobId>> saveContent(MailboxMessage 
message) throws MailboxException {
+    private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) 
throws MailboxException {
         try {
             byte[] headerContent = 
IOUtils.toByteArray(message.getHeaderContent());
             byte[] bodyContent = IOUtils.toByteArray(message.getBodyContent());
 
-            CompletableFuture<BlobId> bodyFuture = 
blobStore.save(bodyContent).toFuture();
-            CompletableFuture<BlobId> headerFuture = 
blobStore.save(headerContent).toFuture();
+            Mono<BlobId> bodyFuture = blobStore.save(bodyContent);
+            Mono<BlobId> headerFuture = blobStore.save(headerContent);
 
-            return headerFuture.thenCombine(bodyFuture, Pair::of);
+            return headerFuture.zipWith(bodyFuture);
         } catch (IOException e) {
             throw new MailboxException("Error saving mail content", e);
         }
     }
 
-    private BoundStatement boundWriteStatement(MailboxMessage message, 
Pair<BlobId, BlobId> pair) {
+    private BoundStatement boundWriteStatement(MailboxMessage message, 
Tuple2<BlobId, BlobId> pair) {
         CassandraMessageId messageId = (CassandraMessageId) 
message.getMessageId();
         return insert.bind()
             .setUUID(MESSAGE_ID, messageId.get())
@@ -201,8 +203,8 @@ public class CassandraMessageDAO {
             .setInt(BODY_START_OCTET, (int) (message.getHeaderOctets()))
             .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets())
             .setLong(BODY_OCTECTS, message.getBodyOctets())
-            .setString(BODY_CONTENT, pair.getRight().asString())
-            .setString(HEADER_CONTENT, pair.getLeft().asString())
+            .setString(BODY_CONTENT, pair.getT2().asString())
+            .setString(HEADER_CONTENT, pair.getT1().asString())
             .setLong(TEXTUAL_LINE_COUNT, 
Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
             .setList(PROPERTIES, buildPropertiesUdt(message))
             .setList(ATTACHMENTS, buildAttachmentUdt(message));

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index fe93143..9f8b4ad 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -176,10 +176,10 @@ public class CassandraMessageIdDAO {
                 .setLong(IMAP_UID, uid.asLong()));
     }
 
-    public CompletableFuture<Void> insert(ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData) {
+    public Mono<Void> insert(ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData) {
         ComposedMessageId composedMessageId = 
composedMessageIdWithMetaData.getComposedMessageId();
         Flags flags = composedMessageIdWithMetaData.getFlags();
-        return cassandraAsyncExecutor.executeVoid(insert.bind()
+        return cassandraAsyncExecutor.executeVoidReactor(insert.bind()
                 .setUUID(MAILBOX_ID, ((CassandraId) 
composedMessageId.getMailboxId()).asUuid())
                 .setLong(IMAP_UID, composedMessageId.getUid().asLong())
                 .setUUID(MESSAGE_ID, ((CassandraMessageId) 
composedMessageId.getMessageId()).get())

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 0a7c0bc..bfc3afb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -23,14 +23,12 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
@@ -140,11 +138,11 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         mailboxMapper.findMailboxById(mailboxId);
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
createMetadataFor(mailboxMessage);
         messageDAO.save(mailboxMessage)
-            .thenCompose(voidValue -> CompletableFuture.allOf(
+            .thenMany(Flux.merge(
                 imapUidDAO.insert(composedMessageIdWithMetaData),
                 messageIdDAO.insert(composedMessageIdWithMetaData)))
-            .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture())
-            .join();
+            .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, 
mailboxId))
+            .block();
     }
 
     @Override
@@ -152,11 +150,11 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
         mailboxMapper.findMailboxById(mailboxId);
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
createMetadataFor(mailboxMessage);
-        CompletableFuture.allOf(
-                        imapUidDAO.insert(composedMessageIdWithMetaData),
-                        messageIdDAO.insert(composedMessageIdWithMetaData))
-                .thenCompose(voidValue -> 
indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture())
-                .join();
+        Flux.merge(
+                imapUidDAO.insert(composedMessageIdWithMetaData),
+                messageIdDAO.insert(composedMessageIdWithMetaData))
+            .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, 
mailboxId))
+            .block();
     }
 
     private ComposedMessageIdWithMetaData createMetadataFor(MailboxMessage 
mailboxMessage) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index be5686a..4018a8c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -41,7 +41,6 @@ import static 
org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.MOD_SE
 import static 
org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.TABLE_NAME;
 
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 import javax.mail.Flags;
@@ -155,10 +154,10 @@ public class CassandraMessageIdToImapUidDAO {
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()));
     }
 
-    public CompletableFuture<Void> insert(ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData) {
+    public Mono<Void> insert(ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData) {
         ComposedMessageId composedMessageId = 
composedMessageIdWithMetaData.getComposedMessageId();
         Flags flags = composedMessageIdWithMetaData.getFlags();
-        return cassandraAsyncExecutor.executeVoid(insert.bind()
+        return cassandraAsyncExecutor.executeVoidReactor(insert.bind()
                 .setUUID(MESSAGE_ID, ((CassandraMessageId) 
composedMessageId.getMessageId()).get())
                 .setUUID(MAILBOX_ID, ((CassandraId) 
composedMessageId.getMailboxId()).asUuid())
                 .setLong(IMAP_UID, composedMessageId.getUid().asLong())

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index af3d7dc..aa1b869 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -368,7 +368,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
 
     private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws 
MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return Mono.fromFuture(messageDAO.save(message))
+        return messageDAO.save(message)
             .thenEmpty(insertIds(message, mailboxId));
     }
 
@@ -379,8 +379,8 @@ public class CassandraMessageMapper implements 
MessageMapper {
                 .modSeq(message.getModSeq())
                 .build();
         return Flux.merge(
-            
Mono.fromCompletionStage(messageIdDAO.insert(composedMessageIdWithMetaData)),
-            
Mono.fromCompletionStage(imapUidDAO.insert(composedMessageIdWithMetaData)))
+                messageIdDAO.insert(composedMessageIdWithMetaData),
+                imapUidDAO.insert(composedMessageIdWithMetaData))
             .then();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
index 6398342..273e92c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -30,6 +30,8 @@ import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Mono;
+
 public class AttachmentV2Migration implements Migration {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AttachmentV2Migration.class);
     private final CassandraAttachmentDAO attachmentDAOV1;
@@ -59,12 +61,12 @@ public class AttachmentV2Migration implements Migration {
 
     private Result migrateAttachment(Attachment attachment) {
         try {
-            blobStore.save(attachment.getBytes()).toFuture()
-                .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, 
blobId))
-                .thenCompose(daoAttachement -> 
attachmentDAOV2.storeAttachment(daoAttachement).toFuture())
-                .thenCompose(any -> 
attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()))
-                .join();
-            return Result.COMPLETED;
+            return blobStore.save(attachment.getBytes())
+                .map(blobId -> CassandraAttachmentDAOV2.from(attachment, 
blobId))
+                .flatMap(attachmentDAOV2::storeAttachment)
+                .then(Mono.defer(() -> 
Mono.fromFuture(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()))))
+                .thenReturn(Result.COMPLETED)
+                .block();
         } catch (Exception e) {
             LOGGER.error("Error while performing attachmentDAO V2 migration", 
e);
             return Result.PARTIAL;

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index 4eabd41..c6c953e 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -61,7 +61,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Bytes;
-
 import nl.jqno.equalsverifier.EqualsVerifier;
 import reactor.core.publisher.Flux;
 
@@ -105,7 +104,7 @@ class CassandraMessageDAOTest {
     void saveShouldSaveNullValueForTextualLineCountAsZero() throws Exception {
         message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
 
-        testee.save(message).join();
+        testee.save(message).block();
 
         MessageWithoutAttachment attachmentRepresentation =
             toMessage(testee.retrieveMessages(messageIds, 
MessageMapper.FetchType.Metadata, Limit.unlimited()));
@@ -121,7 +120,7 @@ class CassandraMessageDAOTest {
         propertyBuilder.setTextualLineCount(textualLineCount);
         message = createMessage(messageId, CONTENT, BODY_START, 
propertyBuilder, NO_ATTACHMENT);
 
-        testee.save(message).join();
+        testee.save(message).block();
 
         MessageWithoutAttachment attachmentRepresentation =
             toMessage(testee.retrieveMessages(messageIds, 
MessageMapper.FetchType.Metadata, Limit.unlimited()));
@@ -133,7 +132,7 @@ class CassandraMessageDAOTest {
     void saveShouldStoreMessageWithFullContent() throws Exception {
         message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
 
-        testee.save(message).join();
+        testee.save(message).block();
 
         MessageWithoutAttachment attachmentRepresentation =
             toMessage(testee.retrieveMessages(messageIds, 
MessageMapper.FetchType.Full, Limit.unlimited()));
@@ -146,7 +145,7 @@ class CassandraMessageDAOTest {
     void saveShouldStoreMessageWithBodyContent() throws Exception {
         message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
 
-        testee.save(message).join();
+        testee.save(message).block();
 
         MessageWithoutAttachment attachmentRepresentation =
             toMessage(testee.retrieveMessages(messageIds, 
MessageMapper.FetchType.Body, Limit.unlimited()));
@@ -162,7 +161,7 @@ class CassandraMessageDAOTest {
     void saveShouldStoreMessageWithHeaderContent() throws Exception {
         message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
 
-        testee.save(message).join();
+        testee.save(message).block();
 
         MessageWithoutAttachment attachmentRepresentation =
             toMessage(testee.retrieveMessages(messageIds, 
MessageMapper.FetchType.Headers, Limit.unlimited()));
@@ -211,7 +210,7 @@ class CassandraMessageDAOTest {
                 .build())
             .build();
         SimpleMailboxMessage message1 = createMessage(messageId, CONTENT, 
BODY_START, new PropertyBuilder(), ImmutableList.of(attachment));
-        testee.save(message1).join();
+        testee.save(message1).block();
         MessageIdAttachmentIds expected = new 
MessageIdAttachmentIds(messageId, 
ImmutableSet.of(attachment.getAttachmentId()));
         
         //When
@@ -237,7 +236,7 @@ class CassandraMessageDAOTest {
                 .build())
             .build();
         SimpleMailboxMessage message1 = createMessage(messageId, CONTENT, 
BODY_START, new PropertyBuilder(), ImmutableList.of(attachment1, attachment2));
-        testee.save(message1).join();
+        testee.save(message1).block();
         MessageIdAttachmentIds expected = new 
MessageIdAttachmentIds(messageId, 
ImmutableSet.of(attachment1.getAttachmentId(), attachment2.getAttachmentId()));
         
         //When
@@ -266,8 +265,8 @@ class CassandraMessageDAOTest {
             .build();
         SimpleMailboxMessage message1 = createMessage(messageId1, CONTENT, 
BODY_START, new PropertyBuilder(), ImmutableList.of(attachment1));
         SimpleMailboxMessage message2 = createMessage(messageId2, CONTENT, 
BODY_START, new PropertyBuilder(), ImmutableList.of(attachment2));
-        testee.save(message1).join();
-        testee.save(message2).join();
+        testee.save(message1).block();
+        testee.save(message2).block();
         MessageIdAttachmentIds expected1 = new 
MessageIdAttachmentIds(messageId1, 
ImmutableSet.of(attachment1.getAttachmentId()));
         MessageIdAttachmentIds expected2 = new 
MessageIdAttachmentIds(messageId2, 
ImmutableSet.of(attachment2.getAttachmentId()));
         
@@ -282,7 +281,7 @@ class CassandraMessageDAOTest {
     void 
retrieveAllMessageIdAttachmentIdsShouldReturnEmtpyWhenStoredWithoutAttachment() 
throws Exception {
         //Given
         SimpleMailboxMessage message1 = createMessage(messageId, CONTENT, 
BODY_START, new PropertyBuilder(), NO_ATTACHMENT);
-        testee.save(message1).join();
+        testee.save(message1).block();
         
         //When
         Stream<MessageIdAttachmentIds> actual = 
testee.retrieveAllMessageIdAttachmentIds().join();
@@ -312,9 +311,9 @@ class CassandraMessageDAOTest {
         SimpleMailboxMessage message1 = createMessage(messageId1, CONTENT, 
BODY_START, new PropertyBuilder(), ImmutableList.of(attachmentFor1));
         SimpleMailboxMessage message2 = createMessage(messageId2, CONTENT, 
BODY_START, new PropertyBuilder(), NO_ATTACHMENT);
         SimpleMailboxMessage message3 = createMessage(messageId3, CONTENT, 
BODY_START, new PropertyBuilder(), ImmutableList.of(attachmentFor3));
-        testee.save(message1).join();
-        testee.save(message2).join();
-        testee.save(message3).join();
+        testee.save(message1).block();
+        testee.save(message2).block();
+        testee.save(message3).block();
         
         //When
         Stream<MessageIdAttachmentIds> actual = 
testee.retrieveAllMessageIdAttachmentIds().join();

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index 67b9c90..1ebc42b 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import javax.mail.Flags;
@@ -41,6 +40,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Flux;
+
 class CassandraMessageIdDAOTest {
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new 
CassandraClusterExtension(CassandraMessageModule.MODULE);
@@ -70,7 +71,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         testee.delete(mailboxId, messageUid).block();
 
@@ -85,18 +86,18 @@ class CassandraMessageIdDAOTest {
         MessageUid messageUid2 = MessageUid.of(2);
         CassandraMessageId messageId = messageIdFactory.generate();
         CassandraMessageId messageId2 = messageIdFactory.generate();
-        CompletableFuture.allOf(testee.insert(
-                ComposedMessageIdWithMetaData.builder()
-                    .composedMessageId(new ComposedMessageId(mailboxId, 
messageId, messageUid))
-                    .flags(new Flags())
-                    .modSeq(1)
-                    .build()),
-                testee.insert(ComposedMessageIdWithMetaData.builder()
-                    .composedMessageId(new ComposedMessageId(mailboxId, 
messageId2, messageUid2))
-                    .flags(new Flags())
-                    .modSeq(1)
-                    .build()))
-        .join();
+        Flux.merge(testee.insert(
+            ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, messageId, 
messageUid))
+                .flags(new Flags())
+                .modSeq(1)
+                .build()),
+            testee.insert(ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, 
messageId2, messageUid2))
+                .flags(new Flags())
+                .modSeq(1)
+                .build()))
+        .blockLast();
 
         testee.delete(mailboxId, messageUid).block();
 
@@ -117,7 +118,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        testee.insert(composedMessageIdWithMetaData).join();
+        testee.insert(composedMessageIdWithMetaData).block();
 
         Optional<ComposedMessageIdWithMetaData> message = 
testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(composedMessageIdWithMetaData);
@@ -135,7 +136,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -160,7 +161,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -185,7 +186,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -210,7 +211,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -235,7 +236,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -260,7 +261,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -285,7 +286,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -310,7 +311,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -335,7 +336,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         Flags flags = new Flags();
         flags.add("myCustomFlag");
@@ -360,7 +361,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        testee.insert(composedMessageIdWithMetaData).join();
+        testee.insert(composedMessageIdWithMetaData).block();
 
         Optional<ComposedMessageIdWithMetaData> message = 
testee.retrieve(mailboxId, messageUid).join();
 
@@ -385,9 +386,9 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        CompletableFuture.allOf(testee.insert(composedMessageIdWithMetaData),
+        Flux.merge(testee.insert(composedMessageIdWithMetaData),
                 testee.insert(composedMessageIdWithMetaData2))
-        .join();
+        .blockLast();
 
         List<ComposedMessageIdWithMetaData> messages = 
testee.retrieveMessages(mailboxId, MessageRange.all()).join()
                 .collect(Collectors.toList());
@@ -415,7 +416,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        CompletableFuture.allOf(testee.insert(
+        Flux.merge(testee.insert(
                 ComposedMessageIdWithMetaData.builder()
                     .composedMessageId(new ComposedMessageId(mailboxId, 
messageId, messageUid))
                     .flags(new Flags())
@@ -423,7 +424,7 @@ class CassandraMessageIdDAOTest {
                     .build()),
                 testee.insert(composedMessageIdWithMetaData),
                 testee.insert(composedMessageIdWithMetaData2))
-        .join();
+        .blockLast();
 
         List<ComposedMessageIdWithMetaData> messages = 
testee.retrieveMessages(mailboxId, MessageRange.from(messageUid2)).join()
                 .collect(Collectors.toList());
@@ -448,7 +449,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(mailboxId, 
messageId2, messageUid2))
@@ -461,14 +462,14 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        CompletableFuture.allOf(testee.insert(composedMessageIdWithMetaData),
+        Flux.merge(testee.insert(composedMessageIdWithMetaData),
                 testee.insert(composedMessageIdWithMetaData2),
                 testee.insert(ComposedMessageIdWithMetaData.builder()
                     .composedMessageId(new ComposedMessageId(mailboxId, 
messageId4, messageUid4))
                     .flags(new Flags())
                     .modSeq(1)
                     .build()))
-        .join();
+        .blockLast();
 
         List<ComposedMessageIdWithMetaData> messages = 
testee.retrieveMessages(mailboxId, MessageRange.range(messageUid2, 
messageUid3)).join()
                 .collect(Collectors.toList());
@@ -491,7 +492,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        CompletableFuture.allOf(testee.insert(
+        Flux.merge(testee.insert(
                 ComposedMessageIdWithMetaData.builder()
                     .composedMessageId(new ComposedMessageId(mailboxId, 
messageId, messageUid))
                     .flags(new Flags())
@@ -503,7 +504,7 @@ class CassandraMessageIdDAOTest {
                     .flags(new Flags())
                     .modSeq(1)
                     .build()))
-        .join();
+        .blockLast();
 
         List<ComposedMessageIdWithMetaData> messages = 
testee.retrieveMessages(mailboxId, MessageRange.one(messageUid2)).join()
                 .collect(Collectors.toList());

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
index f563e5f..01bfdc0 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
@@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -42,6 +41,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.datastax.driver.core.utils.UUIDs;
+import reactor.core.publisher.Flux;
 
 class CassandraMessageIdToImapUidDAOTest {
     public static final CassandraModule MODULE = 
CassandraModule.aggregateModules(
@@ -77,7 +77,7 @@ class CassandraMessageIdToImapUidDAOTest {
                     .flags(new Flags())
                     .modSeq(1)
                     .build())
-                .join();
+                .block();
 
         testee.delete(messageId, mailboxId).block();
 
@@ -92,7 +92,7 @@ class CassandraMessageIdToImapUidDAOTest {
         CassandraId mailboxId2 = CassandraId.timeBased();
         MessageUid messageUid = MessageUid.of(1);
         MessageUid messageUid2 = MessageUid.of(2);
-        CompletableFuture.allOf(
+        Flux.merge(
             testee.insert(ComposedMessageIdWithMetaData.builder()
                     .composedMessageId(new ComposedMessageId(mailboxId, 
messageId, messageUid))
                     .flags(new Flags())
@@ -103,7 +103,7 @@ class CassandraMessageIdToImapUidDAOTest {
                     .flags(new Flags())
                     .modSeq(1)
                     .build()))
-        .join();
+        .blockLast();
 
         testee.delete(messageId, mailboxId).block();
 
@@ -127,7 +127,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(mailboxId, messageId, 
messageUid))
@@ -149,7 +149,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        testee.insert(composedMessageIdWithFlags).join();
+        testee.insert(composedMessageIdWithFlags).block();
 
         Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 
1).block();
 
@@ -167,7 +167,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        testee.insert(composedMessageIdWithFlags).join();
+        testee.insert(composedMessageIdWithFlags).block();
 
         Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 
3).block();
 
@@ -186,7 +186,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -211,7 +211,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -236,7 +236,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -261,7 +261,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -286,7 +286,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -311,7 +311,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -336,7 +336,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -362,7 +362,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(composedMessageId)
@@ -387,7 +387,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         Flags flags = new Flags();
         flags.add("myCustomFlag");
@@ -412,7 +412,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build())
-            .join();
+            .block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(mailboxId, messageId, 
messageUid))
@@ -431,7 +431,7 @@ class CassandraMessageIdToImapUidDAOTest {
         CassandraId mailboxId2 = CassandraId.timeBased();
         MessageUid messageUid = MessageUid.of(1);
         MessageUid messageUid2 = MessageUid.of(2);
-        CompletableFuture.allOf(
+        Flux.merge(
                 testee.insert(ComposedMessageIdWithMetaData.builder()
                     .composedMessageId(new ComposedMessageId(mailboxId, 
messageId, messageUid))
                     .flags(new Flags())
@@ -442,7 +442,7 @@ class CassandraMessageIdToImapUidDAOTest {
                     .flags(new Flags())
                     .modSeq(1)
                     .build()))
-        .join();
+        .blockLast();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = 
ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(mailboxId, messageId, 
messageUid))

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
index 84f33e8..499cf54 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
@@ -106,7 +106,7 @@ class AttachmentMessageIdCreationTest {
         List<MessageAttachment> noAttachment = ImmutableList.of();
         message = createMessage(messageId, noAttachment);
 
-        cassandraMessageDAO.save(message).join();
+        cassandraMessageDAO.save(message).block();
 
         assertThat(migration.run())
             .isEqualTo(Migration.Result.COMPLETED);
@@ -117,7 +117,7 @@ class AttachmentMessageIdCreationTest {
         MessageAttachment attachment = createAttachment();
         message = createMessage(messageId, ImmutableList.of(attachment));
 
-        cassandraMessageDAO.save(message).join();
+        cassandraMessageDAO.save(message).block();
 
         assertThat(migration.run())
             .isEqualTo(Migration.Result.COMPLETED);
@@ -128,7 +128,7 @@ class AttachmentMessageIdCreationTest {
         MessageAttachment attachment = createAttachment();
         message = createMessage(messageId, ImmutableList.of(attachment));
 
-        cassandraMessageDAO.save(message).join();
+        cassandraMessageDAO.save(message).block();
 
         migration.run();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index 893035a..8a0c3b3 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -34,7 +34,6 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import org.apache.james.util.CompletableFutureUtil;
 import org.apache.mailet.Mail;
 
-import com.github.fge.lambdas.Throwing;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -60,16 +59,14 @@ public class CassandraMailRepository implements MailRepository {
     public MailKey store(Mail mail) throws MessagingException {
         MailKey mailKey = MailKey.forMail(mail);
 
-        Mono.fromFuture(mimeMessageStore.save(mail.getMessage())
-            .toFuture()
-            .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail,
+        return mimeMessageStore.save(mail.getMessage())
+            .flatMap(parts -> mailDAO.store(url, mail,
                 parts.getHeaderBlobId(),
-                parts.getBodyBlobId()))))
+                parts.getBodyBlobId()))
             .then(keysDAO.store(url, mailKey))
             .flatMap(this::increaseSizeIfStored)
+            .thenReturn(mailKey)
             .block();
-
-        return mailKey;
     }
 
     private Mono<Void> increaseSizeIfStored(Boolean isStored) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
index 6e884b4..61a4e2d 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
@@ -59,7 +59,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
-import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -142,23 +141,23 @@ public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMa
     }
 
     @Override
-    public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException {
-        return executor.executeVoid(insertMail.bind()
-            .setString(REPOSITORY_NAME, url.asString())
-            .setString(MAIL_KEY, mail.getName())
-            .setString(HEADER_BLOB_ID, headerId.asString())
-            .setString(BODY_BLOB_ID, bodyId.asString())
-            .setString(STATE, mail.getState())
-            .setString(SENDER, mail.getMaybeSender().asString(null))
-            .setList(RECIPIENTS, asStringList(mail.getRecipients()))
-            .setString(ERROR_MESSAGE, mail.getErrorMessage())
-            .setString(REMOTE_ADDR, mail.getRemoteAddr())
-            .setString(REMOTE_HOST, mail.getRemoteHost())
-            .setLong(MESSAGE_SIZE, mail.getMessageSize())
-            .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
-            .setMap(ATTRIBUTES, toRawAttributeMap(mail))
-            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(mail.getPerRecipientSpecificHeaders()))
-        );
+    public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) {
+        return Mono.fromCallable(() -> insertMail.bind()
+                .setString(REPOSITORY_NAME, url.asString())
+                .setString(MAIL_KEY, mail.getName())
+                .setString(HEADER_BLOB_ID, headerId.asString())
+                .setString(BODY_BLOB_ID, bodyId.asString())
+                .setString(STATE, mail.getState())
+                .setString(SENDER, mail.getMaybeSender().asString(null))
+                .setList(RECIPIENTS, asStringList(mail.getRecipients()))
+                .setString(ERROR_MESSAGE, mail.getErrorMessage())
+                .setString(REMOTE_ADDR, mail.getRemoteAddr())
+                .setString(REMOTE_HOST, mail.getRemoteHost())
+                .setLong(MESSAGE_SIZE, mail.getMessageSize())
+                .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
+                .setMap(ATTRIBUTES, toRawAttributeMap(mail))
+                .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(mail.getPerRecipientSpecificHeaders())))
+            .flatMap(executor::executeVoidReactor);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
index bf49097..a0e3c73 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
@@ -23,8 +23,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import javax.mail.MessagingException;
-
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
@@ -34,7 +32,7 @@ import org.apache.mailet.Mail;
 import reactor.core.publisher.Mono;
 
 public interface CassandraMailRepositoryMailDaoAPI {
-    CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException;
+    Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId);
 
     Mono<Void> remove(MailRepositoryUrl url, MailKey key);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
index 5b4edfe..390410f 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
@@ -52,7 +52,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
-import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -134,22 +133,23 @@ public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepository
                 .and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
     }
 
-    public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException {
-        return executor.executeVoid(insertMail.bind()
-            .setString(REPOSITORY_NAME, url.asString())
-            .setString(MAIL_KEY, mail.getName())
-            .setString(HEADER_BLOB_ID, headerId.asString())
-            .setString(BODY_BLOB_ID, bodyId.asString())
-            .setString(STATE, mail.getState())
-            .setString(SENDER, mail.getMaybeSender().asString(null))
-            .setList(RECIPIENTS, asStringList(mail.getRecipients()))
-            .setString(ERROR_MESSAGE, mail.getErrorMessage())
-            .setString(REMOTE_ADDR, mail.getRemoteAddr())
-            .setString(REMOTE_HOST, mail.getRemoteHost())
-            .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
-            .setMap(ATTRIBUTES, toRawAttributeMap(mail))
-            .setList(PER_RECIPIENT_SPECIFIC_HEADERS, toTupleList(mail.getPerRecipientSpecificHeaders()))
-        );
+    public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) {
+        return Mono.fromCallable(() ->
+            insertMail.bind()
+                .setString(REPOSITORY_NAME, url.asString())
+                .setString(MAIL_KEY, mail.getName())
+                .setString(HEADER_BLOB_ID, headerId.asString())
+                .setString(BODY_BLOB_ID, bodyId.asString())
+                .setString(STATE, mail.getState())
+                .setString(SENDER, mail.getMaybeSender().asString(null))
+                .setList(RECIPIENTS, asStringList(mail.getRecipients()))
+                .setString(ERROR_MESSAGE, mail.getErrorMessage())
+                .setString(REMOTE_ADDR, mail.getRemoteAddr())
+                .setString(REMOTE_HOST, mail.getRemoteHost())
+                .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
+                .setMap(ATTRIBUTES, toRawAttributeMap(mail))
+                .setList(PER_RECIPIENT_SPECIFIC_HEADERS, toTupleList(mail.getPerRecipientSpecificHeaders())))
+            .flatMap(executor::executeVoidReactor);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
index f83766a..7c3eb64 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
@@ -23,7 +23,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
-import javax.mail.MessagingException;
 
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.mailrepository.api.MailKey;
@@ -48,7 +47,7 @@ public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepos
     }
 
     @Override
-    public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException {
+    public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) {
         return v2.store(url, mail, headerId, bodyId);
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
index f597eff..7578e47 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
@@ -78,7 +78,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader,
                 blobIdBody)
-                .join();
+                .block();
 
             CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, 
KEY_1).join().get();
 
@@ -102,7 +102,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader,
                 blobIdBody)
-                .join();
+                .block();
 
             testee.remove(URL, KEY_1).block();
 
@@ -161,7 +161,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader,
                 blobIdBody)
-                .join();
+                .block();
 
             CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, 
KEY_1).join().get();
 
@@ -231,7 +231,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader,
                 blobIdBody)
-                .join();
+                .block();
 
             CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, 
KEY_1).join().get();
 
@@ -286,7 +286,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader,
                 blobIdBody)
-                .join();
+                .block();
 
             CassandraMailRepositoryMailDaoAPI.MailDTO actual = 
testee.read(URL, KEY_1).join().get();
             Mail partialMail = actual.getMailBuilder().build();
@@ -308,7 +308,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader,
                 blobIdBody)
-                .join();
+                .block();
 
             CassandraMailRepositoryMailDaoAPI.MailDTO actual = 
testee.read(URL, KEY_1).join().get();
             Mail partialMail = actual.getMailBuilder().build();
@@ -332,7 +332,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader1,
                 blobIdBody1)
-                .join();
+                .block();
 
             v2.store(URL,
                 FakeMail.builder()
@@ -340,7 +340,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader2,
                 blobIdBody2)
-                .join();
+                .block();
 
             CassandraMailRepositoryMailDaoAPI.MailDTO actual = 
testee.read(URL, KEY_1).join().get();
             Mail partialMail = actual.getMailBuilder().build();
@@ -364,7 +364,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader1,
                 blobIdBody1)
-                .join();
+                .block();
 
             v2.store(URL,
                 FakeMail.builder()
@@ -372,7 +372,7 @@ class CassandraMailRepositoryMailDAOTest {
                     .build(),
                 blobIdHeader2,
                 blobIdBody2)
-                .join();
+                .block();
 
             testee.remove(URL, KEY_1).block();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index 683919a..a2d59b5 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -119,7 +119,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
 
             assertThatThrownBy(() -> cassandraMailRepository.store(mail))
                     .isInstanceOf(RuntimeException.class)
-                    .hasMessage("java.lang.RuntimeException: Expected failure while saving");
+                    .hasMessage("Expected failure while saving");
 
             assertThat(keysDAO.list(URL).collectList().block()).isEmpty();
         }
@@ -147,10 +147,8 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
             }
 
             @Override
-            public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) {
-                return CompletableFuture.supplyAsync(() -> {
-                    throw new RuntimeException("Expected failure while storing mail parts");
-                });
+            public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) {
+                return Mono.error(new RuntimeException("Expected failure while storing mail parts"));
             }
 
             @Override
@@ -183,7 +181,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
 
             assertThatThrownBy(() -> cassandraMailRepository.store(mail))
                     .isInstanceOf(RuntimeException.class)
-                    .hasMessage("java.lang.RuntimeException: Expected failure while storing mail parts");
+                    .hasMessage("Expected failure while storing mail parts");
 
             assertThat(keysDAO.list(URL).collectList().block()).isEmpty();
         }
@@ -202,7 +200,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
 
             assertThatThrownBy(() -> cassandraMailRepository.store(mail))
                     .isInstanceOf(RuntimeException.class)
-                    .hasMessage("java.lang.RuntimeException: Expected failure while storing mail parts");
+                    .hasMessage("Expected failure while storing mail parts");
 
             ResultSet resultSet = cassandra.getConf().execute(select()
                     .from(BlobTable.TABLE_NAME));

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 349c878..3a29461 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq;
 import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
 
 import java.time.Clock;
-import java.util.concurrent.CompletableFuture;
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -37,6 +36,7 @@ import org.apache.mailet.Mail;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Mono;
 
 class Enqueuer {
     private final MailQueueName name;
@@ -61,15 +61,15 @@ class Enqueuer {
 
     void enQueue(Mail mail) throws MailQueue.MailQueueException {
         saveMail(mail)
-            .thenApply(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow())
-            .thenCompose(mailQueueView::storeMail)
-            .thenRun(enqueueMetric::increment)
-            .join();
+            .map(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow())
+            .map(mailQueueView::storeMail)
+            .thenEmpty(Mono.fromRunnable(enqueueMetric::increment))
+            .block();
     }
 
-    private CompletableFuture<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException {
+    private Mono<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException {
         try {
-            return mimeMessageStore.save(mail.getMessage()).toFuture();
+            return mimeMessageStore.save(mail.getMessage());
         } catch (MessagingException e) {
             throw new MailQueue.MailQueueException("Error while saving blob", e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to