This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch CASSANDRA-18519 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit bed28e17072555b11ecd0b781115e5a24ad20137 Author: David Capwell <dcapw...@gmail.com> AuthorDate: Thu May 11 09:48:43 2023 -0700 created new forCallback to help bridge the gap between AsyncChain and Observable --- .../main/java/accord/utils/async/Observable.java | 73 +++++++++++++++------- 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java index 3f4f5cfb..bcf9695a 100644 --- a/accord-core/src/main/java/accord/utils/async/Observable.java +++ b/accord-core/src/main/java/accord/utils/async/Observable.java @@ -18,6 +18,9 @@ package accord.utils.async; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Function; /** @@ -32,36 +35,58 @@ public interface Observable<T> default <R> Observable<R> map(Function<? super R, ? extends T> mapper) { - return new Map<>(this, mapper); + Observable<T> self = this; + // since this project still targets jdk8, can't create private classes, so to avoid adding types to the public api, + // use anonymous classes. + return new Observable<R>() + { + @Override + public void onNext(R value) + { + self.onNext(mapper.apply(value)); + } + + @Override + public void onError(Throwable t) + { + self.onError(t); + } + + @Override + public void onCompleted() + { + self.onCompleted(); + } + }; } - class Map<A, B> implements Observable<A> + static <T> Observable<T> forCallback(BiConsumer<? super List<T>, Throwable> callback) { - private final Observable<B> next; - private final Function<? super A, ? extends B> mapper; - - public Map(Observable<B> next, Function<? super A, ? 
extends B> mapper) + return new Observable<T>() { - this.next = next; - this.mapper = mapper; - } + private List<T> elements = new ArrayList<>(); - @Override - public void onNext(A value) - { - next.onNext(mapper.apply(value)); - } + @Override + public void onNext(T value) + { + elements.add(value); + } - @Override - public void onError(Throwable t) - { - next.onError(t); - } + @Override + public void onError(Throwable t) + { + this.elements.clear(); + this.elements = null; + callback.accept(null, t); + } - @Override - public void onCompleted() - { - next.onCompleted(); - } + @Override + public void onCompleted() + { + List<T> elements = this.elements; + this.elements = null; + callback.accept(elements, null); + } + }; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org