This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch CASSANDRA-18519 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit cfabfdac6f5ed927c1acb7568d3eb13aad6d4573
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Tue May 16 13:31:02 2023 -0700

    Observable.asChain now takes a collector so we can work with list/set
---
 .../main/java/accord/utils/async/Observable.java | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java
index 4d4a5dc7..ad184c9a 100644
--- a/accord-core/src/main/java/accord/utils/async/Observable.java
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
 
 /**
  * Stream like interface that is "pushed" results (to the {@link #onNext(Object)} method). This interface is similar,
@@ -95,30 +97,37 @@ public interface Observable<T>
 
     static <A> AsyncChain<List<A>> asChain(Consumer<Observable<A>> work)
     {
-        return asChain(work, Function.identity());
+        return asChain(work, Function.identity(), Collectors.toList());
     }
 
-    static <A, B> AsyncChain<List<B>> asChain(Consumer<Observable<A>> work, Function<? super A, ? extends B> mapper)
+    static <A, Result, Accumulator> AsyncChain<Result> asChain(Consumer<Observable<A>> work,
+                                                               Collector<? super A, Accumulator, Result> collector)
     {
-        return new AsyncChains.Head<List<B>>()
+        return asChain(work, Function.identity(), collector);
+    }
+
+    static <A, B, Result, Accumulator> AsyncChain<Result> asChain(Consumer<Observable<A>> work,
+                                                                  Function<? super A, ? extends B> mapper,
+                                                                  Collector<? super B, Accumulator, Result> collector)
+    {
+        return new AsyncChains.Head<Result>()
         {
             @Override
-            protected void start(BiConsumer<? super List<B>, Throwable> callback)
+            protected void start(BiConsumer<? super Result, Throwable> callback)
             {
                 work.accept(new Observable<A>()
                 {
-                    List<B> values = new ArrayList<>();
+                    Accumulator values = collector.supplier().get();
 
                     @Override
                     public void onNext(A value)
                     {
-                        values.add(mapper.apply(value));
+                        collector.accumulator().accept(values, mapper.apply(value));
                     }
 
                     @Override
                     public void onError(Throwable t)
                     {
-                        values.clear();
                         values = null;
                         callback.accept(null, t);
                     }
@@ -126,9 +135,9 @@ public interface Observable<T>
                     @Override
                     public void onCompleted()
                     {
-                        List<B> values = this.values;
+                        Result result = collector.finisher().apply(this.values);
                         this.values = null;
-                        callback.accept(values, null);
+                        callback.accept(result, null);
                     }
                 });
             }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org