This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch CASSANDRA-18519 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 745c2815806e1dd173016c763b51d05300c262b9
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Tue May 16 14:09:23 2023 -0700

    added a distinct filter
---
 .../main/java/accord/utils/async/Observable.java | 33 ++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java
index ad184c9a..29f863e5 100644
--- a/accord-core/src/main/java/accord/utils/async/Observable.java
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -19,7 +19,9 @@
 package accord.utils.async;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -65,6 +67,37 @@ public interface Observable<T>
         };
     }
 
+    static <T> Observable<T> distinct(Observable<T> callback)
+    {
+        return new Observable<T>()
+        {
+            Set<T> keys = new HashSet<>();
+
+            @Override
+            public void onNext(T value) throws Exception
+            {
+                if (keys.add(value))
+                    callback.onNext(value);
+            }
+
+            @Override
+            public void onError(Throwable t)
+            {
+                keys.clear();
+                keys = null;
+                callback.onError(t);
+            }
+
+            @Override
+            public void onCompleted()
+            {
+                keys.clear();
+                keys = null;
+                callback.onCompleted();
+            }
+        };
+    }
+
     static <T> Observable<T> forCallback(BiConsumer<? super List<T>, Throwable> callback)
     {
         return new Observable<T>()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org