JAMES-2544 RabbitMQ browse Cassandra DAOs and tests

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3a53806a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3a53806a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3a53806a

Branch: refs/heads/master
Commit: 3a53806a0184960ed50bbbc500d6392870a214a9
Parents: 0523acf
Author: duc <dt...@linagora.com>
Authored: Wed Sep 12 20:15:49 2018 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Wed Sep 26 09:22:06 2018 +0700

----------------------------------------------------------------------
 .../cassandra/utils/CassandraAsyncExecutor.java |   4 +
 .../mail/CassandraAttachmentMapper.java         |   3 +-
 .../cassandra/mail/CassandraMailboxMapper.java  |  14 +-
 .../mail/CassandraMessageIdMapper.java          |   9 +-
 .../cassandra/mail/CassandraMessageMapper.java  |  14 +-
 .../james/util/CompletableFutureUtil.java       |  30 +--
 .../apache/james/util/FluentFutureStream.java   | 127 ++++------
 .../james/util/CompletableFutureUtilTest.java   | 142 ++---------
 .../james/util/FluentFutureStreamTest.java      |  57 ++++-
 .../queue/api/ManageableMailQueueContract.java  |   4 +-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java |  47 +++-
 .../rabbitmq/view/cassandra/BrowseStartDAO.java | 109 ++++++++
 .../cassandra/CassandraMailQueueBrowser.java    | 128 ++++++++++
 .../cassandra/CassandraMailQueueMailDelete.java |  87 +++++++
 .../cassandra/CassandraMailQueueMailStore.java  |  98 ++++++++
 .../view/cassandra/CassandraMailQueueView.java  |  91 +++++++
 .../view/cassandra/DeletedMailsDAO.java         |  83 +++++++
 .../view/cassandra/EnqueuedMailsDAO.java        | 144 +++++++++++
 .../view/cassandra/EnqueuedMailsDaoUtil.java    | 196 +++++++++++++++
 .../view/cassandra/model/BucketedSlices.java    |  29 +--
 .../view/cassandra/model/EnqueuedMail.java      |   8 +-
 .../rabbitmq/view/cassandra/model/MailKey.java  |  17 ++
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   | 248 +++++++++++++++++--
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java  |   9 +-
 .../view/cassandra/BrowseStartDAOTest.java      |  86 +++++++
 .../CassandraMailQueueViewTestFactory.java      |  48 ++++
 .../view/cassandra/DeletedMailsDAOTest.java     | 108 ++++++++
 .../view/cassandra/EnqueuedMailsDaoTest.java    | 126 ++++++++++
 .../cassandra/model/BucketedSlicesTest.java     |  58 ++++-
 .../view/cassandra/model/EnqueuedMailTest.java  |  33 +++
 .../view/cassandra/model/MailKeyTest.java       |  33 +++
 31 files changed, 1882 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index a137c66..7815643 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -62,4 +62,8 @@ public class CassandraAsyncExecutor {
             .thenApply(Optional::ofNullable);
     }
 
+    public CompletableFuture<Boolean> executeReturnExists(Statement statement) 
{
+        return executeSingleRow(statement)
+            .thenApply(Optional::isPresent);
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index 7975b35..ecb43de 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -103,8 +103,7 @@ public class CassandraAttachmentMapper implements 
AttachmentMapper {
                 .map(id -> getAttachmentInternal(id)
                     .thenApply(finalValue -> logNotFound(id, finalValue)));
 
-        return FluentFutureStream
-            .ofOptionals(attachments)
+        return FluentFutureStream.of(attachments, 
FluentFutureStream::unboxOptional)
             .collect(Guavate.toImmutableList());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index 5dee291..69c092d 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -79,8 +79,10 @@ public class CassandraMailboxMapper implements MailboxMapper 
{
     @Override
     public void delete(Mailbox mailbox) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        
FluentFutureStream.ofFutures(mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
 mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()))
-            .thenComposeOnAll(any -> mailboxDAO.delete(mailboxId))
+        FluentFutureStream.ofFutures(
+                mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
+                mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()))
+            .map(any -> mailboxDAO.delete(mailboxId), 
FluentFutureStream::unboxFuture)
             .join();
     }
 
@@ -166,10 +168,10 @@ public class CassandraMailboxMapper implements 
MailboxMapper {
 
     private List<Mailbox> toMailboxes(MailboxPath path, 
CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes) {
         Pattern regex = 
Pattern.compile(constructEscapedRegexForMailboxNameMatching(path));
-        
+
         return FluentFutureStream.of(listUserMailboxes)
                 .filter(idAndPath -> 
regex.matcher(idAndPath.getMailboxPath().getName()).matches())
-                .thenFlatComposeOnOptional(this::retrieveMailbox)
+                .map(this::retrieveMailbox, 
FluentFutureStream::unboxFutureOptional)
                 .join()
                 .collect(Guavate.toImmutableList());
     }
@@ -227,7 +229,7 @@ public class CassandraMailboxMapper implements 
MailboxMapper {
     @Override
     public List<Mailbox> list() {
         return mailboxDAO.retrieveAllMailboxes()
-            .thenComposeOnAll(this::toMailboxWithAclFuture)
+            .map(this::toMailboxWithAclFuture, FluentFutureStream::unboxFuture)
             .join()
             .collect(Guavate.toImmutableList());
     }
@@ -283,7 +285,7 @@ public class CassandraMailboxMapper implements 
MailboxMapper {
     public List<Mailbox> findNonPersonalMailboxes(String userName, Right 
right) {
         return 
FluentFutureStream.of(userMailboxRightsDAO.listRightsForUser(userName)
             .thenApply(map -> toAuthorizedMailboxIds(map, right)))
-            .thenFlatComposeOnOptional(this::retrieveMailbox)
+            .map(this::retrieveMailbox, 
FluentFutureStream::unboxFutureOptional)
             .join()
             .collect(Guavate.toImmutableList());
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index c59a60b..2d9a154 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -98,9 +98,10 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     }
 
     private Stream<SimpleMailboxMessage> findAsStream(Collection<MessageId> 
messageIds, FetchType fetchType) {
-        return FluentFutureStream.ofNestedStreams(
-            messageIds.stream()
-                .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) 
messageId, Optional.empty())))
+        return FluentFutureStream.of(
+                messageIds.stream()
+                    .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) 
messageId, Optional.empty())),
+                FluentFutureStream::unboxStream)
             .collect(Guavate.toImmutableList())
             .thenCompose(composedMessageIds -> 
messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
             .thenApply(stream -> stream
@@ -113,7 +114,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     }
 
     private CompletableFuture<Stream<SimpleMailboxMessage>> 
filterMessagesWithExistingMailbox(Stream<SimpleMailboxMessage> stream) {
-        return 
FluentFutureStream.ofOptionals(stream.map(this::keepMessageIfMailboxExists))
+        return 
FluentFutureStream.of(stream.map(this::keepMessageIfMailboxExists), 
FluentFutureStream::unboxOptional)
             .completableFuture();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 57b1f81..300e7ef 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -214,10 +214,13 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
     private CompletableFuture<Stream<SimpleMailboxMessage>> 
expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) {
-        return FluentFutureStream.ofOptionals(
-                uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, 
uid)))
+        return FluentFutureStream.of(
+                uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, 
uid)),
+                FluentFutureStream::unboxOptional)
             .performOnAll(this::deleteUsingMailboxId)
