This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch CASSANDRA-18519 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit c054e1dcc0ac001c74d7a93a744197b4af9acf1e Author: David Capwell <dcapw...@gmail.com> AuthorDate: Mon May 15 12:36:53 2023 -0700 onError and onComplete now required, and added new asChain method --- .../main/java/accord/utils/async/Observable.java | 49 +++++++++++++++++++++- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java index 00986e6b..4d4a5dc7 100644 --- a/accord-core/src/main/java/accord/utils/async/Observable.java +++ b/accord-core/src/main/java/accord/utils/async/Observable.java @@ -21,6 +21,7 @@ package accord.utils.async; import java.util.ArrayList; import java.util.List; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -30,8 +31,10 @@ import java.util.function.Function; public interface Observable<T> { void onNext(T value) throws Exception; - default void onError(Throwable t) {} - default void onCompleted() {} + + void onError(Throwable t); + + void onCompleted(); default <R> Observable<R> map(Function<? super R, ? extends T> mapper) { @@ -89,4 +92,46 @@ public interface Observable<T> } }; } + + static <A> AsyncChain<List<A>> asChain(Consumer<Observable<A>> work) + { + return asChain(work, Function.identity()); + } + + static <A, B> AsyncChain<List<B>> asChain(Consumer<Observable<A>> work, Function<? super A, ? extends B> mapper) + { + return new AsyncChains.Head<List<B>>() + { + @Override + protected void start(BiConsumer<? super List<B>, Throwable> callback) + { + work.accept(new Observable<A>() + { + List<B> values = new ArrayList<>(); + + @Override + public void onNext(A value) + { + values.add(mapper.apply(value)); + } + + @Override + public void onError(Throwable t) + { + values.clear(); + values = null; + callback.accept(null, t); + } + + @Override + public void onCompleted() + { + List<B> values = this.values; + this.values = null; + callback.accept(values, null); + } + }); + } + }; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org