JAMES-2082 Adding more capability to Stream future handling

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c00651a1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c00651a1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c00651a1

Branch: refs/heads/master
Commit: c00651a13addadd01214051e149d09a5538d0cab
Parents: cae6beb
Author: benwa <btell...@linagora.com>
Authored: Thu Jul 6 17:23:48 2017 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Mon Jul 10 14:23:54 2017 +0200

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMailboxDAO.java     | 45 ++++++++++++--------
 .../cassandra/mail/CassandraMailboxMapper.java  | 19 ++++-----
 .../james/util/CompletableFutureUtil.java       |  6 +++
 .../apache/james/util/FluentFutureStream.java   | 33 ++++++++++++++
 .../james/util/CompletableFutureUtilTest.java   | 14 ++++++
 5 files changed, 89 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index fee0e15..c9a136d 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
 import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
@@ -141,22 +142,29 @@ public class CassandraMailboxDAO {
     }
 
     public CompletableFuture<Optional<SimpleMailbox>> 
retrieveMailbox(CassandraId mailboxId) {
-        return mailbox(mailboxId,
-            executor.executeSingleRow(readStatement.bind()
-                .setUUID(ID, mailboxId.asUuid())));
+        CompletableFuture<MailboxACL> aclCompletableFuture =
+            new CassandraACLMapper(mailboxId, session, executor, 
cassandraConfiguration)
+                .getACL();
+
+        CompletableFuture<Optional<SimpleMailbox>> simpleMailboxFuture = 
executor.executeSingleRow(readStatement.bind()
+            .setUUID(ID, mailboxId.asUuid()))
+            .thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow))
+            .thenApply(mailbox -> addMailboxId(mailboxId, mailbox));
+
+        return CompletableFutureUtil.combine(
+            aclCompletableFuture,
+            simpleMailboxFuture,
+            this::addAcl);
+    }
+
+    private Optional<SimpleMailbox> addMailboxId(CassandraId cassandraId, 
Optional<SimpleMailbox> mailboxOptional) {
+        mailboxOptional.ifPresent(mailbox -> 
mailbox.setMailboxId(cassandraId));
+        return mailboxOptional;
     }
 
-    private CompletableFuture<Optional<SimpleMailbox>> mailbox(CassandraId 
cassandraId, CompletableFuture<Optional<Row>> rowFuture) {
-        CompletableFuture<MailboxACL> aclCompletableFuture = new 
CassandraACLMapper(cassandraId, session, executor, 
cassandraConfiguration).getACL();
-        return rowFuture.thenApply(rowOptional -> 
rowOptional.map(this::mailboxFromRow))
-            .thenApply(mailboxOptional -> {
-                mailboxOptional.ifPresent(mailbox -> 
mailbox.setMailboxId(cassandraId));
-                return mailboxOptional;
-            })
-            .thenCompose(mailboxOptional -> aclCompletableFuture.thenApply(acl 
-> {
-                mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl));
-                return mailboxOptional;
-            }));
+    private Optional<SimpleMailbox> addAcl(MailboxACL acl, 
Optional<SimpleMailbox> mailboxOptional) {
+        mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl));
+        return mailboxOptional;
     }
 
     private SimpleMailbox mailboxFromRow(Row row) {
@@ -169,10 +177,11 @@ public class CassandraMailboxDAO {
     }
 
     public CompletableFuture<Stream<SimpleMailbox>> retrieveAllMailboxes() {
-        return executor.execute(listStatement.bind())
-            .thenApply(cassandraUtils::convertToStream)
-            .thenApply(stream -> stream.map(this::toMailboxWithId))
-            .thenCompose(stream -> 
CompletableFutureUtil.allOf(stream.map(this::toMailboxWithAclFuture)));
+        return FluentFutureStream.of(executor.execute(listStatement.bind())
+            .thenApply(cassandraUtils::convertToStream))
+            .map(this::toMailboxWithId)
+            .thenComposeOnAll(this::toMailboxWithAclFuture)
+            .completableFuture();
     }
 
     private SimpleMailbox toMailboxWithId(Row row) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index ef0362a..0a2a3f0 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -43,7 +43,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
 import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.OptionalConverter;
+import org.apache.james.util.FluentFutureStream;
 
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
@@ -111,15 +111,14 @@ public class CassandraMailboxMapper implements 
MailboxMapper {
     @Override
     public List<Mailbox> findMailboxWithPathLike(MailboxPath path) throws 
MailboxException {
         Pattern regex = 
Pattern.compile(constructEscapedRegexForMailboxNameMatching(path));
-        return mailboxPathDAO.listUserMailboxes(path.getNamespace(), 
path.getUser())
-            .thenApply(stream -> stream.filter(idAndPath -> 
regex.matcher(idAndPath.getMailboxPath().getName()).matches()))
-            .thenApply(stream -> 
stream.map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId))
-            .thenApply(stream -> stream.map(mailboxDAO::retrieveMailbox))
-            .thenCompose(CompletableFutureUtil::allOf)
-            .thenApply(stream -> stream
-                .flatMap(OptionalConverter::toStream)
-                .collect(Guavate.<Mailbox>toImmutableList()))
-            .join();
+
+        return 
FluentFutureStream.of(mailboxPathDAO.listUserMailboxes(path.getNamespace(), 
path.getUser()))
+            .filter(idAndPath -> 
regex.matcher(idAndPath.getMailboxPath().getName()).matches())
+            .map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId)
+            .thenFlatComposeOnOptional(mailboxDAO::retrieveMailbox)
+            .completableFuture()
+            .join()
+            .collect(Guavate.toImmutableList());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
 
b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
index 6b5c312..67ee055 100644
--- 
a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
+++ 
b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java
@@ -21,6 +21,7 @@ package org.apache.james.util;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -33,6 +34,11 @@ public class CompletableFutureUtil {
         return allOf(Stream.of(futures));
     }
 
+    public static <T, U, V> CompletableFuture<V> combine(CompletableFuture<T> 
t, CompletableFuture<U> u, BiFunction<T,U,V> combiner) {
+        return t.thenCompose(valueT ->
+            u.thenApply(valueU -> combiner.apply(valueT, valueU)));
+    }
+
     public static <T> CompletableFuture<Stream<T>> 
allOf(Stream<CompletableFuture<T>> futureStream) {
         return futureStream
             .map((CompletableFuture<T> future) -> future.thenApply(Stream::of))

http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
 
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
index 810f264..fa40616 100644
--- 
a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
+++ 
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 public class FluentFutureStream<T> {
@@ -33,6 +34,16 @@ public class FluentFutureStream<T> {
         return new FluentFutureStream<>(completableFuture);
     }
 
+    public static <T> FluentFutureStream<T> 
ofNestedStreams(Stream<CompletableFuture<Stream<T>>> completableFuture) {
+        return of(completableFuture)
+            .flatMap(Function.identity());
+    }
+
+    public static <T> FluentFutureStream<T> 
ofOptionals(Stream<CompletableFuture<Optional<T>>> completableFuture) {
+        return of(completableFuture)
+            .flatMapOptional(Function.identity());
+    }
+
     public static <T> FluentFutureStream<T> of(Stream<CompletableFuture<T>> 
completableFutureStream) {
         return new 
FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream));
     }
@@ -69,11 +80,28 @@ public class FluentFutureStream<T> {
             CompletableFutureUtil.thenComposeOnAll(completableFuture(), 
function));
     }
 
