This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch CASSANDRA-18519
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit cfabfdac6f5ed927c1acb7568d3eb13aad6d4573
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Tue May 16 13:31:02 2023 -0700

    Observable.asChain now takes a collector so we can work with list/set
---
 .../main/java/accord/utils/async/Observable.java   | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java
index 4d4a5dc7..ad184c9a 100644
--- a/accord-core/src/main/java/accord/utils/async/Observable.java
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
 
 /**
  * Stream like interface that is "pushed" results (to the {@link #onNext(Object)} method).  This interface is similar,
@@ -95,30 +97,37 @@ public interface Observable<T>
 
     static <A> AsyncChain<List<A>> asChain(Consumer<Observable<A>> work)
     {
-        return asChain(work, Function.identity());
+        return asChain(work, Function.identity(), Collectors.toList());
     }
 
-    static <A, B> AsyncChain<List<B>> asChain(Consumer<Observable<A>> work, Function<? super A, ? extends B> mapper)
+    static <A, Result, Accumulator> AsyncChain<Result> asChain(Consumer<Observable<A>> work,
+                                                               Collector<? super A, Accumulator, Result> collector)
     {
-        return new AsyncChains.Head<List<B>>()
+        return asChain(work, Function.identity(), collector);
+    }
+
+    static <A, B, Result, Accumulator> AsyncChain<Result> asChain(Consumer<Observable<A>> work,
+                                                                  Function<? super A, ? extends B> mapper,
+                                                                  Collector<? super B, Accumulator, Result> collector)
+    {
+        return new AsyncChains.Head<Result>()
         {
             @Override
-            protected void start(BiConsumer<? super List<B>, Throwable> callback)
+            protected void start(BiConsumer<? super Result, Throwable> callback)
             {
                 work.accept(new Observable<A>()
                 {
-                    List<B> values = new ArrayList<>();
+                    Accumulator values = collector.supplier().get();
 
                     @Override
                     public void onNext(A value)
                     {
-                        values.add(mapper.apply(value));
+                        collector.accumulator().accept(values, mapper.apply(value));
                     }
 
                     @Override
                     public void onError(Throwable t)
                     {
-                        values.clear();
                         values = null;
                         callback.accept(null, t);
                     }
@@ -126,9 +135,9 @@ public interface Observable<T>
                     @Override
                     public void onCompleted()
                     {
-                        List<B> values = this.values;
+                        Result result = collector.finisher().apply(this.values);
                         this.values = null;
-                        callback.accept(values, null);
+                        callback.accept(result, null);
                     }
                 });
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to