-            .thenFlatCompose(idWithMetadata -> 
messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), 
FetchType.Metadata, Limit.unlimited()))
+            .map(idWithMetadata -> FluentFutureStream.of(
+                messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), 
FetchType.Metadata, Limit.unlimited())),
+                FluentFutureStream::unboxFluentFuture)
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
             .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()))
@@ -299,8 +302,9 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
     private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, 
FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
-        Stream<ComposedMessageIdWithMetaData> idsFailed = 
FluentFutureStream.ofOptionals(
-            failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
+        Stream<ComposedMessageIdWithMetaData> idsFailed = 
FluentFutureStream.of(
+                failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, 
uid)),
+                FluentFutureStream::unboxOptional)
             .join();
 
         return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator);

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
 
b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
index d2e83af..433de52 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.util;
 
+import java.util.Comparator;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
@@ -35,11 +36,6 @@ public class CompletableFutureUtil {
                 .orElse(CompletableFuture.completedFuture(Optional.empty())));
     }
 
-    @SafeVarargs
-    public static <T> CompletableFuture<Stream<T>> 
allOfArray(CompletableFuture<T>... futures) {
-        return allOf(Stream.of(futures));
-    }
-
     public static <T, U, V> CompletableFuture<V> combine(CompletableFuture<T> 
t, CompletableFuture<U> u, BiFunction<T,U,V> combiner) {
         return t.thenCompose(valueT ->
             u.thenApply(valueU -> combiner.apply(valueT, valueU)));
@@ -81,20 +77,6 @@ public class CompletableFutureUtil {
                     CompletableFuture.completedFuture(Stream.concat(stream1, 
stream2))));
     }
 
-    public static <T> CompletableFuture<Stream<T>> 
performOnAll(CompletableFuture<Stream<T>> futurStream, Function<T, 
CompletableFuture<Void>> action) {
-        return thenComposeOnAll(futurStream, value ->
-            keepValue(() ->
-                action.apply(value),
-                value));
-    }
-
-    public static <T, U> CompletableFuture<Stream<U>> 
thenComposeOnAll(CompletableFuture<Stream<T>> futurStream, Function<T, 
CompletableFuture<U>> action) {
-        return futurStream
-            .thenCompose(stream ->
-                CompletableFutureUtil.allOf(
-                    stream.map(action)));
-    }
-
     public static <T, U> CompletableFuture<Stream<U>> 
