JAMES-2566 Improve CompletableFutureUtil::chainAll
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/78113b27 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/78113b27 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/78113b27 Branch: refs/heads/master Commit: 78113b27748d1823bd356bf93087dd7fcdf9a7cf Parents: eedcc93 Author: Benoit Tellier <btell...@linagora.com> Authored: Thu Oct 18 09:16:06 2018 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Wed Oct 31 09:12:08 2018 +0700 ---------------------------------------------------------------------- .../james/util/CompletableFutureUtil.java | 37 ++++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/78113b27/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java index 19c94b4..f51261a 100644 --- a/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java +++ b/server/container/util/src/main/java/org/apache/james/util/CompletableFutureUtil.java @@ -19,6 +19,7 @@ package org.apache.james.util; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.Optional; @@ -29,6 +30,8 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; +import com.google.common.collect.ImmutableList; + public class CompletableFutureUtil { public static <T> CompletableFuture<Optional<T>> unwrap(CompletableFuture<Optional<CompletableFuture<T>>> base) { @@ -50,26 +53,22 @@ public class CompletableFutureUtil { 
.map(CompletableFuture::join)); } - public static <R, T> CompletableFuture<Stream<R>> chainAll(Stream<T> futureStream, - Function<T, CompletableFuture<R>> transformationToChain) { - return futureStream - .map(t -> (Supplier<CompletableFuture<R>>) (() -> transformationToChain.apply(t))) - .reduce(CompletableFuture.<Stream<R>>completedFuture(Stream.of()), - (accumulator, supplier) -> - accumulator.thenCompose( - accumulatedStream -> - supplier.get() - .thenCompose(r -> - CompletableFuture.completedFuture(Stream.<R>concat(accumulatedStream, Stream.of(r)))) - ), - getCompletableFutureBinaryOperator()); - } + @SuppressWarnings("unchecked") + public static <R, T> CompletableFuture<Stream<R>> chainAll(Stream<T> futureStream, Function<T, CompletableFuture<R>> transformationToChain) { + ImmutableList<T> elements = futureStream.collect(ImmutableList.toImmutableList()); + ArrayList<R> results = new ArrayList<>(elements.size()); - private static <R> BinaryOperator<CompletableFuture<Stream<R>>> getCompletableFutureBinaryOperator() { - return (future1, future2) -> - future1.thenCompose(stream1 -> - future2.<Stream<R>>thenCompose(stream2 -> - CompletableFuture.completedFuture(Stream.concat(stream1, stream2)))); + CompletableFuture<Void> futureEmptyStream = CompletableFuture.completedFuture(null); + + BiFunction<CompletableFuture, Supplier<CompletableFuture<R>>, CompletableFuture> accumulator = + (future, supplier) -> future.thenCompose(any -> supplier.get().thenAccept(results::add)); + + BinaryOperator<CompletableFuture> combiner = (f1, f2) -> f1.thenCompose(any -> f2); + + return elements.stream() + .map(t -> (Supplier<CompletableFuture<R>>) (() -> transformationToChain.apply(t))) + .reduce(futureEmptyStream, accumulator, combiner) + .thenApply(any -> results.stream()); } public static <T, U> CompletableFuture<Stream<U>> map(CompletableFuture<Stream<T>> futurStream, Function<T, U> action) { --------------------------------------------------------------------- To unsubscribe, 
e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org