+    public <U> FluentFutureStream<U> thenFlatCompose(Function<T, 
CompletableFuture<Stream<U>>> function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.thenComposeOnAll(completableFuture(), 
function))
+            .flatMap(Function.identity());
+    }
+
+    public <U> FluentFutureStream<U> thenFlatComposeOnOptional(Function<T, 
CompletableFuture<Optional<U>>> function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.thenComposeOnAll(completableFuture(), 
function))
+            .flatMapOptional(Function.identity());
+    }
+
     public <U> FluentFutureStream<U> flatMap(Function<T, Stream<U>> function) {
         return FluentFutureStream.of(completableFuture().thenApply(stream ->
             stream.flatMap(function)));
     }
 
+    public <U> FluentFutureStream<U> flatMapOptional(Function<T, Optional<U>> 
function) {
+        return map(function)
+            .flatMap(OptionalConverter::toStream);
+    }
+
     public <U> FluentFutureStream<U> thenCompose(Function<Stream<T>, 
CompletableFuture<Stream<U>>> function) {
         return 
FluentFutureStream.of(completableFuture().thenCompose(function));
     }
@@ -82,6 +110,11 @@ public class FluentFutureStream<T> {
         return this.completableFuture;
     }
 
+    public FluentFutureStream<T> filter(Predicate<T> predicate) {
+        return FluentFutureStream.of(completableFuture
+            .thenApply(stream -> stream.filter(predicate)));
+    }
+
     public Stream<T> join() {
         return completableFuture().join();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
 
b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
index 3a7e571..b4ea41d 100644
--- 
a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
+++ 
b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
@@ -52,6 +52,20 @@ public class CompletableFutureUtilTest {
     }
 
     @Test
+    public void combineShouldReturnCombinationOfBothSuppliedFutures() {
+        int value1 = 18;
+        int value2 = 12;
+
+        assertThat(CompletableFutureUtil.combine(
+            CompletableFuture.completedFuture(value1),
+            CompletableFuture.completedFuture(value2),
+            (a, b) -> 2 * a + b)
+            .join())
+            .isEqualTo(2 * value1 + value2);
+
+    }
+
+    @Test
     public void allOfShouldUnboxEmptyStream() {
         assertThat(
             CompletableFutureUtil.allOf(Stream.empty())


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to