map(CompletableFuture<Stream<T>> futurStream, Function<T, U> action) {
         return futurStream
             .thenApply(stream ->
@@ -109,10 +91,6 @@ public class CompletableFutureUtil {
         return futureStream.thenApply(stream -> 
stream.reduce(binaryOperator).orElse(emptyAccumulator));
     }
 
-    public static <T> CompletableFuture<T> 
keepValue(Supplier<CompletableFuture<Void>> supplier, T value) {
-        return supplier.get().thenApply(any -> value);
-    }
-
     public static <T> Function<Boolean, CompletableFuture<Boolean>> 
composeIfTrue(Supplier<CompletableFuture<T>> composeOperation) {
         return b -> {
             if (b) {
@@ -121,4 +99,10 @@ public class CompletableFutureUtil {
             return CompletableFuture.completedFuture(b);
         };
     }
+
+    public static <T> CompletableFuture<Stream<T>> 
sorted(CompletableFuture<Stream<T>> futureStream, Comparator<T> comparator) {
+        return futureStream
+            .thenApply(stream ->
+                stream.sorted(comparator));
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
 
b/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
index 9dcae7a..0b0b86e 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.util;
 
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BinaryOperator;
@@ -29,6 +31,31 @@ import java.util.stream.Stream;
 
 public class FluentFutureStream<T> {
 
+    public static <T> FluentFutureStream<T> 
unboxStream(FluentFutureStream<Stream<T>> streams) {
+        return FluentFutureStream.of(
+            streams.completableFuture()
+                .thenApply(StreamUtils::flatten));
+    }
+
+    public static <T> FluentFutureStream<T> 
unboxOptional(FluentFutureStream<Optional<T>> optionals) {
+        return unboxStream(optionals.map(OptionalUtils::toStream));
+    }
+
+    public static <T> FluentFutureStream<T> 
unboxFuture(FluentFutureStream<CompletableFuture<T>> futures) {
+        return FluentFutureStream.of(futures.completableFuture()
+            .thenCompose(CompletableFutureUtil::allOf));
+    }
+
+    public static <T> FluentFutureStream<T> 
unboxFluentFuture(FluentFutureStream<FluentFutureStream<T>> futures) {
+        return unboxStream(
+            unboxFuture(
+                futures.map(FluentFutureStream::completableFuture)));
+    }
+
+    public static <T> FluentFutureStream<T> 
unboxFutureOptional(FluentFutureStream<CompletableFuture<Optional<T>>> futures) 
{
+        return unboxOptional(unboxFuture(futures));
+    }
+
     private final CompletableFuture<Stream<T>> completableFuture;
 
     /**
@@ -38,6 +65,11 @@ public class FluentFutureStream<T> {
         return new FluentFutureStream<>(completableFuture);
     }
 
+    public static <T, U> FluentFutureStream<U> of(Stream<CompletableFuture<T>> 
completableFuture,
+                                               Function<FluentFutureStream<T>, 
FluentFutureStream<U>> unboxer) {
+        return unboxer.apply(of(completableFuture));
+    }
+
     /**
      * Constructs a FluentFutureStream from a Stream of Future
      */
@@ -45,32 +77,9 @@ public class FluentFutureStream<T> {
         return new 
FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream));
     }
 
-    /**
-     * Constructs a FluentFutureStream from a Stream of Future of Stream.
-     *
-     * Underlying streams are flatMapped.
-     */
-    public static <T> FluentFutureStream<T> 
ofNestedStreams(Stream<CompletableFuture<Stream<T>>> completableFuture) {
-        return of(completableFuture)
-            .flatMap(Function.identity());
-    }
-
-    /**
-     * Constructs a FluentFutureStream from a Stream of Future of Optionals.
-     *
-     * Underlying optionals are unboxed.
-     */
-    public static <T> FluentFutureStream<T> 
ofOptionals(Stream<CompletableFuture<Optional<T>>> completableFuture) {
-        return of(completableFuture)
-            .flatMapOptional(Function.identity());
-    }
-
-    /**
-     * Constructs a FluentFutureStream from the supplied futures.
-     */
     @SafeVarargs
     public static <T> FluentFutureStream<T> ofFutures(CompletableFuture<T>... 
completableFutures) {
-        return new 
FluentFutureStream<>(CompletableFutureUtil.allOfArray(completableFutures));
+        return of(Arrays.stream(completableFutures));
     }
 
     private FluentFutureStream(CompletableFuture<Stream<T>> completableFuture) 
{
@@ -81,8 +90,8 @@ public class FluentFutureStream<T> {
      * For all values of the underlying stream, an action will be performed.
      */
     public FluentFutureStream<T> performOnAll(Function<T, 
CompletableFuture<Void>> action) {
-        return FluentFutureStream.of(
-            CompletableFutureUtil.performOnAll(completableFuture(), action));
+        return map(t -> action.apply(t).thenApply(any -> t),
+            FluentFutureStream::unboxFuture);
     }
 
     /**
@@ -93,56 +102,8 @@ public class FluentFutureStream<T> {
             CompletableFutureUtil.map(completableFuture(), function));
     }
 
-    /**
-     * Apply a transformation to all value of the underlying stream.
-     *
-     * As the supplied transformation produces streams, the results will be 
flatMapped.
-     */
-    public <U> FluentFutureStream<U> flatMap(Function<T, Stream<U>> function) {
-        return FluentFutureStream.of(completableFuture().thenApply(stream ->
-            stream.flatMap(function)));
-    }
-
-    /**
-     * Apply a transformation to all value of the underlying stream.
-     *
-     * As the supplied transformation produces optionals, the results will be 
unboxed.
-     */
-    public <U> FluentFutureStream<U> flatMapOptional(Function<T, Optional<U>> 
function) {
-        return map(function)
-            .flatMap(OptionalUtils::toStream);
-    }
-
-    /**
-     * Apply a transformation to all value of the underlying stream.
-     *
-     * As the supplied transformation produces futures, we need to compose the 
returned values.
-     */
-    public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, 
CompletableFuture<U>> function) {
-        return FluentFutureStream.of(
-            CompletableFutureUtil.thenComposeOnAll(completableFuture(), 
function));
-    }
-
-    /**
-     * Apply a transformation to all value of the underlying stream.
-     *
-     * As the supplied transformation produces futures of stream, we need to 
compose then flatMap the returned values.
-     */
-    public <U> FluentFutureStream<U> thenFlatCompose(Function<T, 
CompletableFuture<Stream<U>>> function) {
-        return FluentFutureStream.of(
-            CompletableFutureUtil.thenComposeOnAll(completableFuture(), 
function))
-            .flatMap(Function.identity());
-    }
-
-    /**
-     * Apply a transformation to all value of the underlying stream.
-     *
-     * As the supplied transformation produces futures of optionals, we need 
to compose then unbox the returned values.
-     */
-    public <U> FluentFutureStream<U> thenFlatComposeOnOptional(Function<T, 
CompletableFuture<Optional<U>>> function) {
-        return FluentFutureStream.of(
-            CompletableFutureUtil.thenComposeOnAll(completableFuture(), 
function))
-            .flatMapOptional(Function.identity());
+    public <U, V> FluentFutureStream<V> map(Function<T, U> function, 
Function<FluentFutureStream<U>, FluentFutureStream<V>> unboxer) {
+        return unboxer.apply(map(function));
     }
 
     /**
@@ -153,6 +114,12 @@ public class FluentFutureStream<T> {
             .thenApply(stream -> stream.filter(predicate)));
     }
 
+    public FluentFutureStream<T> thenFilter(Function<T, 
CompletableFuture<Boolean>> futurePredicate) {
+        return map(t -> futurePredicate.apply(t)
+            .thenApply(isKept -> Optional.of(t).filter(any -> isKept)),
+            FluentFutureStream::unboxFutureOptional);
+    }
+
     /**
      * Reduces the underlying stream. Reduced value is supplied as a Future of 
optional, as no empty value is supplied.
      */
@@ -168,6 +135,14 @@ public class FluentFutureStream<T> {
     }
 
     /**
+     * sort all elements of the stream by the provided {@code Comparator}.
+     */
+    public FluentFutureStream<T> sorted(Comparator<T> comparator) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.sorted(completableFuture(), comparator));
+    }
+
+    /**
      * Returns a future of the underlying stream.
      */
     public CompletableFuture<Stream<T>> completableFuture() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
 
b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
index a42fb39..acbe10f 100644
--- 
a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
+++ 
b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
@@ -171,59 +170,6 @@ public class CompletableFutureUtilTest {
     }
 
     @Test
-    public void allOfArrayShouldPreserveOrder() {
-        long value1 = 18L;
-        long value2 = 19L;
-        long value3 = 20L;
-        long value4 = 21L;
-        long value5 = 22L;
-        long value6 = 23L;
-        long value7 = 24L;
-        long value8 = 25L;
-        long value9 = 26L;
-        long value10 = 27L;
-        assertThat(
-            CompletableFutureUtil.allOfArray(
-                    CompletableFuture.completedFuture(value1),
-                    CompletableFuture.completedFuture(value2),
-                    CompletableFuture.completedFuture(value3),
-                    CompletableFuture.completedFuture(value4),
-                    CompletableFuture.completedFuture(value5),
-                    CompletableFuture.completedFuture(value6),
-                    CompletableFuture.completedFuture(value7),
-                    CompletableFuture.completedFuture(value8),
-                    CompletableFuture.completedFuture(value9),
-                    CompletableFuture.completedFuture(value10))
-                .join()
-                .collect(Guavate.toImmutableList()))
-            .containsExactly(value1, value2, value3, value4, value5, value6, 
value7, value8, value9, value10);
-    }
-
-    @Test
-    public void allOfArrayShouldUnboxNoArgs() {
-        assertThat(
-            CompletableFutureUtil.allOfArray()
-                .join()
-                .collect(Guavate.toImmutableList()))
-            .isEmpty();
-    }
-
-    @Test
-    public void allOfArrayShouldUnboxArray() {
-        long value1 = 18L;
-        long value2 = 19L;
-        long value3 = 20L;
-        assertThat(
-            CompletableFutureUtil.allOfArray(
-                    CompletableFuture.completedFuture(value1),
-                    CompletableFuture.completedFuture(value2),
-                    CompletableFuture.completedFuture(value3))
-                .join()
-                .collect(Guavate.toImmutableList()))
-            .containsOnly(value1, value2, value3);
-    }
-
-    @Test
     public void allOfShouldWorkOnVeryLargeStream() {
         CompletableFutureUtil.allOf(
             IntStream.range(0, 100000)
@@ -257,58 +203,6 @@ public class CompletableFutureUtilTest {
     }
 
     @Test
-    public void 
thenComposeOnAllShouldMapOnStreamInsideACompletableFuturOfStreamAndTransformTheResultingStreamOfCompletableFutureIntoACompletableOfStreamAndFlatIt()
 {
-        CompletableFuture<Stream<Integer>> futurOfInteger = 
CompletableFuture.completedFuture(Stream.of(1, 2, 3));
-
-        assertThat(
-            CompletableFutureUtil.thenComposeOnAll(futurOfInteger, integer ->
-                CompletableFuture.completedFuture(integer * 2))
-                .join()
-                .collect(Guavate.toImmutableList()))
-            .containsExactly(2, 4, 6);
-    }
-
-    @Test
-    public void thenComposeOnAllOnEmptyStreamShouldReturnAnEmptyStream() {
-        CompletableFuture<Stream<Integer>> futurOfInteger = 
CompletableFuture.completedFuture(Stream.of());
-
-        assertThat(
-            CompletableFutureUtil.thenComposeOnAll(futurOfInteger, integer ->
-                CompletableFuture.completedFuture(integer * 2))
-                .join()
-                .collect(Guavate.toImmutableList()))
-            .isEmpty();
-    }
-
-    @Test
-    public void keepValueShouldCompleteWhenTheGivenCompletableFutureEnd() {
-        final AtomicInteger numOfFutureExecution = new AtomicInteger(0);
-
-        Supplier<CompletableFuture<Void>> future = () ->
-            CompletableFuture.runAsync(numOfFutureExecution::incrementAndGet);
-
-        assertThat(
-            CompletableFutureUtil.keepValue(future, 42)
-                .join())
-            .isEqualTo(42);
-
-        assertThat(
-            numOfFutureExecution.get())
-            .isEqualTo(1);
-    }
-
-    @Test
-    public void keepValueShouldReturnNullWithNullValue() {
-        Supplier<CompletableFuture<Void>> future = () ->
-            CompletableFuture.completedFuture(null);
-
-        assertThat(
-            CompletableFutureUtil.keepValue(future, null)
-                .join())
-            .isNull();
-    }
-
-    @Test
     public void composeIfTrueShouldReturnTrueWhenTrue() {
         assertThat(
             CompletableFutureUtil.composeIfTrue(() -> 
CompletableFuture.completedFuture(null))
@@ -357,7 +251,7 @@ public class CompletableFutureUtilTest {
         assertThat(
             CompletableFutureUtil.reduce(
                 (i, j) -> i + j,
-                CompletableFutureUtil.<Long>allOfArray())
+                CompletableFuture.completedFuture(Stream.<Long>of()))
                 .join())
             .isEmpty();
     }
@@ -367,11 +261,8 @@ public class CompletableFutureUtilTest {
         assertThat(
             CompletableFutureUtil.reduce(
                 (i, j) -> i + j,
-                CompletableFutureUtil.allOfArray(
-                    CompletableFuture.completedFuture(1L),
-                    CompletableFuture.completedFuture(2L),
-                    CompletableFuture.completedFuture(3L)
-                ))
+                CompletableFuture.completedFuture(Stream.of(
+                    1L, 2L, 3L)))
                 .join())
             .contains(6L);
     }
@@ -382,7 +273,7 @@ public class CompletableFutureUtilTest {
         assertThat(
             CompletableFutureUtil.reduce(
                 (i, j) -> i + j,
-                CompletableFutureUtil.<Long>allOfArray(),
+                CompletableFuture.completedFuture(Stream.of()),
                 identityAccumulator)
                 .join())
             .isEqualTo(identityAccumulator);
@@ -393,11 +284,7 @@ public class CompletableFutureUtilTest {
         assertThat(
             CompletableFutureUtil.reduce(
                 (i, j) -> i + j,
-                CompletableFutureUtil.allOfArray(
-                    CompletableFuture.completedFuture(1L),
-                    CompletableFuture.completedFuture(2L),
-                    CompletableFuture.completedFuture(3L)
-                ),
+                CompletableFuture.completedFuture(Stream.of(1L, 2L,3L)),
                 0L)
                 .join())
             .isEqualTo(6L);
@@ -420,4 +307,23 @@ public class CompletableFutureUtilTest {
                 .join())
             .isEmpty();
     }
+
+    @Test
+    public void sortShouldReturnEmptyWhenEmptyStream() {
+        FluentFutureStream<Long> futureStream = FluentFutureStream.ofFutures();
+        assertThat(futureStream.sorted(Long::compareTo).join())
+            .isEmpty();
+    }
+
+    @Test
+    public void sortShouldReturnTheSortedStream() {
+        FluentFutureStream<Long> futureStream = FluentFutureStream.ofFutures(
+            CompletableFuture.completedFuture(4L),
+            CompletableFuture.completedFuture(3L),
+            CompletableFuture.completedFuture(2L),
+            CompletableFuture.completedFuture(1L));
+
+        assertThat(futureStream.sorted(Long::compareTo).join())
+            .containsExactly(1L, 2L, 3L, 4L);
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
 
b/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
index 0877414..56017d6 100644
--- 
a/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
+++ 
b/server/container/util/src/test/java/org/apache/james/util/FluentFutureStreamTest.java
@@ -71,11 +71,12 @@ public class FluentFutureStreamTest {
     @Test
     public void 
ofNestedStreamsShouldConstructAFluentFutureStreamWhenProvidedAStreamOfFutureOfStream()
 {
         assertThat(
-            FluentFutureStream.ofNestedStreams(
+            FluentFutureStream.<Stream<Integer>, Integer>of(
                 Stream.of(
                     CompletableFuture.completedFuture(Stream.of(1, 2)),
                     CompletableFuture.completedFuture(Stream.of()),
-                    CompletableFuture.completedFuture(Stream.of(3))))
+                    CompletableFuture.completedFuture(Stream.of(3))),
+                    FluentFutureStream::unboxStream)
                 .join()
                 .collect(Guavate.toImmutableList()))
             .containsExactly(1, 2, 3);
@@ -85,12 +86,13 @@ public class FluentFutureStreamTest {
     @Test
     public void 
ofOptionalsShouldConstructAFluentFutureStreamWhenProvidedAStreamOfFutureOfOptionals()
 {
         assertThat(
-            FluentFutureStream.ofOptionals(
+            FluentFutureStream.<Optional<Integer>, Integer>of(
                 Stream.of(
                     CompletableFuture.completedFuture(Optional.of(1)),
                     CompletableFuture.completedFuture(Optional.of(2)),
                     CompletableFuture.completedFuture(Optional.empty()),
-                    CompletableFuture.completedFuture(Optional.of(3))))
+                    CompletableFuture.completedFuture(Optional.of(3))),
+                    FluentFutureStream::unboxOptional)
                 .join()
                 .collect(Guavate.toImmutableList()))
             .containsExactly(1, 2, 3);
@@ -126,7 +128,7 @@ public class FluentFutureStreamTest {
             FluentFutureStream.of(
                 CompletableFuture.completedFuture(
                     Stream.of(1, 2, 3)))
-                .flatMap(i -> Stream.of(i, i + 1))
+                .map(i -> Stream.of(i, i + 1), FluentFutureStream::unboxStream)
                 .join()
                 .collect(Guavate.toImmutableList()))
             .containsExactly(1, 2, 2, 3, 3, 4);
@@ -138,8 +140,9 @@ public class FluentFutureStreamTest {
             FluentFutureStream.of(
                 CompletableFuture.completedFuture(
                     Stream.of(1, 2, 3)))
-                .flatMapOptional(i -> Optional.of(i + 1)
-                    .filter(j -> j % 2 == 0))
+                .map(i -> Optional.of(i + 1)
+                    .filter(j -> j % 2 == 0),
+                    FluentFutureStream::unboxOptional)
                 .join()
                 .collect(Guavate.toImmutableList()))
             .containsExactly(2, 4);
@@ -180,12 +183,24 @@ public class FluentFutureStreamTest {
     }
 
     @Test
+    public void thenFilterShouldBeAppliedOnTheUnderlyingStream() {
+        assertThat(
+            FluentFutureStream.of(
+                CompletableFuture.completedFuture(
+                    Stream.of(1, 2, 3)))
+                .thenFilter(i -> CompletableFuture.completedFuture(i % 2 == 1))
+                .join()
+                .collect(Guavate.toImmutableList()))
+            .containsExactly(1, 3);
+    }
+
+    @Test
     public void 
thenComposeOnAllShouldTransformUnderlyingValuesAndComposeFutures() {
         assertThat(
             FluentFutureStream.of(
                 CompletableFuture.completedFuture(
                     Stream.of(1, 2, 3)))
-                .thenComposeOnAll(i -> CompletableFuture.completedFuture(i + 
1))
+                .map(i -> CompletableFuture.completedFuture(i + 1), 
FluentFutureStream::unboxFuture)
                 .join()
                 .collect(Guavate.toImmutableList()))
             .containsExactly(2, 3, 4);
@@ -197,7 +212,7 @@ public class FluentFutureStreamTest {
             FluentFutureStream.of(
                 CompletableFuture.completedFuture(
                     Stream.of(1, 2, 3)))
-                .thenFlatCompose(i -> 
CompletableFuture.completedFuture(Stream.of(i, i + 1)))
+                .map(i -> 
FluentFutureStream.of(CompletableFuture.completedFuture(Stream.of(i, i + 1))), 
FluentFutureStream::unboxFluentFuture)
                 .join()
                 .collect(Guavate.toImmutableList()))
             .containsExactly(1, 2, 2, 3, 3, 4);
@@ -209,8 +224,9 @@ public class FluentFutureStreamTest {
             FluentFutureStream.of(
                 CompletableFuture.completedFuture(
                     Stream.of(1, 2, 3)))
-                .thenFlatComposeOnOptional(i -> 
CompletableFuture.completedFuture(Optional.of(i + 1)
-                    .filter(j -> j % 2 == 0)))
+                .map(i -> CompletableFuture.completedFuture(
+                    Optional.of(i + 1).filter(j -> j % 2 == 0)),
+                    FluentFutureStream::unboxFutureOptional)
                 .join()
                 .collect(Guavate.toImmutableList()))
             .containsExactly(2, 4);
@@ -255,4 +271,23 @@ public class FluentFutureStreamTest {
             .isEmpty();
     }
 
+    @Test
+    public void sortedShouldReturnInOrderElements() {
+        assertThat(
+            FluentFutureStream.of(
+                CompletableFuture.completedFuture(Stream.of(4L, 3L, 2L, 1L)))
+                .sorted(Long::compareTo)
+                .join())
+            .containsExactly(1L, 2L, 3L, 4L);
+    }
+
+    @Test
+    public void sortedShouldReturnEmptyWhenEmpty() {
+        CompletableFuture<Stream<Long>> completableFutureStream = 
CompletableFuture.completedFuture(Stream.of());
+        assertThat(
+            FluentFutureStream.of(completableFutureStream)
+                .sorted(Long::compareTo)
+                .join())
+            .isEmpty();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
index 2877efe..9bc503c 100644
--- 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
+++ 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
@@ -57,8 +57,8 @@ public interface ManageableMailQueueContract extends 
MailQueueContract {
 
     @Test
     default void getSizeShouldReturnMessageCountWhenSeveralMails() throws 
Exception {
-        getManageableMailQueue().enQueue(defaultMail().build());
-        getManageableMailQueue().enQueue(defaultMail().build());
+        getManageableMailQueue().enQueue(defaultMail().name("1").build());
+        getManageableMailQueue().enQueue(defaultMail().name("2").build());
 
         long size = getManageableMailQueue().getSize();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index e4b8881..818a83b 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -25,11 +25,13 @@ import java.util.function.Function;
 import javax.inject.Inject;
 import javax.mail.internet.MimeMessage;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 
-public class RabbitMQMailQueue implements MailQueue {
+public class RabbitMQMailQueue implements ManageableMailQueue {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQMailQueue.class);
 
@@ -47,21 +49,26 @@ public class RabbitMQMailQueue implements MailQueue {
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final MailReferenceSerializer mailReferenceSerializer;
         private final Function<MailReferenceDTO, Mail> mailLoader;
+        private final MailQueueView mailQueueView;
 
         @Inject
         @VisibleForTesting Factory(MetricFactory metricFactory, RabbitClient 
rabbitClient,
-                                   Store<MimeMessage, MimeMessagePartsId> 
mimeMessageStore, BlobId.Factory blobIdFactory) {
+                                   Store<MimeMessage, MimeMessagePartsId> 
mimeMessageStore,
+                                   BlobId.Factory blobIdFactory,
+                                   MailQueueView mailQueueView) {
             this.metricFactory = metricFactory;
             this.rabbitClient = rabbitClient;
             this.mimeMessageStore = mimeMessageStore;
             this.mailReferenceSerializer = new MailReferenceSerializer();
             this.mailLoader = Throwing.function(new 
MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow();
+            this.mailQueueView = mailQueueView;
         }
 
         RabbitMQMailQueue create(MailQueueName mailQueueName) {
             return new RabbitMQMailQueue(metricFactory, mailQueueName,
                 new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, 
mailReferenceSerializer, metricFactory),
-                new Dequeuer(mailQueueName, rabbitClient, mailLoader, 
mailReferenceSerializer, metricFactory));
+                new Dequeuer(mailQueueName, rabbitClient, mailLoader, 
mailReferenceSerializer, metricFactory),
+                mailQueueView);
         }
     }
 
@@ -69,15 +76,17 @@ public class RabbitMQMailQueue implements MailQueue {
     private final MetricFactory metricFactory;
     private final Enqueuer enqueuer;
     private final Dequeuer dequeuer;
+    private final MailQueueView mailQueueView;
 
     RabbitMQMailQueue(MetricFactory metricFactory, MailQueueName name,
-                      Enqueuer enqueuer, Dequeuer dequeuer) {
+                      Enqueuer enqueuer, Dequeuer dequeuer, MailQueueView 
mailQueueView) {
 
         this.name = name;
         this.enqueuer = enqueuer;
         this.dequeuer = dequeuer;
 
         this.metricFactory = metricFactory;
+        this.mailQueueView = mailQueueView;
     }
 
     @Override
@@ -104,4 +113,30 @@ public class RabbitMQMailQueue implements MailQueue {
         return 
metricFactory.runPublishingTimerMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + 
name.asString(),
             Throwing.supplier(dequeuer::deQueue).sneakyThrow());
     }
-}
\ No newline at end of file
+
+    @Override
+    public long getSize() {
+        return mailQueueView.getSize();
+    }
+
+    @Override
+    public long flush() {
+        LOGGER.warn("Delays are not supported by RabbitMQ. Flush is a NOOP.");
+        return 0;
+    }
+
+    @Override
+    public long clear() {
+        throw new NotImplementedException("Not yet implemented");
+    }
+
+    @Override
+    public long remove(Type type, String value) {
+        throw new NotImplementedException("Not yet implemented");
+    }
+
+    @Override
+    public MailQueueIterator browse() {
+        return mailQueueView.browse();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
new file mode 100644
index 0000000..098c033
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.BrowseStartTable.BROWSE_START;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.BrowseStartTable.QUEUE_NAME;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.BrowseStartTable.TABLE_NAME;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.annotations.VisibleForTesting;
+
+class BrowseStartDAO {
+
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement selectOne;
+    private final PreparedStatement insertOne;
+    private final PreparedStatement updateOne;
+
+    @Inject
+    BrowseStartDAO(Session session) {
+        this.executor = new CassandraAsyncExecutor(session);
+
+        this.selectOne = prepareSelectOne(session);
+        this.updateOne = prepareUpdate(session);
+        this.insertOne = prepareInsertOne(session);
+    }
+
+    private PreparedStatement prepareSelectOne(Session session) {
+        return session.prepare(select()
+                .from(TABLE_NAME)
+                .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME))));
+    }
+
+    private PreparedStatement prepareUpdate(Session session) {
+        return session.prepare(update(TABLE_NAME)
+            .with(set(BROWSE_START, bindMarker(BROWSE_START)))
+            .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME))));
+    }
+
+    private PreparedStatement prepareInsertOne(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .ifNotExists()
+            .value(BROWSE_START, bindMarker(BROWSE_START))
+            .value(QUEUE_NAME, bindMarker(QUEUE_NAME)));
+    }
+
+    CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName 
queueName) {
+        return selectOne(queueName)
+            .thenApply(optional -> optional.map(this::getBrowseStart));
+    }
+
+    CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, 
Instant sliceStart) {
+        return executor.executeVoid(updateOne.bind()
+            .setTimestamp(BROWSE_START, Date.from(sliceStart))
+            .setString(QUEUE_NAME, mailQueueName.asString()));
+    }
+
+    CompletableFuture<Void> insertInitialBrowseStart(MailQueueName 
mailQueueName, Instant sliceStart) {
+        return executor.executeVoid(insertOne.bind()
+            .setTimestamp(BROWSE_START, Date.from(sliceStart))
+            .setString(QUEUE_NAME, mailQueueName.asString()));
+    }
+
+    @VisibleForTesting
+    CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) {
+        return executor.executeSingleRow(
+            selectOne.bind()
+                .setString(QUEUE_NAME, queueName.asString()));
+    }
+
+    private Instant getBrowseStart(Row row) {
+        return row.getTimestamp(BROWSE_START).toInstant();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
new file mode 100644
index 0000000..21dedb8
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -0,0 +1,128 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.util.FluentFutureStream;
+
+import com.google.common.base.Preconditions;
+
+class CassandraMailQueueBrowser {
+
+    static class CassandraMailQueueIterator implements 
ManageableMailQueue.MailQueueIterator {
+
+        private final Iterator<ManageableMailQueue.MailQueueItemView> iterator;
+
+        
CassandraMailQueueIterator(Iterator<ManageableMailQueue.MailQueueItemView> 
iterator) {
+            Preconditions.checkNotNull(iterator);
+
+            this.iterator = iterator;
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public ManageableMailQueue.MailQueueItemView next() {
+            return iterator.next();
+        }
+    }
+
+    private final BrowseStartDAO browseStartDao;
+    private final DeletedMailsDAO deletedMailsDao;
+    private final EnqueuedMailsDAO enqueuedMailsDao;
+    private final CassandraMailQueueViewConfiguration configuration;
+    private final Clock clock;
+
+    @Inject
+    CassandraMailQueueBrowser(BrowseStartDAO browseStartDao,
+                              DeletedMailsDAO deletedMailsDao,
+                              EnqueuedMailsDAO enqueuedMailsDao,
+                              CassandraMailQueueViewConfiguration 
configuration, Clock clock) {
+        this.browseStartDao = browseStartDao;
+        this.deletedMailsDao = deletedMailsDao;
+        this.enqueuedMailsDao = enqueuedMailsDao;
+        this.configuration = configuration;
+        this.clock = clock;
+    }
+
+    CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> 
browse(MailQueueName queueName) {
+        return browseReferences(queueName)
+            .map(EnqueuedMail::getMail)
+            .map(ManageableMailQueue.MailQueueItemView::new)
+            .completableFuture();
+    }
+
+    FluentFutureStream<EnqueuedMail> browseReferences(MailQueueName queueName) 
{
+        return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName)
+            .thenApply(this::allSlicesStartingAt))
+            .map(slice -> browseSlice(queueName, slice), 
FluentFutureStream::unboxFluentFuture);
+    }
+
+    private FluentFutureStream<EnqueuedMail> browseSlice(MailQueueName 
queueName, Slice slice) {
+        return FluentFutureStream.of(
+            allBucketIds()
+                .map(bucketId ->
+                    browseBucket(queueName, slice, 
bucketId).completableFuture()),
+            FluentFutureStream::unboxStream)
+            .sorted(Comparator.comparing(EnqueuedMail::getEnqueuedTime));
+    }
+
+    private FluentFutureStream<EnqueuedMail> browseBucket(MailQueueName 
queueName, Slice slice, BucketId bucketId) {
+        return FluentFutureStream.of(
+            enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId))
+                .thenFilter(mailReference -> 
deletedMailsDao.isStillEnqueued(queueName, mailReference.getMailKey()));
+    }
+
+    private Stream<Slice> allSlicesStartingAt(Optional<Instant> 
maybeBrowseStart) {
+        return maybeBrowseStart
+            .map(browseStart -> Slice.of(browseStart, 
configuration.getSliceWindow()))
+            .map(startSlice -> allSlicesTill(startSlice, clock.instant()))
+            .orElse(Stream.empty());
+    }
+
+    private Stream<BucketId> allBucketIds() {
+        return IntStream
+            .range(0, configuration.getBucketCount())
+            .mapToObj(BucketId::of);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
new file mode 100644
index 0000000..c8c0a0e
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -0,0 +1,87 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+import org.apache.mailet.Mail;
+
+class CassandraMailQueueMailDelete {
+
+    private final DeletedMailsDAO deletedMailsDao;
+    private final BrowseStartDAO browseStartDao;
+    private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
+    private final CassandraMailQueueViewConfiguration configuration;
+    private final ThreadLocalRandom random;
+
+    @Inject
+    CassandraMailQueueMailDelete(DeletedMailsDAO deletedMailsDao,
+                                 BrowseStartDAO browseStartDao,
+                                 CassandraMailQueueBrowser 
cassandraMailQueueBrowser,
+                                 CassandraMailQueueViewConfiguration 
configuration,
+                                 ThreadLocalRandom random) {
+        this.deletedMailsDao = deletedMailsDao;
+        this.browseStartDao = browseStartDao;
+        this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
+        this.configuration = configuration;
+        this.random = random;
+    }
+
+    CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName 
mailQueueName) {
+        return deletedMailsDao
+            .markAsDeleted(mailQueueName, MailKey.fromMail(mail))
+            .thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName));
+    }
+
+    private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
+        if (shouldUpdateBrowseStart()) {
+            findNewBrowseStart(mailQueueName)
+                .thenCompose(newBrowseStart -> 
updateNewBrowseStart(mailQueueName, newBrowseStart))
+                .join();
+        }
+    }
+
+    private CompletableFuture<Optional<Instant>> 
findNewBrowseStart(MailQueueName mailQueueName) {
+        return cassandraMailQueueBrowser.browseReferences(mailQueueName)
+            .map(EnqueuedMail::getTimeRangeStart)
+            .completableFuture()
+            .thenApply(Stream::findFirst);
+    }
+
+    private CompletableFuture<Void> updateNewBrowseStart(MailQueueName 
mailQueueName, Optional<Instant> maybeNewBrowseStart) {
+        return maybeNewBrowseStart
+            .map(newBrowseStartInstant -> 
browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant))
+            .orElse(CompletableFuture.completedFuture(null));
+    }
+
+    private boolean shouldUpdateBrowseStart() {
+        int threshold = configuration.getUpdateBrowseStartPace();
+        return Math.abs(random.nextInt()) % threshold == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
new file mode 100644
index 0000000..ed696f2
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
@@ -0,0 +1,98 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.inject.Inject;
+
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+import org.apache.mailet.Mail;
+
+class CassandraMailQueueMailStore {
+
+    private final EnqueuedMailsDAO enqueuedMailsDao;
+    private final BrowseStartDAO browseStartDao;
+    private final CassandraMailQueueViewConfiguration configuration;
+    private final Clock clock;
+    private final Set<MailQueueName> initialInserted;
+
+    @Inject
+    CassandraMailQueueMailStore(EnqueuedMailsDAO enqueuedMailsDao,
+                                BrowseStartDAO browseStartDao,
+                                CassandraMailQueueViewConfiguration 
configuration,
+                                Clock clock) {
+        this.enqueuedMailsDao = enqueuedMailsDao;
+        this.browseStartDao = browseStartDao;
+        this.configuration = configuration;
+        this.clock = clock;
+        this.initialInserted = ConcurrentHashMap.newKeySet();
+    }
+
+    CompletableFuture<Void> storeMailInEnqueueTable(Mail mail, MailQueueName 
mailQueueName) {
+        EnqueuedMail enqueuedMail = convertToEnqueuedMail(mail, mailQueueName);
+
+        return enqueuedMailsDao.insert(enqueuedMail)
+            .thenCompose(any -> initBrowseStartIfNeeded(mailQueueName, 
enqueuedMail.getTimeRangeStart()));
+    }
+
+    private CompletableFuture<Void> initBrowseStartIfNeeded(MailQueueName 
mailQueueName, Instant sliceStartAt) {
+        if (!initialInserted.contains(mailQueueName)) {
+            return tryInsertBrowseStart(mailQueueName, sliceStartAt);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private CompletableFuture<Void> tryInsertBrowseStart(MailQueueName 
mailQueueName, Instant sliceStartAt) {
+        return browseStartDao
+            .insertInitialBrowseStart(mailQueueName, sliceStartAt)
+            .thenAccept(any -> initialInserted.add(mailQueueName));
+    }
+
+    private EnqueuedMail convertToEnqueuedMail(Mail mail, MailQueueName 
mailQueueName) {
+        return EnqueuedMail.builder()
+            .mail(mail)
+            .bucketId(computedBucketId(mail))
+            .timeRangeStart(currentSliceStartInstant())
+            .enqueuedTime(Instant.now())
+            .mailKey(MailKey.fromMail(mail))
+            .mailQueueName(mailQueueName)
+            .build();
+    }
+
+    private Instant currentSliceStartInstant() {
+        long sliceSize = configuration.getSliceWindow().getSeconds();
+        long sliceId = clock.instant().getEpochSecond() / sliceSize;
+        return Instant.ofEpochSecond(sliceId * sliceSize);
+    }
+
+    private BucketId computedBucketId(Mail mail) {
+        int mailKeyHashCode = mail.getName().hashCode();
+        int bucketIdValue = mailKeyHashCode % configuration.getBucketCount();
+        return BucketId.of(bucketIdValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
new file mode 100644
index 0000000..ee309e6
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.inject.Inject;
+
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.mailet.Mail;
+
+import com.google.common.collect.Iterators;
+
+public class CassandraMailQueueView implements MailQueueView {
+
+    public static class Factory {
+        private final CassandraMailQueueMailStore storeHelper;
+        private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
+        private final CassandraMailQueueMailDelete 
cassandraMailQueueMailDelete;
+
+        @Inject
+        public Factory(CassandraMailQueueMailStore storeHelper, 
CassandraMailQueueBrowser cassandraMailQueueBrowser, 
CassandraMailQueueMailDelete cassandraMailQueueMailDelete) {
+            this.storeHelper = storeHelper;
+            this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
+            this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
+        }
+
+        public MailQueueView create(MailQueueName mailQueueName) {
+            return new CassandraMailQueueView(storeHelper, mailQueueName, 
cassandraMailQueueBrowser, cassandraMailQueueMailDelete);
+        }
+    }
+
+    private final CassandraMailQueueMailStore storeHelper;
+    private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
+    private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
+
+    private final MailQueueName mailQueueName;
+
+    CassandraMailQueueView(CassandraMailQueueMailStore storeHelper,
+                                  MailQueueName mailQueueName,
+                                  CassandraMailQueueBrowser 
cassandraMailQueueBrowser,
+                                  CassandraMailQueueMailDelete 
cassandraMailQueueMailDelete) {
+        this.mailQueueName = mailQueueName;
+        this.storeHelper = storeHelper;
+        this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
+        this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
+    }
+
+    @Override
+    public CompletableFuture<Void> storeMail(Mail mail) {
+        return storeHelper.storeMailInEnqueueTable(mail, mailQueueName);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteMail(Mail mail) {
+        return cassandraMailQueueMailDelete.considerDeleted(mail, 
mailQueueName);
+    }
+
+    @Override
+    public ManageableMailQueue.MailQueueIterator browse() {
+        return new CassandraMailQueueBrowser.CassandraMailQueueIterator(
+            cassandraMailQueueBrowser.browse(mailQueueName)
+                .join()
+                .iterator());
+    }
+
+    @Override
+    public long getSize() {
+        return Iterators.size(browse());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
new file mode 100644
index 0000000..36ee5f4
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
@@ -0,0 +1,83 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.MAIL_KEY;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+
+class DeletedMailsDAO {
+
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement selectOne;
+    private final PreparedStatement insertOne;
+
+    @Inject
+    DeletedMailsDAO(Session session) {
+        this.executor = new CassandraAsyncExecutor(session);
+        this.selectOne = prepareSelectExist(session);
+        this.insertOne = prepareInsert(session);
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(QUEUE_NAME, bindMarker(QUEUE_NAME))
+            .value(MAIL_KEY, bindMarker(MAIL_KEY)));
+    }
+
+    private PreparedStatement prepareSelectExist(Session session) {
+        return session.prepare(select()
+            .from(TABLE_NAME)
+            .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME)))
+            .and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
+    }
+
+    CompletableFuture<Void> markAsDeleted(MailQueueName mailQueueName, MailKey 
mailKey) {
+        return executor.executeVoid(insertOne.bind()
+            .setString(QUEUE_NAME, mailQueueName.asString())
+            .setString(MAIL_KEY, mailKey.getMailKey()));
+    }
+
+    CompletableFuture<Boolean> isDeleted(MailQueueName mailQueueName, MailKey 
mailKey) {
+        return executor.executeReturnExists(
+            selectOne.bind()
+                .setString(QUEUE_NAME, mailQueueName.asString())
+                .setString(MAIL_KEY, mailKey.getMailKey()));
+    }
+
+    CompletableFuture<Boolean> isStillEnqueued(MailQueueName mailQueueName, 
MailKey mailKey) {
+        return isDeleted(mailQueueName, mailKey).thenApply(b -> !b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
new file mode 100644
index 0000000..2b8e11b
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
@@ -0,0 +1,144 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.MAIL_KEY;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.QUEUE_NAME;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.STATE;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.asStringList;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.toHeaderMap;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.toRawAttributeMap;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
+
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.core.MailAddress;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.mailet.Mail;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+
+class EnqueuedMailsDAO {
+
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement selectFrom;
+    private final PreparedStatement insert;
+    private final CassandraUtils cassandraUtils;
+    private final CassandraTypesProvider cassandraTypesProvider;
+
+    @Inject
+    EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, 
CassandraTypesProvider cassandraTypesProvider) {
+        this.executor = new CassandraAsyncExecutor(session);
+        this.cassandraUtils = cassandraUtils;
+        this.cassandraTypesProvider = cassandraTypesProvider;
+
+        this.selectFrom = prepareSelectFrom(session);
+        this.insert = prepareInsert(session);
+    }
+
+    private PreparedStatement prepareSelectFrom(Session session) {
+        return session.prepare(select()
+            .from(TABLE_NAME)
+            .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME)))
+            .and(eq(TIME_RANGE_START, bindMarker(TIME_RANGE_START)))
+            .and(eq(BUCKET_ID, bindMarker(BUCKET_ID))));
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(QUEUE_NAME, bindMarker(QUEUE_NAME))
+            .value(TIME_RANGE_START, bindMarker(TIME_RANGE_START))
+            .value(BUCKET_ID, bindMarker(BUCKET_ID))
+            .value(MAIL_KEY, bindMarker(MAIL_KEY))
+            .value(ENQUEUED_TIME, bindMarker(ENQUEUED_TIME))
+            .value(STATE, bindMarker(STATE))
+            .value(SENDER, bindMarker(SENDER))
+            .value(RECIPIENTS, bindMarker(RECIPIENTS))
+            .value(ATTRIBUTES, bindMarker(ATTRIBUTES))
+            .value(ERROR_MESSAGE, bindMarker(ERROR_MESSAGE))
+            .value(REMOTE_ADDR, bindMarker(REMOTE_ADDR))
+            .value(REMOTE_HOST, bindMarker(REMOTE_HOST))
+            .value(LAST_UPDATED, bindMarker(LAST_UPDATED))
+            .value(PER_RECIPIENT_SPECIFIC_HEADERS, 
bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS)));
+    }
+
+    CompletableFuture<Void> insert(EnqueuedMail enqueuedMail) {
+        Mail mail = enqueuedMail.getMail();
+
+        return executor.executeVoid(insert.bind()
+            .setString(QUEUE_NAME, enqueuedMail.getMailQueueName().asString())
+            .setTimestamp(TIME_RANGE_START, 
Date.from(enqueuedMail.getTimeRangeStart()))
+            .setInt(BUCKET_ID, enqueuedMail.getBucketId().getValue())
+            .setTimestamp(ENQUEUED_TIME, 
Date.from(enqueuedMail.getEnqueuedTime()))
+            .setString(MAIL_KEY, mail.getName())
+            .setString(STATE, mail.getState())
+            .setString(SENDER, Optional.ofNullable(mail.getSender())
+                .map(MailAddress::asString)
+                .orElse(null))
+            .setList(RECIPIENTS, asStringList(mail.getRecipients()))
+            .setString(ERROR_MESSAGE, mail.getErrorMessage())
+            .setString(REMOTE_ADDR, mail.getRemoteAddr())
+            .setString(REMOTE_HOST, mail.getRemoteHost())
+            .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
+            .setMap(ATTRIBUTES, toRawAttributeMap(mail))
+            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, 
toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))
+        );
+    }
+
+    CompletableFuture<Stream<EnqueuedMail>> selectEnqueuedMails(
+        MailQueueName queueName, Slice slice, BucketId bucketId) {
+
+        return executor.execute(
+            selectFrom.bind()
+                .setString(QUEUE_NAME, queueName.asString())
+                .setTimestamp(TIME_RANGE_START, 
Date.from(slice.getStartSliceInstant()))
+                .setInt(BUCKET_ID, bucketId.getValue()))
+            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
+                .map(EnqueuedMailsDaoUtil::toEnqueuedMail));
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to