JAMES-2630 remove last calls to Mono::toFuture
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f5ea01ed Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f5ea01ed Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f5ea01ed Branch: refs/heads/master Commit: f5ea01edff7555547f142e1756857481cfca578f Parents: 4490d62 Author: Matthieu Baechler <matth...@apache.org> Authored: Mon Jan 28 18:11:35 2019 +0100 Committer: Matthieu Baechler <matth...@apache.org> Committed: Wed Feb 6 15:26:22 2019 +0100 ---------------------------------------------------------------------- .../cassandra/CassandraMailRepository.java | 18 ++++++++---------- .../CassandraMailRepositoryMailDAO.java | 11 +++++------ .../CassandraMailRepositoryMailDaoAPI.java | 3 +-- .../CassandraMailRepositoryMailDaoV2.java | 11 +++++------ .../MergingCassandraMailRepositoryMailDao.java | 8 +++----- .../CassandraMailRepositoryMailDAOTest.java | 20 ++++++++++---------- ...ilRepositoryWithFakeImplementationsTest.java | 7 ++----- .../apache/james/queue/rabbitmq/Enqueuer.java | 2 +- .../queue/rabbitmq/view/api/MailQueueView.java | 8 ++++---- .../view/cassandra/CassandraMailQueueView.java | 11 ++++------- 10 files changed, 43 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index 8a0c3b3..1651845 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -21,7 +21,6 @@ package org.apache.james.mailrepository.cassandra; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; @@ -31,7 +30,6 @@ import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryUrl; -import org.apache.james.util.CompletableFutureUtil; import org.apache.mailet.Mail; import reactor.core.publisher.Flux; @@ -85,22 +83,22 @@ public class CassandraMailRepository implements MailRepository { @Override public Mail retrieve(MailKey key) { - return CompletableFutureUtil - .unwrap(mailDAO.read(url, key) - .thenApply(optional -> optional.map(this::toMail))) - .join() - .orElse(null); + return mailDAO.read(url, key) + .map(optional -> optional.map(this::toMail)) + .flatMap(Mono::justOrEmpty) + .flatMap(Function.identity()) + .defaultIfEmpty(null) + .block(); } - private CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { + private Mono<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { MimeMessagePartsId parts = MimeMessagePartsId.builder() .headerBlobId(mailDTO.getHeaderBlobId()) .bodyBlobId(mailDTO.getBodyBlobId()) .build(); return mimeMessageStore.read(parts) - .toFuture() - .thenApply(mimeMessage -> mailDTO.getMailBuilder() + .map(mimeMessage -> mailDTO.getMailBuilder() .mimeMessage(mimeMessage) .build()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java index 61a4e2d..98b3bf8 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java @@ -56,7 +56,6 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; import javax.mail.internet.AddressException; @@ -168,11 +167,11 @@ public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMa } @Override - public CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key) { - return executor.executeSingleRow(selectMail.bind() - .setString(REPOSITORY_NAME, url.asString()) - .setString(MAIL_KEY, key.asString())) - .thenApply(rowOptional -> rowOptional.map(this::toMail)); + public Mono<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key) { + return executor.executeSingleRowOptionalReactor(selectMail.bind() + .setString(REPOSITORY_NAME, url.asString()) + .setString(MAIL_KEY, key.asString())) + .map(rowOptional -> rowOptional.map(this::toMail)); } private MailDTO toMail(Row row) { http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java index a0e3c73..4fb9355 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java @@ -21,7 +21,6 @@ package org.apache.james.mailrepository.cassandra; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import org.apache.james.blob.api.BlobId; import org.apache.james.mailrepository.api.MailKey; @@ -36,7 +35,7 @@ public interface CassandraMailRepositoryMailDaoAPI { Mono<Void> remove(MailRepositoryUrl url, MailKey key); - CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key); + Mono<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key); class MailDTO { private final MailImpl.Builder mailBuilder; http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java index 390410f..70471c8 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java @@ -49,7 +49,6 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; import javax.mail.internet.AddressException; @@ -159,11 +158,11 @@ public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepository .setString(MAIL_KEY, key.asString())); } - public CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key) { - return executor.executeSingleRow(selectMail.bind() - .setString(REPOSITORY_NAME, url.asString()) - .setString(MAIL_KEY, key.asString())) - .thenApply(rowOptional -> rowOptional.map(this::toMail)); + public Mono<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key) { + return executor.executeSingleRowOptionalReactor(selectMail.bind() + .setString(REPOSITORY_NAME, url.asString()) + .setString(MAIL_KEY, key.asString())) + .map(rowOptional -> rowOptional.map(this::toMail)); } private MailDTO toMail(Row row) { http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java index 7c3eb64..2c9deeb 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java @@ -20,14 +20,12 @@ package org.apache.james.mailrepository.cassandra; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; import org.apache.james.blob.api.BlobId; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepositoryUrl; -import org.apache.james.util.OptionalUtils; import org.apache.mailet.Mail; import com.google.common.annotations.VisibleForTesting; @@ -57,9 +55,9 @@ public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepos } @Override - public CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key) { + public Mono<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key) { return v2.read(url, key) - .thenCombine(v1.read(url, key), - (maybeV2Value, maybeV1Value) -> OptionalUtils.or(maybeV2Value, maybeV1Value)); + .filter(Optional::isPresent) + .switchIfEmpty(v1.read(url, key)); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java index 7578e47..7a48bcb 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java @@ -80,7 +80,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody) .block(); - CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get(); + CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).block().get(); Mail partialMail = mailDTO.getMailBuilder().build(); assertSoftly(softly -> { @@ -106,14 +106,14 @@ class CassandraMailRepositoryMailDAOTest { testee.remove(URL, KEY_1).block(); - assertThat(testee.read(URL, KEY_1).join()) + assertThat(testee.read(URL, KEY_1).block()) .isEmpty(); } @Test void readShouldReturnEmptyWhenAbsent() { - assertThat(testee().read(URL, KEY_1).join()) + assertThat(testee().read(URL, KEY_1).block()) .isEmpty(); } } @@ -163,7 +163,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody) .block(); - CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get(); + CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).block().get(); Mail partialMail = mailDTO.getMailBuilder().build(); assertSoftly(softly -> { @@ -233,7 +233,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody) .block(); - CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get(); + CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).block().get(); Mail partialMail = mailDTO.getMailBuilder().build(); assertSoftly(softly -> { @@ -288,7 +288,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody) .block(); - CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).join().get(); + CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).block().get(); Mail partialMail = actual.getMailBuilder().build(); assertSoftly(softly -> { softly.assertThat(actual.getBodyBlobId()).isEqualTo(blobIdBody); @@ -310,7 +310,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody) .block(); - CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).join().get(); + CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).block().get(); Mail partialMail = actual.getMailBuilder().build(); assertSoftly(softly -> { softly.assertThat(actual.getBodyBlobId()).isEqualTo(blobIdBody); @@ -342,7 +342,7 @@ class CassandraMailRepositoryMailDAOTest { blobIdBody2) .block(); - CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).join().get(); + CassandraMailRepositoryMailDaoAPI.MailDTO actual = testee.read(URL, KEY_1).block().get(); Mail partialMail = actual.getMailBuilder().build(); assertSoftly(softly -> { softly.assertThat(actual.getBodyBlobId()).isEqualTo(blobIdBody2); @@ -376,8 +376,8 @@ class CassandraMailRepositoryMailDAOTest { testee.remove(URL, KEY_1).block(); - Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v1Entry = v1.read(URL, KEY_1).join(); - Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v2Entry = v2.read(URL, KEY_1).join(); + Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v1Entry = v1.read(URL, KEY_1).block(); + Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v2Entry = v2.read(URL, KEY_1).block(); assertThat(v1Entry).isEmpty(); assertThat(v2Entry).isEmpty(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java index a2d59b5..4e03ec6 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java @@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javax.mail.internet.MimeMessage; @@ -160,10 +159,8 @@ class CassandraMailRepositoryWithFakeImplementationsTest { } @Override - public CompletableFuture<Optional<CassandraMailRepositoryMailDAO.MailDTO>> read(MailRepositoryUrl url, MailKey key) { - return CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("Expected failure while reading mail parts"); - }); + public Mono<Optional<CassandraMailRepositoryMailDAO.MailDTO>> read(MailRepositoryUrl url, MailKey key) { + return Mono.error(new RuntimeException("Expected failure while reading mail parts")); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 3a29461..7ca896d 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -62,7 +62,7 @@ class Enqueuer { void enQueue(Mail mail) throws MailQueue.MailQueueException { saveMail(mail) .map(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow()) - .map(mailQueueView::storeMail) + .flatMap(mailQueueView::storeMail) .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)) .block(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index 12ca723..93c79e4 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -19,13 +19,13 @@ package org.apache.james.queue.rabbitmq.view.api; -import java.util.concurrent.CompletableFuture; - import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.mailet.Mail; +import reactor.core.publisher.Mono; + public interface MailQueueView { interface Factory { @@ -34,11 +34,11 @@ public interface MailQueueView { void initialize(MailQueueName mailQueueName); - CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem); + Mono<Void> storeMail(EnqueuedItem enqueuedItem); long delete(DeleteCondition deleteCondition); - CompletableFuture<Boolean> isPresent(Mail mail); + Mono<Boolean> isPresent(Mail mail); ManageableMailQueue.MailQueueIterator browse(); http://git-wip-us.apache.org/repos/asf/james-project/blob/f5ea01ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index f318e5e..fd9d327 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -19,8 +19,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra; -import java.util.concurrent.CompletableFuture; - import javax.inject.Inject; import org.apache.james.queue.api.ManageableMailQueue; @@ -84,8 +82,8 @@ public class CassandraMailQueueView implements MailQueueView { } @Override - public CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) { - return storeHelper.storeMail(enqueuedItem).toFuture(); + public Mono<Void> storeMail(EnqueuedItem enqueuedItem) { + return storeHelper.storeMail(enqueuedItem); } @Override @@ -126,9 +124,8 @@ public class CassandraMailQueueView implements MailQueueView { } @Override - public CompletableFuture<Boolean> isPresent(Mail mail) { + public Mono<Boolean> isPresent(Mail mail) { return cassandraMailQueueMailDelete.isDeleted(mail, mailQueueName) - .map(bool -> !bool) - .toFuture(); + .map(bool -> !bool); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org