westonpace commented on a change in pull request #9643:
URL: https://github.com/apache/arrow/pull/9643#discussion_r589801536



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -235,29 +505,213 @@ class ReadaheadGenerator {
 /// The source generator must be async-reentrant
 ///
 /// This generator itself is async-reentrant.
+///
+/// This generator may queue up to max_readahead instances of T
 template <typename T>
 AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
                                          int max_readahead) {
   return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
 }
 
-/// \brief Transforms an async generator using a transformer function 
returning a new
-/// AsyncGenerator
+/// \brief Creates a generator that will yield finished futures from a vector
 ///
-/// The transform function here behaves exactly the same as the transform 
function in
-/// MakeTransformedIterator and you can safely use the same transform function 
to
-/// transform both synchronous and asynchronous streams.
+/// This generator is async-reentrant
+template <typename T>
+AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
+  struct State {
+    explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
+
+    std::vector<T> vec;
+    std::atomic<std::size_t> vec_idx;
+  };
+
+  auto state = std::make_shared<State>(std::move(vec));
+  return [state]() {
+    auto idx = state->vec_idx.fetch_add(1);
+    if (idx >= state->vec.size()) {
+      return Future<T>::MakeFinished(IterationTraits<T>::End());
+    }
+    return Future<T>::MakeFinished(state->vec[idx]);
+  };
+}
+
+template <typename T>
+class MergeMapGenerator {
+ public:
+  explicit MergeMapGenerator(AsyncGenerator<AsyncGenerator<T>> source,
+                             int max_subscriptions)
+      : state_(std::make_shared<State>(std::move(source), max_subscriptions)) 
{}
+
+  Future<T> operator()() {
+    Future<T> waiting_future;
+    std::shared_ptr<DeliveredJob> delivered_job;
+    {
+      auto guard = state_->mutex.Lock();
+      if (!state_->delivered_jobs.empty()) {
+        delivered_job = std::move(state_->delivered_jobs.front());
+        state_->delivered_jobs.pop_front();
+      } else if (state_->finished) {
+        return IterationTraits<T>::End();
+      } else {
+        waiting_future = Future<T>::Make();
+        
state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
+      }
+    }
+    if (delivered_job) {
+      delivered_job->deliverer().AddCallback(InnerCallback{state_, 
delivered_job->index});
+      return std::move(delivered_job->value);
+    }
+    if (state_->first) {
+      state_->first = false;
+      for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
+        state_->source().AddCallback(OuterCallback{state_, i});
+      }
+    }
+    return waiting_future;
+  }
+
+ private:
+  struct DeliveredJob {
+    explicit DeliveredJob(AsyncGenerator<T> deliverer_, T value_, std::size_t 
index_)
+        : deliverer(deliverer_), value(value_), index(index_) {}
+
+    AsyncGenerator<T> deliverer;
+    T value;
+    std::size_t index;
+  };
+
+  struct State {
+    State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
+        : source(std::move(source)),
+          active_subscriptions(max_subscriptions),
+          delivered_jobs(),
+          waiting_jobs(),
+          mutex(),
+          first(true),
+          source_exhausted(false),
+          finished(false),
+          num_active_subscriptions(max_subscriptions) {}
+
+    AsyncGenerator<AsyncGenerator<T>> source;
+    // active_subscriptions and delivered_jobs will be bounded by 
max_subscriptions
+    std::vector<AsyncGenerator<T>> active_subscriptions;
+    std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
+    // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will 
provide the
+    // backpressure
+    std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
+    util::Mutex mutex;
+    bool first;
+    bool source_exhausted;
+    bool finished;
+    int num_active_subscriptions;
+  };
+
+  struct InnerCallback {
+    void operator()(const Result<T>& maybe_next) {
+      bool finished = false;
+      Future<T> sink;
+      if (maybe_next.ok()) {
+        finished = IterationTraits<T>::IsEnd(*maybe_next);
+        {
+          auto guard = state->mutex.Lock();
+          if (!finished) {
+            if (state->waiting_jobs.empty()) {
+              state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
+                  state->active_subscriptions[index], *maybe_next, index));
+            } else {
+              sink = std::move(*state->waiting_jobs.front());
+              state->waiting_jobs.pop_front();
+            }
+          }
+        }
+      } else {
+        finished = true;
+      }
+      if (finished) {
+        state->source().AddCallback(OuterCallback{state, index});
+      } else if (sink.is_valid()) {
+        sink.MarkFinished(*maybe_next);
+        state->active_subscriptions[index]().AddCallback(*this);
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  struct OuterCallback {
+    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
+      bool should_purge = false;
+      bool should_continue = false;
+      {
+        auto guard = state->mutex.Lock();
+        if (!maybe_next.ok() || 
IterationTraits<AsyncGenerator<T>>::IsEnd(*maybe_next)) {
+          state->source_exhausted = true;
+          if (--state->num_active_subscriptions == 0) {
+            state->finished = true;
+            should_purge = true;
+          }
+        } else {
+          state->active_subscriptions[index] = *maybe_next;
+          should_continue = true;
+        }
+      }
+      if (should_continue) {
+        (*maybe_next)().AddCallback(InnerCallback{state, index});
+      } else if (should_purge) {
+        // At this point state->finished has been marked true so no one else
+        // will be interacting with waiting_jobs and we can iterate outside 
lock
+        while (!state->waiting_jobs.empty()) {
+          state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End());
+          state->waiting_jobs.pop_front();
+        }
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that takes in a stream of generators and pulls 
from up to
+/// max_subscriptions at a time
 ///
-/// This generator is not async-reentrant
-template <typename T, typename V>
-AsyncGenerator<V> MakeAsyncGenerator(AsyncGenerator<T> generator,
-                                     Transformer<T, V> transformer) {
-  return TransformingGenerator<T, V>(generator, transformer);
+/// Note: This is the equivalent of Rx::MergeMap.  This may deliver items out 
of
+/// sequence. For example, items from the third AsyncGenerator generated by 
the source
+/// may be emitted before some items from the first AsyncGenerator generated 
by the
+/// source.
+///
+/// This generator expects source to be async-reentrant regardless of whether 
this
+/// generator is async-reentrant or not (unless max_readahead is 1)
+/// This generator will not pull from the individual subscriptions 
reentrantly.  Add
+/// readahead to the individual subscriptions if that is desired.
+/// This generator is async-reentrant
+///
+/// This generator may queue up to max_readahead instances of T
+template <typename T>
+AsyncGenerator<T> MakeMergeMapGenerator(AsyncGenerator<AsyncGenerator<T>> 
source,
+                                        int max_readahead) {
+  return MergeMapGenerator<T>(std::move(source), max_readahead);

Review comment:
       Max subscriptions.  I fixed up the naming.  Maybe it's not the best name, 
since the term "subscriptions" comes from Rx, where there are actual 
"subscriptions", but the concept is pretty much the same.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to