JAMES-2544 RabbitMQ browse Cassandra DAOs and tests
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3a53806a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3a53806a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3a53806a Branch: refs/heads/master Commit: 3a53806a0184960ed50bbbc500d6392870a214a9 Parents: 0523acf Author: duc <dt...@linagora.com> Authored: Wed Sep 12 20:15:49 2018 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Wed Sep 26 09:22:06 2018 +0700 ---------------------------------------------------------------------- .../cassandra/utils/CassandraAsyncExecutor.java | 4 + .../mail/CassandraAttachmentMapper.java | 3 +- .../cassandra/mail/CassandraMailboxMapper.java | 14 +- .../mail/CassandraMessageIdMapper.java | 9 +- .../cassandra/mail/CassandraMessageMapper.java | 14 +- .../james/util/CompletableFutureUtil.java | 30 +-- .../apache/james/util/FluentFutureStream.java | 127 ++++------ .../james/util/CompletableFutureUtilTest.java | 142 ++--------- .../james/util/FluentFutureStreamTest.java | 57 ++++- .../queue/api/ManageableMailQueueContract.java | 4 +- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 47 +++- .../rabbitmq/view/cassandra/BrowseStartDAO.java | 109 ++++++++ .../cassandra/CassandraMailQueueBrowser.java | 128 ++++++++++ .../cassandra/CassandraMailQueueMailDelete.java | 87 +++++++ .../cassandra/CassandraMailQueueMailStore.java | 98 ++++++++ .../view/cassandra/CassandraMailQueueView.java | 91 +++++++ .../view/cassandra/DeletedMailsDAO.java | 83 +++++++ .../view/cassandra/EnqueuedMailsDAO.java | 144 +++++++++++ .../view/cassandra/EnqueuedMailsDaoUtil.java | 196 +++++++++++++++ .../view/cassandra/model/BucketedSlices.java | 29 +-- .../view/cassandra/model/EnqueuedMail.java | 8 +- .../rabbitmq/view/cassandra/model/MailKey.java | 17 ++ .../queue/rabbitmq/RabbitMQMailQueueTest.java | 248 +++++++++++++++++-- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 9 +- .../view/cassandra/BrowseStartDAOTest.java | 86 +++++++ .../CassandraMailQueueViewTestFactory.java | 48 ++++ .../view/cassandra/DeletedMailsDAOTest.java | 108 ++++++++ .../view/cassandra/EnqueuedMailsDaoTest.java | 126 ++++++++++ .../cassandra/model/BucketedSlicesTest.java | 58 ++++- .../view/cassandra/model/EnqueuedMailTest.java | 33 +++ .../view/cassandra/model/MailKeyTest.java | 33 +++ 31 files changed, 1882 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java index a137c66..7815643 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java @@ -62,4 +62,8 @@ public class CassandraAsyncExecutor { .thenApply(Optional::ofNullable); } + public CompletableFuture<Boolean> executeReturnExists(Statement statement) { + return executeSingleRow(statement) + .thenApply(Optional::isPresent); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index 7975b35..ecb43de 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -103,8 +103,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { .map(id -> getAttachmentInternal(id) .thenApply(finalValue -> logNotFound(id, finalValue))); - return FluentFutureStream - .ofOptionals(attachments) + return FluentFutureStream.of(attachments, FluentFutureStream::unboxOptional) .collect(Guavate.toImmutableList()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index 5dee291..69c092d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -79,8 +79,10 @@ public class CassandraMailboxMapper implements MailboxMapper { @Override public void delete(Mailbox mailbox) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - FluentFutureStream.ofFutures(mailboxPathDAO.delete(mailbox.generateAssociatedPath()), mailboxPathV2DAO.delete(mailbox.generateAssociatedPath())) - .thenComposeOnAll(any -> mailboxDAO.delete(mailboxId)) + FluentFutureStream.ofFutures( + mailboxPathDAO.delete(mailbox.generateAssociatedPath()), + mailboxPathV2DAO.delete(mailbox.generateAssociatedPath())) + .map(any -> mailboxDAO.delete(mailboxId), FluentFutureStream::unboxFuture) .join(); } @@ -166,10 +168,10 @@ public class CassandraMailboxMapper implements MailboxMapper { private List<Mailbox> toMailboxes(MailboxPath path, CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes) { Pattern regex = Pattern.compile(constructEscapedRegexForMailboxNameMatching(path)); - + return FluentFutureStream.of(listUserMailboxes) .filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches()) - .thenFlatComposeOnOptional(this::retrieveMailbox) + .map(this::retrieveMailbox, FluentFutureStream::unboxFutureOptional) .join() .collect(Guavate.toImmutableList()); } @@ -227,7 +229,7 @@ public class CassandraMailboxMapper implements MailboxMapper { @Override public List<Mailbox> list() { return mailboxDAO.retrieveAllMailboxes() - .thenComposeOnAll(this::toMailboxWithAclFuture) + .map(this::toMailboxWithAclFuture, FluentFutureStream::unboxFuture) .join() .collect(Guavate.toImmutableList()); } @@ -283,7 +285,7 @@ public class CassandraMailboxMapper implements MailboxMapper { public List<Mailbox> findNonPersonalMailboxes(String userName, Right right) { return FluentFutureStream.of(userMailboxRightsDAO.listRightsForUser(userName) .thenApply(map -> toAuthorizedMailboxIds(map, right))) - .thenFlatComposeOnOptional(this::retrieveMailbox) + .map(this::retrieveMailbox, FluentFutureStream::unboxFutureOptional) .join() .collect(Guavate.toImmutableList()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index c59a60b..2d9a154 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -98,9 +98,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } private Stream<SimpleMailboxMessage> findAsStream(Collection<MessageId> messageIds, FetchType fetchType) { - return FluentFutureStream.ofNestedStreams( - messageIds.stream() - .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))) + return FluentFutureStream.of( + messageIds.stream() + .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())), + FluentFutureStream::unboxStream) .collect(Guavate.toImmutableList()) .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) .thenApply(stream -> stream @@ -113,7 +114,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } private CompletableFuture<Stream<SimpleMailboxMessage>> filterMessagesWithExistingMailbox(Stream<SimpleMailboxMessage> stream) { - return FluentFutureStream.ofOptionals(stream.map(this::keepMessageIfMailboxExists)) + return FluentFutureStream.of(stream.map(this::keepMessageIfMailboxExists), FluentFutureStream::unboxOptional) .completableFuture(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 57b1f81..300e7ef 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -214,10 +214,13 @@ public class CassandraMessageMapper implements MessageMapper { } private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) { - return FluentFutureStream.ofOptionals( - uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, uid))) + return FluentFutureStream.of( + uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, uid)), + FluentFutureStream::unboxOptional) .performOnAll(this::deleteUsingMailboxId) - .thenFlatCompose(idWithMetadata -> messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())) + .map(idWithMetadata -> FluentFutureStream.of( + messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())), + FluentFutureStream::unboxFluentFuture) .filter(CassandraMessageDAO.MessageResult::isFound) .map(CassandraMessageDAO.MessageResult::message) .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())) @@ -299,8 +302,9 @@ public class CassandraMessageMapper implements MessageMapper { } private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) { - Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.ofOptionals( - failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid))) + Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.of( + failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid)), + FluentFutureStream::unboxOptional) .join(); return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator); http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java index d2e83af..433de52 100644 --- a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java +++ b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java @@ -19,6 +19,7 @@ package org.apache.james.util; +import java.util.Comparator; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; @@ -35,11 +36,6 @@ public class CompletableFutureUtil { .orElse(CompletableFuture.completedFuture(Optional.empty()))); } - @SafeVarargs - public static <T> CompletableFuture<Stream<T>> allOfArray(CompletableFuture<T>... futures) { - return allOf(Stream.of(futures)); - } - public static <T, U, V> CompletableFuture<V> combine(CompletableFuture<T> t, CompletableFuture<U> u, BiFunction<T,U,V> combiner) { return t.thenCompose(valueT -> u.thenApply(valueU -> combiner.apply(valueT, valueU))); @@ -81,20 +77,6 @@ public class CompletableFutureUtil { CompletableFuture.completedFuture(Stream.concat(stream1, stream2)))); } - public static <T> CompletableFuture<Stream<T>> performOnAll(CompletableFuture<Stream<T>> futurStream, Function<T, CompletableFuture<Void>> action) { - return thenComposeOnAll(futurStream, value -> - keepValue(() -> - action.apply(value), - value)); - } - - public static <T, U> CompletableFuture<Stream<U>> thenComposeOnAll(CompletableFuture<Stream<T>> futurStream, Function<T, CompletableFuture<U>> action) { - return futurStream - .thenCompose(stream -> - CompletableFutureUtil.allOf( - stream.map(action))); - } - public static <T, U> CompletableFuture<Stream<U>> map(CompletableFuture<Stream<T>> futurStream, Function<T, U> action) { return futurStream .thenApply(stream -> @@ -109,10 +91,6 @@ public class CompletableFutureUtil { return futureStream.thenApply(stream -> stream.reduce(binaryOperator).orElse(emptyAccumulator)); } - public static <T> CompletableFuture<T> keepValue(Supplier<CompletableFuture<Void>> supplier, T value) { - return supplier.get().thenApply(any -> value); - } - public static <T> Function<Boolean, CompletableFuture<Boolean>> composeIfTrue(Supplier<CompletableFuture<T>> composeOperation) { return b -> { if (b) { @@ -121,4 +99,10 @@ public class CompletableFutureUtil { return CompletableFuture.completedFuture(b); }; } + + public static <T> CompletableFuture<Stream<T>> sorted(CompletableFuture<Stream<T>> futureStream, Comparator<T> comparator) { + return futureStream + .thenApply(stream -> + stream.sorted(comparator)); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java b/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java index 9dcae7a..0b0b86e 100644 --- a/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java +++ b/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java @@ -19,6 +19,8 @@ package org.apache.james.util; +import java.util.Arrays; +import java.util.Comparator; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BinaryOperator; @@ -29,6 +31,31 @@ import java.util.stream.Stream; public class FluentFutureStream<T> { + public static <T> FluentFutureStream<T> unboxStream(FluentFutureStream<Stream<T>> streams) { + return FluentFutureStream.of( + streams.completableFuture() + .thenApply(StreamUtils::flatten)); + } + + public static <T> FluentFutureStream<T> unboxOptional(FluentFutureStream<Optional<T>> optionals) { + return unboxStream(optionals.map(OptionalUtils::toStream)); + } + + public static <T> FluentFutureStream<T> unboxFuture(FluentFutureStream<CompletableFuture<T>> futures) { + return FluentFutureStream.of(futures.completableFuture() + .thenCompose(CompletableFutureUtil::allOf)); + } + + public static <T> FluentFutureStream<T> unboxFluentFuture(FluentFutureStream<FluentFutureStream<T>> futures) { + return unboxStream( + unboxFuture( + futures.map(FluentFutureStream::completableFuture))); + } + + public static <T> FluentFutureStream<T> unboxFutureOptional(FluentFutureStream<CompletableFuture<Optional<T>>> futures) { + return unboxOptional(unboxFuture(futures)); + } + private final CompletableFuture<Stream<T>> completableFuture; /** @@ -38,6 +65,11 @@ public class FluentFutureStream<T> { return new FluentFutureStream<>(completableFuture); } + public static <T, U> FluentFutureStream<U> of(Stream<CompletableFuture<T>> completableFuture, + Function<FluentFutureStream<T>, FluentFutureStream<U>> unboxer) { + return unboxer.apply(of(completableFuture)); + } + /** * Constructs a FluentFutureStream from a Stream of Future */ @@ -45,32 +77,9 @@ public class FluentFutureStream<T> { return new FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream)); } - /** - * Constructs a FluentFutureStream from a Stream of Future of Stream. - * - * Underlying streams are flatMapped. - */ - public static <T> FluentFutureStream<T> ofNestedStreams(Stream<CompletableFuture<Stream<T>>> completableFuture) { - return of(completableFuture) - .flatMap(Function.identity()); - } - - /** - * Constructs a FluentFutureStream from a Stream of Future of Optionals. - * - * Underlying optionals are unboxed. - */ - public static <T> FluentFutureStream<T> ofOptionals(Stream<CompletableFuture<Optional<T>>> completableFuture) { - return of(completableFuture) - .flatMapOptional(Function.identity()); - } - - /** - * Constructs a FluentFutureStream from the supplied futures. - */ @SafeVarargs public static <T> FluentFutureStream<T> ofFutures(CompletableFuture<T>... completableFutures) { - return new FluentFutureStream<>(CompletableFutureUtil.allOfArray(completableFutures)); + return of(Arrays.stream(completableFutures)); } private FluentFutureStream(CompletableFuture<Stream<T>> completableFuture) { @@ -81,8 +90,8 @@ public class FluentFutureStream<T> { * For all values of the underlying stream, an action will be performed. */ public FluentFutureStream<T> performOnAll(Function<T, CompletableFuture<Void>> action) { - return FluentFutureStream.of( - CompletableFutureUtil.performOnAll(completableFuture(), action)); + return map(t -> action.apply(t).thenApply(any -> t), + FluentFutureStream::unboxFuture); } /** @@ -93,56 +102,8 @@ public class FluentFutureStream<T> { CompletableFutureUtil.map(completableFuture(), function)); } - /** - * Apply a transformation to all value of the underlying stream. - * - * As the supplied transformation produces streams, the results will be flatMapped. - */ - public <U> FluentFutureStream<U> flatMap(Function<T, Stream<U>> function) { - return FluentFutureStream.of(completableFuture().thenApply(stream -> - stream.flatMap(function))); - } - - /** - * Apply a transformation to all value of the underlying stream. - * - * As the supplied transformation produces optionals, the results will be unboxed. - */ - public <U> FluentFutureStream<U> flatMapOptional(Function<T, Optional<U>> function) { - return map(function) - .flatMap(OptionalUtils::toStream); - } - - /** - * Apply a transformation to all value of the underlying stream. - * - * As the supplied transformation produces futures, we need to compose the returned values. - */ - public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, CompletableFuture<U>> function) { - return FluentFutureStream.of( - CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)); - } - - /** - * Apply a transformation to all value of the underlying stream. - * - * As the supplied transformation produces futures of stream, we need to compose then flatMap the returned values. - */ - public <U> FluentFutureStream<U> thenFlatCompose(Function<T, CompletableFuture<Stream<U>>> function) { - return FluentFutureStream.of( - CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)) - .flatMap(Function.identity()); - } - - /** - * Apply a transformation to all value of the underlying stream. - * - * As the supplied transformation produces futures of optionals, we need to compose then unbox the returned values. - */ - public <U> FluentFutureStream<U> thenFlatComposeOnOptional(Function<T, CompletableFuture<Optional<U>>> function) { - return FluentFutureStream.of( - CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)) - .flatMapOptional(Function.identity()); + public <U, V> FluentFutureStream<V> map(Function<T, U> function, Function<FluentFutureStream<U>, FluentFutureStream<V>> unboxer) { + return unboxer.apply(map(function)); } /** @@ -153,6 +114,12 @@ public class FluentFutureStream<T> { .thenApply(stream -> stream.filter(predicate))); } + public FluentFutureStream<T> thenFilter(Function<T, CompletableFuture<Boolean>> futurePredicate) { + return map(t -> futurePredicate.apply(t) + .thenApply(isKept -> Optional.of(t).filter(any -> isKept)), + FluentFutureStream::unboxFutureOptional); + } + /** * Reduces the underlying stream. Reduced value is supplied as a Future of optional, as no empty value is supplied. */ @@ -168,6 +135,14 @@ public class FluentFutureStream<T> { } /** + * sort all elements of the stream by the provided {@code Comparator}. + */ + public FluentFutureStream<T> sorted(Comparator<T> comparator) { + return FluentFutureStream.of( + CompletableFutureUtil.sorted(completableFuture(), comparator)); + } + + /** * Returns a future of the underlying stream. */ public CompletableFuture<Stream<T>> completableFuture() { http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java index a42fb39..acbe10f 100644 --- a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -171,59 +170,6 @@ public class CompletableFutureUtilTest { } @Test - public void allOfArrayShouldPreserveOrder() { - long value1 = 18L; - long value2 = 19L; - long value3 = 20L; - long value4 = 21L; - long value5 = 22L; - long value6 = 23L; - long value7 = 24L; - long value8 = 25L; - long value9 = 26L; - long value10 = 27L; - assertThat( - CompletableFutureUtil.allOfArray( - CompletableFuture.completedFuture(value1), - CompletableFuture.completedFuture(value2), - CompletableFuture.completedFuture(value3), - CompletableFuture.completedFuture(value4), - CompletableFuture.completedFuture(value5), - CompletableFuture.completedFuture(value6), - CompletableFuture.completedFuture(value7), - CompletableFuture.completedFuture(value8), - CompletableFuture.completedFuture(value9), - CompletableFuture.completedFuture(value10)) - .join() - .collect(Guavate.toImmutableList())) - .containsExactly(value1, value2, value3, value4, value5, value6, value7, value8, value9, value10); - } - - @Test - public void allOfArrayShouldUnboxNoArgs() { - assertThat( - CompletableFutureUtil.allOfArray() - .join() - .collect(Guavate.toImmutableList())) - .isEmpty(); - } - - @Test - public void allOfArrayShouldUnboxArray() { - long value1 = 18L; - long value2 = 19L; - long value3 = 20L; - assertThat( - CompletableFutureUtil.allOfArray( - CompletableFuture.completedFuture(value1), - CompletableFuture.completedFuture(value2), - CompletableFuture.completedFuture(value3)) - .join() - .collect(Guavate.toImmutableList())) - .containsOnly(value1, value2, value3); - } - - @Test public void allOfShouldWorkOnVeryLargeStream() { CompletableFutureUtil.allOf( IntStream.range(0, 100000) @@ -257,58 +203,6 @@ public class CompletableFutureUtilTest { } @Test - public void thenComposeOnAllShouldMapOnStreamInsideACompletableFuturOfStreamAndTransformTheResultingStreamOfCompletableFutureIntoACompletableOfStreamAndFlatIt() { - CompletableFuture<Stream<Integer>> futurOfInteger = CompletableFuture.completedFuture(Stream.of(1, 2, 3)); - - assertThat( - CompletableFutureUtil.thenComposeOnAll(futurOfInteger, integer -> - CompletableFuture.completedFuture(integer * 2)) - .join() - .collect(Guavate.toImmutableList())) - .containsExactly(2, 4, 6); - } - - @Test - public void thenComposeOnAllOnEmptyStreamShouldReturnAnEmptyStream() { - CompletableFuture<Stream<Integer>> futurOfInteger = CompletableFuture.completedFuture(Stream.of()); - - assertThat( - CompletableFutureUtil.thenComposeOnAll(futurOfInteger, integer -> - CompletableFuture.completedFuture(integer * 2)) - .join() - .collect(Guavate.toImmutableList())) - .isEmpty(); - } - - @Test - public void keepValueShouldCompleteWhenTheGivenCompletableFutureEnd() { - final AtomicInteger numOfFutureExecution = new AtomicInteger(0); - - Supplier<CompletableFuture<Void>> future = () -> - CompletableFuture.runAsync(numOfFutureExecution::incrementAndGet); - - assertThat( - CompletableFutureUtil.keepValue(future, 42) - .join()) - .isEqualTo(42); - - assertThat( - numOfFutureExecution.get()) - .isEqualTo(1); - } - - @Test - public void keepValueShouldReturnNullWithNullValue() { - Supplier<CompletableFuture<Void>> future = () -> - CompletableFuture.completedFuture(null); - - assertThat( - CompletableFutureUtil.keepValue(future, null) - .join()) - .isNull(); - } - - @Test public void composeIfTrueShouldReturnTrueWhenTrue() { assertThat( CompletableFutureUtil.composeIfTrue(() -> CompletableFuture.completedFuture(null)) @@ -357,7 +251,7 @@ public class CompletableFutureUtilTest { assertThat( CompletableFutureUtil.reduce( (i, j) -> i + j, - CompletableFutureUtil.<Long>allOfArray()) + CompletableFuture.completedFuture(Stream.<Long>of())) .join()) .isEmpty(); } @@ -367,11 +261,8 @@ public class CompletableFutureUtilTest { assertThat( CompletableFutureUtil.reduce( (i, j) -> i + j, - CompletableFutureUtil.allOfArray( - CompletableFuture.completedFuture(1L), - CompletableFuture.completedFuture(2L), - CompletableFuture.completedFuture(3L) - )) + CompletableFuture.completedFuture(Stream.of( + 1L, 2L, 3L))) .join()) .contains(6L); } @@ -382,7 +273,7 @@ public class CompletableFutureUtilTest { assertThat( CompletableFutureUtil.reduce( (i, j) -> i + j, - CompletableFutureUtil.<Long>allOfArray(), + CompletableFuture.completedFuture(Stream.of()), identityAccumulator) .join()) .isEqualTo(identityAccumulator); @@ -393,11 +284,7 @@ public class CompletableFutureUtilTest { assertThat( CompletableFutureUtil.reduce( (i, j) -> i + j, - CompletableFutureUtil.allOfArray( - CompletableFuture.completedFuture(1L), - CompletableFuture.completedFuture(2L), - CompletableFuture.completedFuture(3L) - ), + CompletableFuture.completedFuture(Stream.of(1L, 2L,3L)), 0L) .join()) .isEqualTo(6L); @@ -420,4 +307,23 @@ public class CompletableFutureUtilTest { .join()) .isEmpty(); } + + @Test + public void sortShouldReturnEmptyWhenEmptyStream() { + FluentFutureStream<Long> futureStream = FluentFutureStream.ofFutures(); + assertThat(futureStream.sorted(Long::compareTo).join()) + .isEmpty(); + } + + @Test + public void sortShouldReturnTheSortedStream() { + FluentFutureStream<Long> futureStream = FluentFutureStream.ofFutures( + CompletableFuture.completedFuture(4L), + CompletableFuture.completedFuture(3L), + CompletableFuture.completedFuture(2L), + CompletableFuture.completedFuture(1L)); + + assertThat(futureStream.sorted(Long::compareTo).join()) + .containsExactly(1L, 2L, 3L, 4L); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java b/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java index 0877414..56017d6 100644 --- a/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java @@ -71,11 +71,12 @@ public class FluentFutureStreamTest { @Test public void ofNestedStreamsShouldConstructAFluentFutureStreamWhenProvidedAStreamOfFutureOfStream() { assertThat( - FluentFutureStream.ofNestedStreams( + FluentFutureStream.<Stream<Integer>, Integer>of( Stream.of( CompletableFuture.completedFuture(Stream.of(1, 2)), CompletableFuture.completedFuture(Stream.of()), - CompletableFuture.completedFuture(Stream.of(3)))) + CompletableFuture.completedFuture(Stream.of(3))), + FluentFutureStream::unboxStream) .join() .collect(Guavate.toImmutableList())) .containsExactly(1, 2, 3); @@ -85,12 +86,13 @@ public class FluentFutureStreamTest { @Test public void ofOptionalsShouldConstructAFluentFutureStreamWhenProvidedAStreamOfFutureOfOptionals() { assertThat( - FluentFutureStream.ofOptionals( + FluentFutureStream.<Optional<Integer>, Integer>of( Stream.of( CompletableFuture.completedFuture(Optional.of(1)), CompletableFuture.completedFuture(Optional.of(2)), CompletableFuture.completedFuture(Optional.empty()), - CompletableFuture.completedFuture(Optional.of(3)))) + CompletableFuture.completedFuture(Optional.of(3))), + FluentFutureStream::unboxOptional) .join() .collect(Guavate.toImmutableList())) .containsExactly(1, 2, 3); @@ -126,7 +128,7 @@ public class FluentFutureStreamTest { FluentFutureStream.of( CompletableFuture.completedFuture( Stream.of(1, 2, 3))) - .flatMap(i -> Stream.of(i, i + 1)) + .map(i -> Stream.of(i, i + 1), FluentFutureStream::unboxStream) .join() .collect(Guavate.toImmutableList())) .containsExactly(1, 2, 2, 3, 3, 4); @@ -138,8 +140,9 @@ public class FluentFutureStreamTest { FluentFutureStream.of( CompletableFuture.completedFuture( Stream.of(1, 2, 3))) - .flatMapOptional(i -> Optional.of(i + 1) - .filter(j -> j % 2 == 0)) + .map(i -> Optional.of(i + 1) + .filter(j -> j % 2 == 0), + FluentFutureStream::unboxOptional) .join() .collect(Guavate.toImmutableList())) .containsExactly(2, 4); @@ -180,12 +183,24 @@ public class FluentFutureStreamTest { } @Test + public void thenFilterShouldBeAppliedOnTheUnderlyingStream() { + assertThat( + FluentFutureStream.of( + CompletableFuture.completedFuture( + Stream.of(1, 2, 3))) + .thenFilter(i -> CompletableFuture.completedFuture(i % 2 == 1)) + .join() + .collect(Guavate.toImmutableList())) + .containsExactly(1, 3); + } + + @Test public void thenComposeOnAllShouldTransformUnderlyingValuesAndComposeFutures() { assertThat( FluentFutureStream.of( CompletableFuture.completedFuture( Stream.of(1, 2, 3))) - .thenComposeOnAll(i -> CompletableFuture.completedFuture(i + 1)) + .map(i -> CompletableFuture.completedFuture(i + 1), FluentFutureStream::unboxFuture) .join() .collect(Guavate.toImmutableList())) .containsExactly(2, 3, 4); @@ -197,7 +212,7 @@ public class FluentFutureStreamTest { FluentFutureStream.of( CompletableFuture.completedFuture( Stream.of(1, 2, 3))) - .thenFlatCompose(i -> CompletableFuture.completedFuture(Stream.of(i, i + 1))) + .map(i -> FluentFutureStream.of(CompletableFuture.completedFuture(Stream.of(i, i + 1))), FluentFutureStream::unboxFluentFuture) .join() .collect(Guavate.toImmutableList())) .containsExactly(1, 2, 2, 3, 3, 4); @@ -209,8 +224,9 @@ public class FluentFutureStreamTest { FluentFutureStream.of( CompletableFuture.completedFuture( Stream.of(1, 2, 3))) - .thenFlatComposeOnOptional(i -> CompletableFuture.completedFuture(Optional.of(i + 1) - .filter(j -> j % 2 == 0))) + .map(i -> CompletableFuture.completedFuture( + Optional.of(i + 1).filter(j -> j % 2 == 0)), + FluentFutureStream::unboxFutureOptional) .join() .collect(Guavate.toImmutableList())) .containsExactly(2, 4); @@ -255,4 +271,23 @@ public class FluentFutureStreamTest { .isEmpty(); } + @Test + public void sortedShouldReturnInOrderElements() { + assertThat( + FluentFutureStream.of( + CompletableFuture.completedFuture(Stream.of(4L, 3L, 2L, 1L))) + .sorted(Long::compareTo) + .join()) + .containsExactly(1L, 2L, 3L, 4L); + } + + @Test + public void sortedShouldReturnEmptyWhenEmpty() { + CompletableFuture<Stream<Long>> completableFutureStream = CompletableFuture.completedFuture(Stream.of()); + assertThat( + FluentFutureStream.of(completableFutureStream) + .sorted(Long::compareTo) + .join()) + .isEmpty(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java index 2877efe..9bc503c 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java @@ -57,8 +57,8 @@ public interface ManageableMailQueueContract extends MailQueueContract { @Test default void getSizeShouldReturnMessageCountWhenSeveralMails() throws Exception { - getManageableMailQueue().enQueue(defaultMail().build()); - getManageableMailQueue().enQueue(defaultMail().build()); + getManageableMailQueue().enQueue(defaultMail().name("1").build()); + getManageableMailQueue().enQueue(defaultMail().name("2").build()); long size = getManageableMailQueue().getSize(); http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index e4b8881..818a83b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -25,11 +25,13 @@ import java.util.function.Function; import javax.inject.Inject; import javax.mail.internet.MimeMessage; +import org.apache.commons.lang3.NotImplementedException; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.metrics.api.MetricFactory; -import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; -public class RabbitMQMailQueue implements MailQueue { +public class RabbitMQMailQueue implements ManageableMailQueue { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQMailQueue.class); @@ -47,21 +49,26 @@ public class RabbitMQMailQueue implements MailQueue { private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final MailReferenceSerializer mailReferenceSerializer; private final Function<MailReferenceDTO, Mail> mailLoader; + private final MailQueueView mailQueueView; @Inject @VisibleForTesting Factory(MetricFactory metricFactory, RabbitClient rabbitClient, - Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { + Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, + BlobId.Factory blobIdFactory, + MailQueueView mailQueueView) { this.metricFactory = metricFactory; this.rabbitClient = rabbitClient; this.mimeMessageStore = mimeMessageStore; this.mailReferenceSerializer = new MailReferenceSerializer(); this.mailLoader = Throwing.function(new MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow(); + this.mailQueueView = mailQueueView; } RabbitMQMailQueue create(MailQueueName mailQueueName) { return new RabbitMQMailQueue(metricFactory, mailQueueName, new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer, metricFactory), - new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer, metricFactory)); + new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer, metricFactory), + mailQueueView); } } @@ -69,15 +76,17 @@ public class RabbitMQMailQueue implements MailQueue { private final MetricFactory metricFactory; private final Enqueuer enqueuer; private final Dequeuer dequeuer; + private final MailQueueView mailQueueView; RabbitMQMailQueue(MetricFactory metricFactory, MailQueueName name, - Enqueuer enqueuer, Dequeuer dequeuer) { + Enqueuer enqueuer, Dequeuer dequeuer, MailQueueView mailQueueView) { this.name = name; this.enqueuer = enqueuer; this.dequeuer = dequeuer; this.metricFactory = metricFactory; + this.mailQueueView = mailQueueView; } @Override @@ -104,4 +113,30 @@ public class RabbitMQMailQueue implements MailQueue { return metricFactory.runPublishingTimerMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), Throwing.supplier(dequeuer::deQueue).sneakyThrow()); } -} \ No newline at end of file + + @Override + public long getSize() { + return mailQueueView.getSize(); + } + + @Override + public long flush() { + LOGGER.warn("Delays are not supported by RabbitMQ. Flush is a NOOP."); + return 0; + } + + @Override + public long clear() { + throw new NotImplementedException("Not yet implemented"); + } + + @Override + public long remove(Type type, String value) { + throw new NotImplementedException("Not yet implemented"); + } + + @Override + public MailQueueIterator browse() { + return mailQueueView.browse(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java new file mode 100644 index 0000000..098c033 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java @@ -0,0 +1,109 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static com.datastax.driver.core.querybuilder.QueryBuilder.set; +import static com.datastax.driver.core.querybuilder.QueryBuilder.update; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.BrowseStartTable.BROWSE_START; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.BrowseStartTable.QUEUE_NAME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.BrowseStartTable.TABLE_NAME; + +import java.time.Instant; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.queue.rabbitmq.MailQueueName; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.google.common.annotations.VisibleForTesting; + +class BrowseStartDAO { + + private final CassandraAsyncExecutor executor; + private final PreparedStatement selectOne; + private final PreparedStatement insertOne; + private final PreparedStatement updateOne; + + @Inject + BrowseStartDAO(Session session) { + this.executor = new CassandraAsyncExecutor(session); + + this.selectOne = prepareSelectOne(session); + this.updateOne = prepareUpdate(session); + this.insertOne = prepareInsertOne(session); + } + + private PreparedStatement prepareSelectOne(Session session) { + return session.prepare(select() + .from(TABLE_NAME) + .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME)))); + } + + private PreparedStatement prepareUpdate(Session session) { + return session.prepare(update(TABLE_NAME) + .with(set(BROWSE_START, bindMarker(BROWSE_START))) + .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME)))); + } + + private PreparedStatement prepareInsertOne(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .ifNotExists() + .value(BROWSE_START, bindMarker(BROWSE_START)) + .value(QUEUE_NAME, bindMarker(QUEUE_NAME))); + } + + CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName queueName) { + return selectOne(queueName) + .thenApply(optional -> optional.map(this::getBrowseStart)); + } + + CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) { + return executor.executeVoid(updateOne.bind() + .setTimestamp(BROWSE_START, Date.from(sliceStart)) + .setString(QUEUE_NAME, mailQueueName.asString())); + } + + CompletableFuture<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) { + return executor.executeVoid(insertOne.bind() + .setTimestamp(BROWSE_START, Date.from(sliceStart)) + .setString(QUEUE_NAME, mailQueueName.asString())); + } + + @VisibleForTesting + CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) { + return executor.executeSingleRow( + selectOne.bind() + .setString(QUEUE_NAME, queueName.asString())); + } + + private Instant getBrowseStart(Row row) { + return row.getTimestamp(BROWSE_START).toInstant(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java new file mode 100644 index 0000000..21dedb8 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -0,0 +1,128 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill; + +import java.time.Clock; +import java.time.Instant; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import javax.inject.Inject; + +import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail; +import org.apache.james.util.FluentFutureStream; + +import com.google.common.base.Preconditions; + +class CassandraMailQueueBrowser { + + static class CassandraMailQueueIterator implements ManageableMailQueue.MailQueueIterator { + + private final Iterator<ManageableMailQueue.MailQueueItemView> iterator; + + CassandraMailQueueIterator(Iterator<ManageableMailQueue.MailQueueItemView> iterator) { + Preconditions.checkNotNull(iterator); + + this.iterator = iterator; + } + + @Override + public void close() {} + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ManageableMailQueue.MailQueueItemView next() { + return iterator.next(); + } + } + + private final BrowseStartDAO browseStartDao; + private final DeletedMailsDAO deletedMailsDao; + private final EnqueuedMailsDAO enqueuedMailsDao; + private final CassandraMailQueueViewConfiguration configuration; + private final Clock clock; + + @Inject + CassandraMailQueueBrowser(BrowseStartDAO browseStartDao, + DeletedMailsDAO deletedMailsDao, + EnqueuedMailsDAO enqueuedMailsDao, + CassandraMailQueueViewConfiguration configuration, Clock clock) { + this.browseStartDao = browseStartDao; + this.deletedMailsDao = deletedMailsDao; + this.enqueuedMailsDao = enqueuedMailsDao; + this.configuration = configuration; + this.clock = clock; + } + + CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) { + return browseReferences(queueName) + .map(EnqueuedMail::getMail) + .map(ManageableMailQueue.MailQueueItemView::new) + .completableFuture(); + } + + FluentFutureStream<EnqueuedMail> browseReferences(MailQueueName queueName) { + return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName) + .thenApply(this::allSlicesStartingAt)) + .map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture); + } + + private FluentFutureStream<EnqueuedMail> browseSlice(MailQueueName queueName, Slice slice) { + return FluentFutureStream.of( + allBucketIds() + .map(bucketId -> + browseBucket(queueName, slice, bucketId).completableFuture()), + FluentFutureStream::unboxStream) + .sorted(Comparator.comparing(EnqueuedMail::getEnqueuedTime)); + } + + private FluentFutureStream<EnqueuedMail> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) { + return FluentFutureStream.of( + enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)) + .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getMailKey())); + } + + private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) { + return maybeBrowseStart + .map(browseStart -> Slice.of(browseStart, configuration.getSliceWindow())) + .map(startSlice -> allSlicesTill(startSlice, clock.instant())) + .orElse(Stream.empty()); + } + + private Stream<BucketId> allBucketIds() { + return IntStream + .range(0, configuration.getBucketCount()) + .mapToObj(BucketId::of); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java new file mode 100644 index 0000000..c8c0a0e --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -0,0 +1,87 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import javax.inject.Inject; + +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail; +import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; +import org.apache.mailet.Mail; + +class CassandraMailQueueMailDelete { + + private final DeletedMailsDAO deletedMailsDao; + private final BrowseStartDAO browseStartDao; + private final CassandraMailQueueBrowser cassandraMailQueueBrowser; + private final CassandraMailQueueViewConfiguration configuration; + private final ThreadLocalRandom random; + + @Inject + CassandraMailQueueMailDelete(DeletedMailsDAO deletedMailsDao, + BrowseStartDAO browseStartDao, + CassandraMailQueueBrowser cassandraMailQueueBrowser, + CassandraMailQueueViewConfiguration configuration, + ThreadLocalRandom random) { + this.deletedMailsDao = deletedMailsDao; + this.browseStartDao = browseStartDao; + this.cassandraMailQueueBrowser = cassandraMailQueueBrowser; + this.configuration = configuration; + this.random = random; + } + + CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) { + return deletedMailsDao + .markAsDeleted(mailQueueName, MailKey.fromMail(mail)) + .thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName)); + } + + private void maybeUpdateBrowseStart(MailQueueName mailQueueName) { + if (shouldUpdateBrowseStart()) { + findNewBrowseStart(mailQueueName) + .thenCompose(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart)) + .join(); + } + } + + private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) { + return cassandraMailQueueBrowser.browseReferences(mailQueueName) + .map(EnqueuedMail::getTimeRangeStart) + .completableFuture() + .thenApply(Stream::findFirst); + } + + private CompletableFuture<Void> updateNewBrowseStart(MailQueueName mailQueueName, Optional<Instant> maybeNewBrowseStart) { + return maybeNewBrowseStart + .map(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant)) + .orElse(CompletableFuture.completedFuture(null)); + } + + private boolean shouldUpdateBrowseStart() { + int threshold = configuration.getUpdateBrowseStartPace(); + return Math.abs(random.nextInt()) % threshold == 0; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java new file mode 100644 index 0000000..ed696f2 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java @@ -0,0 +1,98 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import java.time.Clock; +import java.time.Instant; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import javax.inject.Inject; + +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; +import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail; +import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; +import org.apache.mailet.Mail; + +class CassandraMailQueueMailStore { + + private final EnqueuedMailsDAO enqueuedMailsDao; + private final BrowseStartDAO browseStartDao; + private final CassandraMailQueueViewConfiguration configuration; + private final Clock clock; + private final Set<MailQueueName> initialInserted; + + @Inject + CassandraMailQueueMailStore(EnqueuedMailsDAO enqueuedMailsDao, + BrowseStartDAO browseStartDao, + CassandraMailQueueViewConfiguration configuration, + Clock clock) { + this.enqueuedMailsDao = enqueuedMailsDao; + this.browseStartDao = browseStartDao; + this.configuration = configuration; + this.clock = clock; + this.initialInserted = ConcurrentHashMap.newKeySet(); + } + + CompletableFuture<Void> storeMailInEnqueueTable(Mail mail, MailQueueName mailQueueName) { + EnqueuedMail enqueuedMail = convertToEnqueuedMail(mail, mailQueueName); + + return enqueuedMailsDao.insert(enqueuedMail) + .thenCompose(any -> initBrowseStartIfNeeded(mailQueueName, enqueuedMail.getTimeRangeStart())); + } + + private CompletableFuture<Void> initBrowseStartIfNeeded(MailQueueName mailQueueName, Instant sliceStartAt) { + if (!initialInserted.contains(mailQueueName)) { + return tryInsertBrowseStart(mailQueueName, sliceStartAt); + } + return CompletableFuture.completedFuture(null); + } + + private CompletableFuture<Void> tryInsertBrowseStart(MailQueueName mailQueueName, Instant sliceStartAt) { + return browseStartDao + .insertInitialBrowseStart(mailQueueName, sliceStartAt) + .thenAccept(any -> initialInserted.add(mailQueueName)); + } + + private EnqueuedMail convertToEnqueuedMail(Mail mail, MailQueueName mailQueueName) { + return EnqueuedMail.builder() + .mail(mail) + .bucketId(computedBucketId(mail)) + .timeRangeStart(currentSliceStartInstant()) + .enqueuedTime(Instant.now()) + .mailKey(MailKey.fromMail(mail)) + .mailQueueName(mailQueueName) + .build(); + } + + private Instant currentSliceStartInstant() { + long sliceSize = configuration.getSliceWindow().getSeconds(); + long sliceId = clock.instant().getEpochSecond() / sliceSize; + return Instant.ofEpochSecond(sliceId * sliceSize); + } + + private BucketId computedBucketId(Mail mail) { + int mailKeyHashCode = mail.getName().hashCode(); + int bucketIdValue = mailKeyHashCode % configuration.getBucketCount(); + return BucketId.of(bucketIdValue); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java new file mode 100644 index 0000000..ee309e6 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -0,0 +1,91 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import java.util.concurrent.CompletableFuture; + +import javax.inject.Inject; + +import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.api.MailQueueView; +import org.apache.mailet.Mail; + +import com.google.common.collect.Iterators; + +public class CassandraMailQueueView implements MailQueueView { + + public static class Factory { + private final CassandraMailQueueMailStore storeHelper; + private final CassandraMailQueueBrowser cassandraMailQueueBrowser; + private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete; + + @Inject + public Factory(CassandraMailQueueMailStore storeHelper, CassandraMailQueueBrowser cassandraMailQueueBrowser, CassandraMailQueueMailDelete cassandraMailQueueMailDelete) { + this.storeHelper = storeHelper; + this.cassandraMailQueueBrowser = cassandraMailQueueBrowser; + this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete; + } + + public MailQueueView create(MailQueueName mailQueueName) { + return new CassandraMailQueueView(storeHelper, mailQueueName, cassandraMailQueueBrowser, cassandraMailQueueMailDelete); + } + } + + private final CassandraMailQueueMailStore storeHelper; + private final CassandraMailQueueBrowser cassandraMailQueueBrowser; + private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete; + + private final MailQueueName mailQueueName; + + CassandraMailQueueView(CassandraMailQueueMailStore storeHelper, + MailQueueName mailQueueName, + CassandraMailQueueBrowser cassandraMailQueueBrowser, + CassandraMailQueueMailDelete cassandraMailQueueMailDelete) { + this.mailQueueName = mailQueueName; + this.storeHelper = storeHelper; + this.cassandraMailQueueBrowser = cassandraMailQueueBrowser; + this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete; + } + + @Override + public CompletableFuture<Void> storeMail(Mail mail) { + return storeHelper.storeMailInEnqueueTable(mail, mailQueueName); + } + + @Override + public CompletableFuture<Void> deleteMail(Mail mail) { + return cassandraMailQueueMailDelete.considerDeleted(mail, mailQueueName); + } + + @Override + public ManageableMailQueue.MailQueueIterator browse() { + return new CassandraMailQueueBrowser.CassandraMailQueueIterator( + cassandraMailQueueBrowser.browse(mailQueueName) + .join() + .iterator()); + } + + @Override + public long getSize() { + return Iterators.size(browse()); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java new file mode 100644 index 0000000..36ee5f4 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java @@ -0,0 +1,83 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.MAIL_KEY; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME; + +import java.util.concurrent.CompletableFuture; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; + +class DeletedMailsDAO { + + private final CassandraAsyncExecutor executor; + private final PreparedStatement selectOne; + private final PreparedStatement insertOne; + + @Inject + DeletedMailsDAO(Session session) { + this.executor = new CassandraAsyncExecutor(session); + this.selectOne = prepareSelectExist(session); + this.insertOne = prepareInsert(session); + } + + private PreparedStatement prepareInsert(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(QUEUE_NAME, bindMarker(QUEUE_NAME)) + .value(MAIL_KEY, bindMarker(MAIL_KEY))); + } + + private PreparedStatement prepareSelectExist(Session session) { + return session.prepare(select() + .from(TABLE_NAME) + .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME))) + .and(eq(MAIL_KEY, bindMarker(MAIL_KEY)))); + } + + CompletableFuture<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) { + return executor.executeVoid(insertOne.bind() + .setString(QUEUE_NAME, mailQueueName.asString()) + .setString(MAIL_KEY, mailKey.getMailKey())); + } + + CompletableFuture<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) { + return executor.executeReturnExists( + selectOne.bind() + .setString(QUEUE_NAME, mailQueueName.asString()) + .setString(MAIL_KEY, mailKey.getMailKey())); + } + + CompletableFuture<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) { + return isDeleted(mailQueueName, mailKey).thenApply(b -> !b); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java new file mode 100644 index 0000000..2b8e11b --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java @@ -0,0 +1,144 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.MAIL_KEY; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.QUEUE_NAME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.STATE; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START; +import static org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.asStringList; +import static org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.toHeaderMap; +import static org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.toRawAttributeMap; +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; + +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.init.CassandraTypesProvider; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.core.MailAddress; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail; +import org.apache.mailet.Mail; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; + +class EnqueuedMailsDAO { + + private final CassandraAsyncExecutor executor; + private final PreparedStatement selectFrom; + private final PreparedStatement insert; + private final CassandraUtils cassandraUtils; + private final CassandraTypesProvider cassandraTypesProvider; + + @Inject + EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider) { + this.executor = new CassandraAsyncExecutor(session); + this.cassandraUtils = cassandraUtils; + this.cassandraTypesProvider = cassandraTypesProvider; + + this.selectFrom = prepareSelectFrom(session); + this.insert = prepareInsert(session); + } + + private PreparedStatement prepareSelectFrom(Session session) { + return session.prepare(select() + .from(TABLE_NAME) + .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME))) + .and(eq(TIME_RANGE_START, bindMarker(TIME_RANGE_START))) + .and(eq(BUCKET_ID, bindMarker(BUCKET_ID)))); + } + + private PreparedStatement prepareInsert(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(QUEUE_NAME, bindMarker(QUEUE_NAME)) + .value(TIME_RANGE_START, bindMarker(TIME_RANGE_START)) + .value(BUCKET_ID, bindMarker(BUCKET_ID)) + .value(MAIL_KEY, bindMarker(MAIL_KEY)) + .value(ENQUEUED_TIME, bindMarker(ENQUEUED_TIME)) + .value(STATE, bindMarker(STATE)) + .value(SENDER, bindMarker(SENDER)) + .value(RECIPIENTS, bindMarker(RECIPIENTS)) + .value(ATTRIBUTES, bindMarker(ATTRIBUTES)) + .value(ERROR_MESSAGE, bindMarker(ERROR_MESSAGE)) + .value(REMOTE_ADDR, bindMarker(REMOTE_ADDR)) + .value(REMOTE_HOST, bindMarker(REMOTE_HOST)) + .value(LAST_UPDATED, bindMarker(LAST_UPDATED)) + .value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS))); + } + + CompletableFuture<Void> insert(EnqueuedMail enqueuedMail) { + Mail mail = enqueuedMail.getMail(); + + return executor.executeVoid(insert.bind() + .setString(QUEUE_NAME, enqueuedMail.getMailQueueName().asString()) + .setTimestamp(TIME_RANGE_START, Date.from(enqueuedMail.getTimeRangeStart())) + .setInt(BUCKET_ID, enqueuedMail.getBucketId().getValue()) + .setTimestamp(ENQUEUED_TIME, Date.from(enqueuedMail.getEnqueuedTime())) + .setString(MAIL_KEY, mail.getName()) + .setString(STATE, mail.getState()) + .setString(SENDER, Optional.ofNullable(mail.getSender()) + .map(MailAddress::asString) + .orElse(null)) + .setList(RECIPIENTS, asStringList(mail.getRecipients())) + .setString(ERROR_MESSAGE, mail.getErrorMessage()) + .setString(REMOTE_ADDR, mail.getRemoteAddr()) + .setString(REMOTE_HOST, mail.getRemoteHost()) + .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) + .setMap(ATTRIBUTES, toRawAttributeMap(mail)) + .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders())) + ); + } + + CompletableFuture<Stream<EnqueuedMail>> selectEnqueuedMails( + MailQueueName queueName, Slice slice, BucketId bucketId) { + + return executor.execute( + selectFrom.bind() + .setString(QUEUE_NAME, queueName.asString()) + .setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant())) + .setInt(BUCKET_ID, bucketId.getValue())) + .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) + .map(EnqueuedMailsDaoUtil::toEnqueuedMail)); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org