This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch CASSANDRA-18519
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 745c2815806e1dd173016c763b51d05300c262b9
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Tue May 16 14:09:23 2023 -0700

    added a distinct filter
---
 .../main/java/accord/utils/async/Observable.java   | 33 ++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java 
b/accord-core/src/main/java/accord/utils/async/Observable.java
index ad184c9a..29f863e5 100644
--- a/accord-core/src/main/java/accord/utils/async/Observable.java
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -19,7 +19,9 @@
 package accord.utils.async;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -65,6 +67,37 @@ public interface Observable<T>
         };
     }
 
+    static <T> Observable<T> distinct(Observable<T> callback)
+    {
+        return new Observable<T>()
+        {
+            Set<T> keys = new HashSet<>();
+
+            @Override
+            public void onNext(T value) throws Exception
+            {
+                if (keys.add(value))
+                    callback.onNext(value);
+            }
+
+            @Override
+            public void onError(Throwable t)
+            {
+                keys.clear();
+                keys = null;
+                callback.onError(t);
+            }
+
+            @Override
+            public void onCompleted()
+            {
+                keys.clear();
+                keys = null;
+                callback.onCompleted();
+            }
+        };
+    }
+
     static <T> Observable<T> forCallback(BiConsumer<? super List<T>, 
Throwable> callback)
     {
         return new Observable<T>()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to