This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch CASSANDRA-18519
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit c054e1dcc0ac001c74d7a93a744197b4af9acf1e
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Mon May 15 12:36:53 2023 -0700

    onError and onComplete now required, and added new asChain method
---
 .../main/java/accord/utils/async/Observable.java   | 49 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 2 deletions(-)

diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java 
b/accord-core/src/main/java/accord/utils/async/Observable.java
index 00986e6b..4d4a5dc7 100644
--- a/accord-core/src/main/java/accord/utils/async/Observable.java
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -21,6 +21,7 @@ package accord.utils.async;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -30,8 +31,10 @@ import java.util.function.Function;
 public interface Observable<T>
 {
     void onNext(T value) throws Exception;
-    default void onError(Throwable t) {}
-    default void onCompleted() {}
+
+    void onError(Throwable t);
+
+    void onCompleted();
 
     default <R> Observable<R> map(Function<? super R, ? extends T> mapper)
     {
@@ -89,4 +92,46 @@ public interface Observable<T>
             }
         };
     }
+
+    static <A> AsyncChain<List<A>> asChain(Consumer<Observable<A>> work)
+    {
+        return asChain(work, Function.identity());
+    }
+
+    static <A, B> AsyncChain<List<B>> asChain(Consumer<Observable<A>> work, 
Function<? super A, ? extends B> mapper)
+    {
+        return new AsyncChains.Head<List<B>>()
+        {
+            @Override
+            protected void start(BiConsumer<? super List<B>, Throwable> 
callback)
+            {
+                work.accept(new Observable<A>()
+                {
+                    List<B> values = new ArrayList<>();
+
+                    @Override
+                    public void onNext(A value)
+                    {
+                        values.add(mapper.apply(value));
+                    }
+
+                    @Override
+                    public void onError(Throwable t)
+                    {
+                        values.clear();
+                        values = null;
+                        callback.accept(null, t);
+                    }
+
+                    @Override
+                    public void onCompleted()
+                    {
+                        List<B> values = this.values;
+                        this.values = null;
+                        callback.accept(values, null);
+                    }
+                });
+            }
+        };
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to