This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch CASSANDRA-18519
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit bed28e17072555b11ecd0b781115e5a24ad20137
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Thu May 11 09:48:43 2023 -0700

    created new forCallback to help bridge the gap between AsyncChain and Observable
---
 .../main/java/accord/utils/async/Observable.java   | 73 +++++++++++++++-------
 1 file changed, 49 insertions(+), 24 deletions(-)

diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java
index 3f4f5cfb..bcf9695a 100644
--- a/accord-core/src/main/java/accord/utils/async/Observable.java
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -18,6 +18,9 @@
 
 package accord.utils.async;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 /**
@@ -32,36 +35,58 @@ public interface Observable<T>
 
     default <R> Observable<R> map(Function<? super R, ? extends T> mapper)
     {
-        return new Map<>(this, mapper);
+        Observable<T> self = this;
+        // since this project still targets jdk8, can't create private classes, so to avoid adding types to the public api,
+        // use anonymous classes.
+        return new Observable<R>()
+        {
+            @Override
+            public void onNext(R value)
+            {
+                self.onNext(mapper.apply(value));
+            }
+
+            @Override
+            public void onError(Throwable t)
+            {
+                self.onError(t);
+            }
+
+            @Override
+            public void onCompleted()
+            {
+                self.onCompleted();
+            }
+        };
     }
 
-    class Map<A, B> implements Observable<A>
+    static <T> Observable<T> forCallback(BiConsumer<? super List<T>, Throwable> callback)
     {
-        private final Observable<B> next;
-        private final Function<? super A, ? extends B> mapper;
-
-        public Map(Observable<B> next, Function<? super A, ? extends B> mapper)
+        return new Observable<T>()
         {
-            this.next = next;
-            this.mapper = mapper;
-        }
+            private List<T> elements = new ArrayList<>();
 
-        @Override
-        public void onNext(A value)
-        {
-            next.onNext(mapper.apply(value));
-        }
+            @Override
+            public void onNext(T value)
+            {
+                elements.add(value);
+            }
 
-        @Override
-        public void onError(Throwable t)
-        {
-            next.onError(t);
-        }
+            @Override
+            public void onError(Throwable t)
+            {
+                this.elements.clear();
+                this.elements = null;
+                callback.accept(null, t);
+            }
 
-        @Override
-        public void onCompleted()
-        {
-            next.onCompleted();
-        }
+            @Override
+            public void onCompleted()
+            {
+                List<T> elements = this.elements;
+                this.elements = null;
+                callback.accept(elements, null);
+            }
+        };
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to