Repository: james-project Updated Branches: refs/heads/master ab4171cfa -> 7a02725b4
JAMES-2630 propagate reactor objects following blobs-api changes Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7a02725b Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7a02725b Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7a02725b Branch: refs/heads/master Commit: 7a02725b4ba94bc14f29ce94d7d727fd950d620e Parents: ab4171c Author: Matthieu Baechler <[email protected]> Authored: Mon Jan 28 17:55:39 2019 +0100 Committer: Matthieu Baechler <[email protected]> Committed: Wed Feb 6 10:00:31 2019 +0100 ---------------------------------------------------------------------- .../mail/CassandraAttachmentMapper.java | 4 +- .../cassandra/mail/CassandraMessageDAO.java | 22 ++++--- .../cassandra/mail/CassandraMessageIdDAO.java | 4 +- .../mail/CassandraMessageIdMapper.java | 18 +++-- .../mail/CassandraMessageIdToImapUidDAO.java | 5 +- .../cassandra/mail/CassandraMessageMapper.java | 6 +- .../mail/migration/AttachmentV2Migration.java | 14 ++-- .../cassandra/mail/CassandraMessageDAOTest.java | 27 ++++---- .../mail/CassandraMessageIdDAOTest.java | 69 ++++++++++---------- .../CassandraMessageIdToImapUidDAOTest.java | 38 +++++------ .../AttachmentMessageIdCreationTest.java | 6 +- .../cassandra/CassandraMailRepository.java | 11 ++-- .../CassandraMailRepositoryMailDAO.java | 35 +++++----- .../CassandraMailRepositoryMailDaoAPI.java | 4 +- .../CassandraMailRepositoryMailDaoV2.java | 34 +++++----- .../MergingCassandraMailRepositoryMailDao.java | 3 +- .../CassandraMailRepositoryMailDAOTest.java | 20 +++--- ...ilRepositoryWithFakeImplementationsTest.java | 12 ++-- .../apache/james/queue/rabbitmq/Enqueuer.java | 14 ++-- 19 files changed, 169 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index 8606b06..71afb7d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -78,8 +78,8 @@ public class CassandraAttachmentMapper implements AttachmentMapper { } private Mono<Attachment> retrievePayload(DAOAttachment daoAttachment) { - return Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId()).toFuture() - .thenApply(daoAttachment::toAttachment)); + return blobStore.readBytes(daoAttachment.getBlobId()) + .map(daoAttachment::toAttachment); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index ab06089..2a277d3 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -94,6 +94,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Bytes; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; public class CassandraMessageDAO { public static final long DEFAULT_LONG_VALUE = 0L; @@ -174,26 +175,27 @@ public class CassandraMessageDAO { .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID)))); } - public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException { - return saveContent(message).thenCompose(pair -> - cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair))); + public Mono<Void> save(MailboxMessage message) throws MailboxException { + return saveContent(message) + .flatMap(pair -> cassandraAsyncExecutor.executeVoidReactor(boundWriteStatement(message, pair))) + .then(); } - private CompletableFuture<Pair<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException { + private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException { try { byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent()); byte[] bodyContent = IOUtils.toByteArray(message.getBodyContent()); - CompletableFuture<BlobId> bodyFuture = blobStore.save(bodyContent).toFuture(); - CompletableFuture<BlobId> headerFuture = blobStore.save(headerContent).toFuture(); + Mono<BlobId> bodyFuture = blobStore.save(bodyContent); + Mono<BlobId> headerFuture = blobStore.save(headerContent); - return headerFuture.thenCombine(bodyFuture, Pair::of); + return headerFuture.zipWith(bodyFuture); } catch (IOException e) { throw new MailboxException("Error saving mail content", e); } } - private BoundStatement boundWriteStatement(MailboxMessage message, Pair<BlobId, BlobId> pair) { + private BoundStatement boundWriteStatement(MailboxMessage message, Tuple2<BlobId, BlobId> pair) { CassandraMessageId messageId = (CassandraMessageId) message.getMessageId(); return insert.bind() .setUUID(MESSAGE_ID, messageId.get()) @@ -201,8 +203,8 @@ public class CassandraMessageDAO { .setInt(BODY_START_OCTET, (int) (message.getHeaderOctets())) .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets()) .setLong(BODY_OCTECTS, message.getBodyOctets()) - .setString(BODY_CONTENT, pair.getRight().asString()) - .setString(HEADER_CONTENT, pair.getLeft().asString()) + .setString(BODY_CONTENT, pair.getT2().asString()) + .setString(HEADER_CONTENT, pair.getT1().asString()) .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE)) .setList(PROPERTIES, buildPropertiesUdt(message)) .setList(ATTACHMENTS, buildAttachmentUdt(message)); http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java index fe93143..9f8b4ad 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java @@ -176,10 +176,10 @@ public class CassandraMessageIdDAO { .setLong(IMAP_UID, uid.asLong())); } - public CompletableFuture<Void> insert(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { + public Mono<Void> insert(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId(); Flags flags = composedMessageIdWithMetaData.getFlags(); - return cassandraAsyncExecutor.executeVoid(insert.bind() + return cassandraAsyncExecutor.executeVoidReactor(insert.bind() .setUUID(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid()) .setLong(IMAP_UID, composedMessageId.getUid().asLong()) .setUUID(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get()) http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 0a7c0bc..bfc3afb 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -23,14 +23,12 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Stream; import javax.mail.Flags; import org.apache.commons.lang3.tuple.Pair; - import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageManager; @@ -140,11 +138,11 @@ public class CassandraMessageIdMapper implements MessageIdMapper { mailboxMapper.findMailboxById(mailboxId); ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage); messageDAO.save(mailboxMessage) - .thenCompose(voidValue -> CompletableFuture.allOf( + .thenMany(Flux.merge( imapUidDAO.insert(composedMessageIdWithMetaData), messageIdDAO.insert(composedMessageIdWithMetaData))) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture()) - .join(); + .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId)) + .block(); } @Override @@ -152,11 +150,11 @@ public class CassandraMessageIdMapper implements MessageIdMapper { CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId(); mailboxMapper.findMailboxById(mailboxId); ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage); - CompletableFuture.allOf( - imapUidDAO.insert(composedMessageIdWithMetaData), - messageIdDAO.insert(composedMessageIdWithMetaData)) - .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).toFuture()) - .join(); + Flux.merge( + imapUidDAO.insert(composedMessageIdWithMetaData), + messageIdDAO.insert(composedMessageIdWithMetaData)) + .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId)) + .block(); } private ComposedMessageIdWithMetaData createMetadataFor(MailboxMessage mailboxMessage) { http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java index be5686a..4018a8c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java @@ -41,7 +41,6 @@ import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.MOD_SE import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.TABLE_NAME; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; import javax.mail.Flags; @@ -155,10 +154,10 @@ public class CassandraMessageIdToImapUidDAO { .setUUID(MAILBOX_ID, mailboxId.asUuid())); } - public CompletableFuture<Void> insert(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { + public Mono<Void> insert(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId(); Flags flags = composedMessageIdWithMetaData.getFlags(); - return cassandraAsyncExecutor.executeVoid(insert.bind() + return cassandraAsyncExecutor.executeVoidReactor(insert.bind() .setUUID(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get()) .setUUID(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid()) .setLong(IMAP_UID, composedMessageId.getUid().asLong()) http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index af3d7dc..aa1b869 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -368,7 +368,7 @@ public class CassandraMessageMapper implements MessageMapper { private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return Mono.fromFuture(messageDAO.save(message)) + return messageDAO.save(message) .thenEmpty(insertIds(message, mailboxId)); } @@ -379,8 +379,8 @@ public class CassandraMessageMapper implements MessageMapper { .modSeq(message.getModSeq()) .build(); return Flux.merge( - Mono.fromCompletionStage(messageIdDAO.insert(composedMessageIdWithMetaData)), - Mono.fromCompletionStage(imapUidDAO.insert(composedMessageIdWithMetaData))) + messageIdDAO.insert(composedMessageIdWithMetaData), + imapUidDAO.insert(composedMessageIdWithMetaData)) .then(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java index 6398342..273e92c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java @@ -30,6 +30,8 @@ import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + public class AttachmentV2Migration implements Migration { private static final Logger LOGGER = LoggerFactory.getLogger(AttachmentV2Migration.class); private final CassandraAttachmentDAO attachmentDAOV1; @@ -59,12 +61,12 @@ public class AttachmentV2Migration implements Migration { private Result migrateAttachment(Attachment attachment) { try { - blobStore.save(attachment.getBytes()).toFuture() - .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) - .thenCompose(daoAttachement -> attachmentDAOV2.storeAttachment(daoAttachement).toFuture()) - .thenCompose(any -> attachmentDAOV1.deleteAttachment(attachment.getAttachmentId())) - .join(); - return Result.COMPLETED; + return blobStore.save(attachment.getBytes()) + .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) + .flatMap(attachmentDAOV2::storeAttachment) + .then(Mono.defer(() -> Mono.fromFuture(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId())))) + .thenReturn(Result.COMPLETED) + .block(); } catch (Exception e) { LOGGER.error("Error while performing attachmentDAO V2 migration", e); return Result.PARTIAL; http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java index 4eabd41..c6c953e 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java @@ -61,7 +61,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Bytes; - import nl.jqno.equalsverifier.EqualsVerifier; import reactor.core.publisher.Flux; @@ -105,7 +104,7 @@ class CassandraMessageDAOTest { void saveShouldSaveNullValueForTextualLineCountAsZero() throws Exception { message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), NO_ATTACHMENT); - testee.save(message).join(); + testee.save(message).block(); MessageWithoutAttachment attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); @@ -121,7 +120,7 @@ class CassandraMessageDAOTest { propertyBuilder.setTextualLineCount(textualLineCount); message = createMessage(messageId, CONTENT, BODY_START, propertyBuilder, NO_ATTACHMENT); - testee.save(message).join(); + testee.save(message).block(); MessageWithoutAttachment attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); @@ -133,7 +132,7 @@ class CassandraMessageDAOTest { void saveShouldStoreMessageWithFullContent() throws Exception { message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), NO_ATTACHMENT); - testee.save(message).join(); + testee.save(message).block(); MessageWithoutAttachment attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited())); @@ -146,7 +145,7 @@ class CassandraMessageDAOTest { void saveShouldStoreMessageWithBodyContent() throws Exception { message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), NO_ATTACHMENT); - testee.save(message).join(); + testee.save(message).block(); MessageWithoutAttachment attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited())); @@ -162,7 +161,7 @@ class CassandraMessageDAOTest { void saveShouldStoreMessageWithHeaderContent() throws Exception { message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), NO_ATTACHMENT); - testee.save(message).join(); + testee.save(message).block(); MessageWithoutAttachment attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited())); @@ -211,7 +210,7 @@ class CassandraMessageDAOTest { .build()) .build(); SimpleMailboxMessage message1 = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), ImmutableList.of(attachment)); - testee.save(message1).join(); + testee.save(message1).block(); MessageIdAttachmentIds expected = new MessageIdAttachmentIds(messageId, ImmutableSet.of(attachment.getAttachmentId())); //When @@ -237,7 +236,7 @@ class CassandraMessageDAOTest { .build()) .build(); SimpleMailboxMessage message1 = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), ImmutableList.of(attachment1, attachment2)); - testee.save(message1).join(); + testee.save(message1).block(); MessageIdAttachmentIds expected = new MessageIdAttachmentIds(messageId, ImmutableSet.of(attachment1.getAttachmentId(), attachment2.getAttachmentId())); //When @@ -266,8 +265,8 @@ class CassandraMessageDAOTest { .build(); SimpleMailboxMessage message1 = createMessage(messageId1, CONTENT, BODY_START, new PropertyBuilder(), ImmutableList.of(attachment1)); SimpleMailboxMessage message2 = createMessage(messageId2, CONTENT, BODY_START, new PropertyBuilder(), ImmutableList.of(attachment2)); - testee.save(message1).join(); - testee.save(message2).join(); + testee.save(message1).block(); + testee.save(message2).block(); MessageIdAttachmentIds expected1 = new MessageIdAttachmentIds(messageId1, ImmutableSet.of(attachment1.getAttachmentId())); MessageIdAttachmentIds expected2 = new MessageIdAttachmentIds(messageId2, ImmutableSet.of(attachment2.getAttachmentId())); @@ -282,7 +281,7 @@ class CassandraMessageDAOTest { void retrieveAllMessageIdAttachmentIdsShouldReturnEmtpyWhenStoredWithoutAttachment() throws Exception { //Given SimpleMailboxMessage message1 = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), NO_ATTACHMENT); - testee.save(message1).join(); + testee.save(message1).block(); //When Stream<MessageIdAttachmentIds> actual = testee.retrieveAllMessageIdAttachmentIds().join(); @@ -312,9 +311,9 @@ class CassandraMessageDAOTest { SimpleMailboxMessage message1 = createMessage(messageId1, CONTENT, BODY_START, new PropertyBuilder(), ImmutableList.of(attachmentFor1)); SimpleMailboxMessage message2 = createMessage(messageId2, CONTENT, BODY_START, new PropertyBuilder(), NO_ATTACHMENT); SimpleMailboxMessage message3 = createMessage(messageId3, CONTENT, BODY_START, new PropertyBuilder(), ImmutableList.of(attachmentFor3)); - testee.save(message1).join(); - testee.save(message2).join(); - testee.save(message3).join(); + testee.save(message1).block(); + testee.save(message2).block(); + testee.save(message3).block(); //When Stream<MessageIdAttachmentIds> actual = testee.retrieveAllMessageIdAttachmentIds().join(); http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java index 67b9c90..1ebc42b 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java @@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.mail.Flags; @@ -41,6 +40,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Flux; + class CassandraMessageIdDAOTest { @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMessageModule.MODULE); @@ -70,7 +71,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); testee.delete(mailboxId, messageUid).block(); @@ -85,18 +86,18 @@ class CassandraMessageIdDAOTest { MessageUid messageUid2 = MessageUid.of(2); CassandraMessageId messageId = messageIdFactory.generate(); CassandraMessageId messageId2 = messageIdFactory.generate(); - CompletableFuture.allOf(testee.insert( - ComposedMessageIdWithMetaData.builder() - .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) - .flags(new Flags()) - .modSeq(1) - .build()), - testee.insert(ComposedMessageIdWithMetaData.builder() - .composedMessageId(new ComposedMessageId(mailboxId, messageId2, messageUid2)) - .flags(new Flags()) - .modSeq(1) - .build())) - .join(); + Flux.merge(testee.insert( + ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) + .flags(new Flags()) + .modSeq(1) + .build()), + testee.insert(ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId2, messageUid2)) + .flags(new Flags()) + .modSeq(1) + .build())) + .blockLast(); testee.delete(mailboxId, messageUid).block(); @@ -117,7 +118,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build(); - testee.insert(composedMessageIdWithMetaData).join(); + testee.insert(composedMessageIdWithMetaData).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); assertThat(message.get()).isEqualTo(composedMessageIdWithMetaData); @@ -135,7 +136,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -160,7 +161,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -185,7 +186,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -210,7 +211,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -235,7 +236,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -260,7 +261,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -285,7 +286,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -310,7 +311,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -335,7 +336,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); Flags flags = new Flags(); flags.add("myCustomFlag"); @@ -360,7 +361,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build(); - testee.insert(composedMessageIdWithMetaData).join(); + testee.insert(composedMessageIdWithMetaData).block(); Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join(); @@ -385,9 +386,9 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build(); - CompletableFuture.allOf(testee.insert(composedMessageIdWithMetaData), + Flux.merge(testee.insert(composedMessageIdWithMetaData), testee.insert(composedMessageIdWithMetaData2)) - .join(); + .blockLast(); List<ComposedMessageIdWithMetaData> messages = testee.retrieveMessages(mailboxId, MessageRange.all()).join() .collect(Collectors.toList()); @@ -415,7 +416,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build(); - CompletableFuture.allOf(testee.insert( + Flux.merge(testee.insert( ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) .flags(new Flags()) @@ -423,7 +424,7 @@ class CassandraMessageIdDAOTest { .build()), testee.insert(composedMessageIdWithMetaData), testee.insert(composedMessageIdWithMetaData2)) - .join(); + .blockLast(); List<ComposedMessageIdWithMetaData> messages = testee.retrieveMessages(mailboxId, MessageRange.from(messageUid2)).join() .collect(Collectors.toList()); @@ -448,7 +449,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId2, messageUid2)) @@ -461,14 +462,14 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build(); - CompletableFuture.allOf(testee.insert(composedMessageIdWithMetaData), + Flux.merge(testee.insert(composedMessageIdWithMetaData), testee.insert(composedMessageIdWithMetaData2), testee.insert(ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId4, messageUid4)) .flags(new Flags()) .modSeq(1) .build())) - .join(); + .blockLast(); List<ComposedMessageIdWithMetaData> messages = testee.retrieveMessages(mailboxId, MessageRange.range(messageUid2, messageUid3)).join() .collect(Collectors.toList()); @@ -491,7 +492,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build(); - CompletableFuture.allOf(testee.insert( + Flux.merge(testee.insert( ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) .flags(new Flags()) @@ -503,7 +504,7 @@ class CassandraMessageIdDAOTest { .flags(new Flags()) .modSeq(1) .build())) - .join(); + .blockLast(); List<ComposedMessageIdWithMetaData> messages = testee.retrieveMessages(mailboxId, MessageRange.one(messageUid2)).join() .collect(Collectors.toList()); http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java index f563e5f..01bfdc0 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java @@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -42,6 +41,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import com.datastax.driver.core.utils.UUIDs; +import reactor.core.publisher.Flux; class CassandraMessageIdToImapUidDAOTest { public static final CassandraModule MODULE = CassandraModule.aggregateModules( @@ -77,7 +77,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); testee.delete(messageId, mailboxId).block(); @@ -92,7 +92,7 @@ class CassandraMessageIdToImapUidDAOTest { CassandraId mailboxId2 = CassandraId.timeBased(); MessageUid messageUid = MessageUid.of(1); MessageUid messageUid2 = MessageUid.of(2); - CompletableFuture.allOf( + Flux.merge( testee.insert(ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) .flags(new Flags()) @@ -103,7 +103,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build())) - .join(); + .blockLast(); testee.delete(messageId, mailboxId).block(); @@ -127,7 +127,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) @@ -149,7 +149,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build(); - testee.insert(composedMessageIdWithFlags).join(); + testee.insert(composedMessageIdWithFlags).block(); Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 1).block(); @@ -167,7 +167,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build(); - testee.insert(composedMessageIdWithFlags).join(); + testee.insert(composedMessageIdWithFlags).block(); Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 3).block(); @@ -186,7 +186,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -211,7 +211,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -236,7 +236,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -261,7 +261,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -286,7 +286,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -311,7 +311,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -336,7 +336,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -362,7 +362,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(composedMessageId) @@ -387,7 +387,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); Flags flags = new Flags(); flags.add("myCustomFlag"); @@ -412,7 +412,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build()) - .join(); + .block(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) @@ -431,7 +431,7 @@ class CassandraMessageIdToImapUidDAOTest { CassandraId mailboxId2 = CassandraId.timeBased(); MessageUid messageUid = MessageUid.of(1); MessageUid messageUid2 = MessageUid.of(2); - CompletableFuture.allOf( + Flux.merge( testee.insert(ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) .flags(new Flags()) @@ -442,7 +442,7 @@ class CassandraMessageIdToImapUidDAOTest { .flags(new Flags()) .modSeq(1) .build())) - .join(); + .blockLast(); ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java index 84f33e8..499cf54 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java @@ -106,7 +106,7 @@ class AttachmentMessageIdCreationTest { List<MessageAttachment> noAttachment = ImmutableList.of(); message = createMessage(messageId, noAttachment); - cassandraMessageDAO.save(message).join(); + cassandraMessageDAO.save(message).block(); assertThat(migration.run()) .isEqualTo(Migration.Result.COMPLETED); @@ -117,7 +117,7 @@ class AttachmentMessageIdCreationTest { MessageAttachment attachment = createAttachment(); message = createMessage(messageId, ImmutableList.of(attachment)); - cassandraMessageDAO.save(message).join(); + cassandraMessageDAO.save(message).block(); assertThat(migration.run()) .isEqualTo(Migration.Result.COMPLETED); @@ -128,7 +128,7 @@ class AttachmentMessageIdCreationTest { MessageAttachment attachment = createAttachment(); message = createMessage(messageId, ImmutableList.of(attachment)); - cassandraMessageDAO.save(message).join(); + cassandraMessageDAO.save(message).block(); migration.run(); http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index 893035a..8a0c3b3 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -34,7 +34,6 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl; import org.apache.james.util.CompletableFutureUtil; import org.apache.mailet.Mail; -import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -60,16 +59,14 @@ public class CassandraMailRepository implements MailRepository { public MailKey store(Mail mail) throws MessagingException { MailKey mailKey = MailKey.forMail(mail); - Mono.fromFuture(mimeMessageStore.save(mail.getMessage()) - .toFuture() - .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail, + return mimeMessageStore.save(mail.getMessage()) + .flatMap(parts -> mailDAO.store(url, mail, parts.getHeaderBlobId(), - parts.getBodyBlobId())))) + parts.getBodyBlobId())) .then(keysDAO.store(url, mailKey)) .flatMap(this::increaseSizeIfStored) + .thenReturn(mailKey) .block(); - - return mailKey; } private Mono<Void> increaseSizeIfStored(Boolean isStored) { http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java index 6e884b4..61a4e2d 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java @@ -59,7 +59,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.inject.Inject; -import javax.mail.MessagingException; import javax.mail.internet.AddressException; import org.apache.commons.lang3.tuple.Pair; @@ -142,23 +141,23 @@ public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMa } @Override - public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException { - return executor.executeVoid(insertMail.bind() - .setString(REPOSITORY_NAME, url.asString()) - .setString(MAIL_KEY, mail.getName()) - .setString(HEADER_BLOB_ID, headerId.asString()) - .setString(BODY_BLOB_ID, bodyId.asString()) - .setString(STATE, mail.getState()) - .setString(SENDER, mail.getMaybeSender().asString(null)) - .setList(RECIPIENTS, asStringList(mail.getRecipients())) - .setString(ERROR_MESSAGE, mail.getErrorMessage()) - .setString(REMOTE_ADDR, mail.getRemoteAddr()) - .setString(REMOTE_HOST, mail.getRemoteHost()) - .setLong(MESSAGE_SIZE, mail.getMessageSize()) - .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) - .setMap(ATTRIBUTES, toRawAttributeMap(mail)) - .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(mail.getPerRecipientSpecificHeaders())) - ); + public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) { + return Mono.fromCallable(() -> insertMail.bind() + .setString(REPOSITORY_NAME, url.asString()) + .setString(MAIL_KEY, mail.getName()) + .setString(HEADER_BLOB_ID, headerId.asString()) + .setString(BODY_BLOB_ID, bodyId.asString()) + .setString(STATE, mail.getState()) + .setString(SENDER, mail.getMaybeSender().asString(null)) + .setList(RECIPIENTS, asStringList(mail.getRecipients())) + .setString(ERROR_MESSAGE, mail.getErrorMessage()) + .setString(REMOTE_ADDR, mail.getRemoteAddr()) + .setString(REMOTE_HOST, mail.getRemoteHost()) + .setLong(MESSAGE_SIZE, mail.getMessageSize()) + .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) + .setMap(ATTRIBUTES, toRawAttributeMap(mail)) + .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(mail.getPerRecipientSpecificHeaders()))) + .flatMap(executor::executeVoidReactor); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java index bf49097..a0e3c73 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java @@ -23,8 +23,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import javax.mail.MessagingException; - import org.apache.james.blob.api.BlobId; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepositoryUrl; @@ -34,7 +32,7 @@ import org.apache.mailet.Mail; import reactor.core.publisher.Mono; public interface CassandraMailRepositoryMailDaoAPI { - CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException; + Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId); Mono<Void> remove(MailRepositoryUrl url, MailKey key); http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java index 5b4edfe..390410f 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java @@ -52,7 +52,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.inject.Inject; -import javax.mail.MessagingException; import javax.mail.internet.AddressException; import org.apache.commons.lang3.tuple.Pair; @@ -134,22 +133,23 @@ public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepository .and(eq(MAIL_KEY, bindMarker(MAIL_KEY)))); } - public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException { - return executor.executeVoid(insertMail.bind() - .setString(REPOSITORY_NAME, url.asString()) - .setString(MAIL_KEY, mail.getName()) - .setString(HEADER_BLOB_ID, headerId.asString()) - .setString(BODY_BLOB_ID, bodyId.asString()) - .setString(STATE, mail.getState()) - .setString(SENDER, mail.getMaybeSender().asString(null)) - .setList(RECIPIENTS, asStringList(mail.getRecipients())) - .setString(ERROR_MESSAGE, mail.getErrorMessage()) - .setString(REMOTE_ADDR, mail.getRemoteAddr()) - .setString(REMOTE_HOST, mail.getRemoteHost()) - .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) - .setMap(ATTRIBUTES, toRawAttributeMap(mail)) - .setList(PER_RECIPIENT_SPECIFIC_HEADERS, toTupleList(mail.getPerRecipientSpecificHeaders())) - ); + public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) { + return Mono.fromCallable(() -> + insertMail.bind() + .setString(REPOSITORY_NAME, url.asString()) + .setString(MAIL_KEY, mail.getName()) + .setString(HEADER_BLOB_ID, headerId.asString()) + .setString(BODY_BLOB_ID, bodyId.asString()) + .setString(STATE, mail.getState()) + .setString(SENDER, mail.getMaybeSender().asString(null)) + .setList(RECIPIENTS, asStringList(mail.getRecipients())) + .setString(ERROR_MESSAGE, mail.getErrorMessage()) + .setString(REMOTE_ADDR, mail.getRemoteAddr()) + .setString(REMOTE_HOST, mail.getRemoteHost()) + .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) + .setMap(ATTRIBUTES, toRawAttributeMap(mail)) + .setList(PER_RECIPIENT_SPECIFIC_HEADERS, toTupleList(mail.getPerRecipientSpecificHeaders()))) + .flatMap(executor::executeVoidReactor); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java index f83766a..7c3eb64 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java @@ -23,7 +23,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.inject.Inject; -import javax.mail.MessagingException; import org.apache.james.blob.api.BlobId; import org.apache.james.mailrepository.api.MailKey; @@ -48,7 +47,7 @@ public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepos } @Override - public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException { + public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) { return v2.store(url, mail, headerId, bodyId); } http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java index f597eff..7578e47 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java @@ -78,7 +78,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader, blobIdBody) - .join(); + .block(); CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get(); @@ -102,7 +102,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader, blobIdBody) - .join(); + .block(); testee.remove(URL, KEY_1).block(); @@ -161,7 +161,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader, blobIdBody) - .join(); + .block(); CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get(); @@ -231,7 +231,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader, blobIdBody) - .join(); + .block(); CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get(); @@ -286,7 +286,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader, blobIdBody) - .join(); + .block(); CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).join().get(); Mail partialMail = actual.getMailBuilder().build(); @@ -308,7 +308,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader, blobIdBody) - .join(); + .block(); CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).join().get(); Mail partialMail = actual.getMailBuilder().build(); @@ -332,7 +332,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader1, blobIdBody1) - .join(); + .block(); v2.store(URL, FakeMail.builder() @@ -340,7 +340,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader2, blobIdBody2) - .join(); + .block(); CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).join().get(); Mail partialMail = actual.getMailBuilder().build(); @@ -364,7 +364,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader1, blobIdBody1) - .join(); + .block(); v2.store(URL, FakeMail.builder() @@ -372,7 +372,7 @@ class CassandraMailRepositoryMailDAOTest { .build(), blobIdHeader2, blobIdBody2) - .join(); + .block(); testee.remove(URL, KEY_1).block(); http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java index 683919a..a2d59b5 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java @@ -119,7 +119,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { assertThatThrownBy(() -> cassandraMailRepository.store(mail)) .isInstanceOf(RuntimeException.class) - .hasMessage("java.lang.RuntimeException: Expected failure while saving"); + .hasMessage("Expected failure while saving"); assertThat(keysDAO.list(URL).collectList().block()).isEmpty(); } @@ -147,10 +147,8 @@ class CassandraMailRepositoryWithFakeImplementationsTest { } @Override - public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) { - return CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("Expected failure while storing mail parts"); - }); + public Mono<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) { + return Mono.error(new RuntimeException("Expected failure while storing mail parts")); } @Override @@ -183,7 +181,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { assertThatThrownBy(() -> cassandraMailRepository.store(mail)) .isInstanceOf(RuntimeException.class) - .hasMessage("java.lang.RuntimeException: Expected failure while storing mail parts"); + .hasMessage("Expected failure while storing mail parts"); assertThat(keysDAO.list(URL).collectList().block()).isEmpty(); } @@ -202,7 +200,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { assertThatThrownBy(() -> cassandraMailRepository.store(mail)) .isInstanceOf(RuntimeException.class) - .hasMessage("java.lang.RuntimeException: Expected failure while storing mail parts"); + .hasMessage("Expected failure while storing mail parts"); ResultSet resultSet = cassandra.getConf().execute(select() .from(BlobTable.TABLE_NAME)); http://git-wip-us.apache.org/repos/asf/james-project/blob/7a02725b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 349c878..3a29461 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq; import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX; import java.time.Clock; -import java.util.concurrent.CompletableFuture; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; @@ -37,6 +36,7 @@ import org.apache.mailet.Mail; import com.fasterxml.jackson.core.JsonProcessingException; import com.github.fge.lambdas.Throwing; +import reactor.core.publisher.Mono; class Enqueuer { private final MailQueueName name; @@ -61,15 +61,15 @@ class Enqueuer { void enQueue(Mail mail) throws MailQueue.MailQueueException { saveMail(mail) - .thenApply(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow()) - .thenCompose(mailQueueView::storeMail) - .thenRun(enqueueMetric::increment) - .join(); + .map(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow()) + .map(mailQueueView::storeMail) + .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)) + .block(); } - private CompletableFuture<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException { + private Mono<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException { try { - return mimeMessageStore.save(mail.getMessage()).toFuture(); + return mimeMessageStore.save(mail.getMessage()); } catch (MessagingException e) { throw new MailQueue.MailQueueException("Error while saving blob", e); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
