JAMES-2082 Adding more capability to Stream future handling
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c00651a1 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c00651a1 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c00651a1 Branch: refs/heads/master Commit: c00651a13addadd01214051e149d09a5538d0cab Parents: cae6beb Author: benwa <btell...@linagora.com> Authored: Thu Jul 6 17:23:48 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:54 2017 +0200 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMailboxDAO.java | 45 ++++++++++++-------- .../cassandra/mail/CassandraMailboxMapper.java | 19 ++++----- .../james/util/CompletableFutureUtil.java | 6 +++ .../apache/james/util/FluentFutureStream.java | 33 ++++++++++++++ .../james/util/CompletableFutureUtilTest.java | 14 ++++++ 5 files changed, 89 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java index fee0e15..c9a136d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java @@ -50,6 +50,7 @@ import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; import org.apache.james.util.CompletableFutureUtil; +import org.apache.james.util.FluentFutureStream; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; @@ -141,22 +142,29 @@ public class CassandraMailboxDAO { } public CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraId mailboxId) { - return mailbox(mailboxId, - executor.executeSingleRow(readStatement.bind() - .setUUID(ID, mailboxId.asUuid()))); + CompletableFuture<MailboxACL> aclCompletableFuture = + new CassandraACLMapper(mailboxId, session, executor, cassandraConfiguration) + .getACL(); + + CompletableFuture<Optional<SimpleMailbox>> simpleMailboxFuture = executor.executeSingleRow(readStatement.bind() + .setUUID(ID, mailboxId.asUuid())) + .thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow)) + .thenApply(mailbox -> addMailboxId(mailboxId, mailbox)); + + return CompletableFutureUtil.combine( + aclCompletableFuture, + simpleMailboxFuture, + this::addAcl); + } + + private Optional<SimpleMailbox> addMailboxId(CassandraId cassandraId, Optional<SimpleMailbox> mailboxOptional) { + mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId)); + return mailboxOptional; } - private CompletableFuture<Optional<SimpleMailbox>> mailbox(CassandraId cassandraId, CompletableFuture<Optional<Row>> rowFuture) { - CompletableFuture<MailboxACL> aclCompletableFuture = new CassandraACLMapper(cassandraId, session, executor, cassandraConfiguration).getACL(); - return rowFuture.thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow)) - .thenApply(mailboxOptional -> { - mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId)); - return mailboxOptional; - }) - .thenCompose(mailboxOptional -> aclCompletableFuture.thenApply(acl -> { - mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl)); - return mailboxOptional; - })); + private Optional<SimpleMailbox> addAcl(MailboxACL acl, Optional<SimpleMailbox> mailboxOptional) { + mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl)); + return mailboxOptional; } private SimpleMailbox mailboxFromRow(Row row) { @@ -169,10 +177,11 @@ public class CassandraMailboxDAO { } public CompletableFuture<Stream<SimpleMailbox>> retrieveAllMailboxes() { - return executor.execute(listStatement.bind()) - .thenApply(cassandraUtils::convertToStream) - .thenApply(stream -> stream.map(this::toMailboxWithId)) - .thenCompose(stream -> CompletableFutureUtil.allOf(stream.map(this::toMailboxWithAclFuture))); + return FluentFutureStream.of(executor.execute(listStatement.bind()) + .thenApply(cassandraUtils::convertToStream)) + .map(this::toMailboxWithId) + .thenComposeOnAll(this::toMailboxWithAclFuture) + .completableFuture(); } private SimpleMailbox toMailboxWithId(Row row) { http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index ef0362a..0a2a3f0 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -43,7 +43,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; import org.apache.james.util.CompletableFutureUtil; -import org.apache.james.util.OptionalConverter; +import org.apache.james.util.FluentFutureStream; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.InvalidQueryException; @@ -111,15 +111,14 @@ public class CassandraMailboxMapper implements MailboxMapper { @Override public List<Mailbox> findMailboxWithPathLike(MailboxPath path) throws MailboxException { Pattern regex = Pattern.compile(constructEscapedRegexForMailboxNameMatching(path)); - return mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser()) - .thenApply(stream -> stream.filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches())) - .thenApply(stream -> stream.map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId)) - .thenApply(stream -> stream.map(mailboxDAO::retrieveMailbox)) - .thenCompose(CompletableFutureUtil::allOf) - .thenApply(stream -> stream - .flatMap(OptionalConverter::toStream) - .collect(Guavate.<Mailbox>toImmutableList())) - .join(); + + return FluentFutureStream.of(mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser())) + .filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches()) + .map(CassandraMailboxPathDAO.CassandraIdAndPath::getCassandraId) + .thenFlatComposeOnOptional(mailboxDAO::retrieveMailbox) + .completableFuture() + .join() + .collect(Guavate.toImmutableList()); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java index 6b5c312..67ee055 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/CompletableFutureUtil.java @@ -21,6 +21,7 @@ package org.apache.james.util; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; @@ -33,6 +34,11 @@ public class CompletableFutureUtil { return allOf(Stream.of(futures)); } + public static <T, U, V> CompletableFuture<V> combine(CompletableFuture<T> t, CompletableFuture<U> u, BiFunction<T,U,V> combiner) { + return t.thenCompose(valueT -> + u.thenApply(valueU -> combiner.apply(valueT, valueU))); + } + public static <T> CompletableFuture<Stream<T>> allOf(Stream<CompletableFuture<T>> futureStream) { return futureStream .map((CompletableFuture<T> future) -> future.thenApply(Stream::of)) http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java index 810f264..fa40616 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Stream; public class FluentFutureStream<T> { @@ -33,6 +34,16 @@ public class FluentFutureStream<T> { return new FluentFutureStream<>(completableFuture); } + public static <T> FluentFutureStream<T> ofNestedStreams(Stream<CompletableFuture<Stream<T>>> completableFuture) { + return of(completableFuture) + .flatMap(Function.identity()); + } + + public static <T> FluentFutureStream<T> ofOptionals(Stream<CompletableFuture<Optional<T>>> completableFuture) { + return of(completableFuture) + .flatMapOptional(Function.identity()); + } + public static <T> FluentFutureStream<T> of(Stream<CompletableFuture<T>> completableFutureStream) { return new FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream)); } @@ -69,11 +80,28 @@ public class FluentFutureStream<T> { CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)); } + public <U> FluentFutureStream<U> thenFlatCompose(Function<T, CompletableFuture<Stream<U>>> function) { + return FluentFutureStream.of( + CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)) + .flatMap(Function.identity()); + } + + public <U> FluentFutureStream<U> thenFlatComposeOnOptional(Function<T, CompletableFuture<Optional<U>>> function) { + return FluentFutureStream.of( + CompletableFutureUtil.thenComposeOnAll(completableFuture(), function)) + .flatMapOptional(Function.identity()); + } + public <U> FluentFutureStream<U> flatMap(Function<T, Stream<U>> function) { return FluentFutureStream.of(completableFuture().thenApply(stream -> stream.flatMap(function))); } + public <U> FluentFutureStream<U> flatMapOptional(Function<T, Optional<U>> function) { + return map(function) + .flatMap(OptionalConverter::toStream); + } + public <U> FluentFutureStream<U> thenCompose(Function<Stream<T>, CompletableFuture<Stream<U>>> function) { return FluentFutureStream.of(completableFuture().thenCompose(function)); } @@ -82,6 +110,11 @@ public class FluentFutureStream<T> { return this.completableFuture; } + public FluentFutureStream<T> filter(Predicate<T> predicate) { + return FluentFutureStream.of(completableFuture + .thenApply(stream -> stream.filter(predicate))); + } + public Stream<T> join() { return completableFuture().join(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/c00651a1/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java index 3a7e571..b4ea41d 100644 --- a/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java +++ b/server/container/util-java8/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java @@ -52,6 +52,20 @@ public class CompletableFutureUtilTest { } @Test + public void combineShouldReturnCombinationOfBothSuppliedFutures() { + int value1 = 18; + int value2 = 12; + + assertThat(CompletableFutureUtil.combine( + CompletableFuture.completedFuture(value1), + CompletableFuture.completedFuture(value2), + (a, b) -> 2 * a + b) + .join()) + .isEqualTo(2 * value1 + value2); + + } + + @Test public void allOfShouldUnboxEmptyStream() { assertThat( CompletableFutureUtil.allOf(Stream.empty()) --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org