westonpace commented on a change in pull request #9643:
URL: https://github.com/apache/arrow/pull/9643#discussion_r593116560



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -235,29 +541,209 @@ class ReadaheadGenerator {
 /// The source generator must be async-reentrant
 ///
 /// This generator itself is async-reentrant.
+///
+/// This generator may queue up to max_readahead instances of T
 template <typename T>
 AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
                                          int max_readahead) {
   return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
 }
 
-/// \brief Transforms an async generator using a transformer function 
returning a new
-/// AsyncGenerator
+/// \brief Creates a generator that will yield finished futures from a vector
 ///
-/// The transform function here behaves exactly the same as the transform 
function in
-/// MakeTransformedIterator and you can safely use the same transform function 
to
-/// transform both synchronous and asynchronous streams.
+/// This generator is async-reentrant
+template <typename T>
+AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
+  struct State {
+    explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
+
+    std::vector<T> vec;
+    std::atomic<std::size_t> vec_idx;
+  };
+
+  auto state = std::make_shared<State>(std::move(vec));
+  return [state]() {
+    auto idx = state->vec_idx.fetch_add(1);
+    if (idx >= state->vec.size()) {
+      return Future<T>::MakeFinished(IterationTraits<T>::End());
+    }
+    return Future<T>::MakeFinished(state->vec[idx]);
+  };
+}
+
+/// \see MakeMergeMapGenerator
+template <typename T>
+class MergeMapGenerator {
+ public:
+  explicit MergeMapGenerator(AsyncGenerator<AsyncGenerator<T>> source,
+                             int max_subscriptions)
+      : state_(std::make_shared<State>(std::move(source), max_subscriptions)) 
{}
+
+  Future<T> operator()() {
+    Future<T> waiting_future;
+    std::shared_ptr<DeliveredJob> delivered_job;
+    {
+      auto guard = state_->mutex.Lock();
+      if (!state_->delivered_jobs.empty()) {
+        delivered_job = std::move(state_->delivered_jobs.front());
+        state_->delivered_jobs.pop_front();
+      } else if (state_->finished) {
+        return IterationTraits<T>::End();
+      } else {
+        waiting_future = Future<T>::Make();
+        
state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
+      }
+    }
+    if (delivered_job) {
+      delivered_job->deliverer().AddCallback(InnerCallback{state_, 
delivered_job->index});
+      return std::move(delivered_job->value);
+    }
+    if (state_->first) {
+      state_->first = false;
+      for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
+        state_->source().AddCallback(OuterCallback{state_, i});
+      }
+    }
+    return waiting_future;
+  }
+
+ private:
+  struct DeliveredJob {
+    explicit DeliveredJob(AsyncGenerator<T> deliverer_, T value_, std::size_t 
index_)
+        : deliverer(deliverer_), value(value_), index(index_) {}
+
+    AsyncGenerator<T> deliverer;
+    T value;
+    std::size_t index;
+  };
+
+  struct State {
+    State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
+        : source(std::move(source)),
+          active_subscriptions(max_subscriptions),
+          delivered_jobs(),
+          waiting_jobs(),
+          mutex(),
+          first(true),
+          source_exhausted(false),
+          finished(false),
+          num_active_subscriptions(max_subscriptions) {}
+
+    AsyncGenerator<AsyncGenerator<T>> source;
+    // active_subscriptions and delivered_jobs will be bounded by 
max_subscriptions
+    std::vector<AsyncGenerator<T>> active_subscriptions;
+    std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
+    // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will 
provide the
+    // backpressure
+    std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
+    util::Mutex mutex;
+    bool first;
+    bool source_exhausted;
+    bool finished;
+    int num_active_subscriptions;
+  };
+
+  struct InnerCallback {
+    void operator()(const Result<T>& maybe_next) {
+      bool finished = false;
+      Future<T> sink;
+      if (maybe_next.ok()) {
+        finished = IterationTraits<T>::IsEnd(*maybe_next);
+        {
+          auto guard = state->mutex.Lock();
+          if (!finished) {
+            if (state->waiting_jobs.empty()) {
+              state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
+                  state->active_subscriptions[index], *maybe_next, index));
+            } else {
+              sink = std::move(*state->waiting_jobs.front());
+              state->waiting_jobs.pop_front();
+            }
+          }
+        }
+      } else {
+        finished = true;
+      }
+      if (finished) {
+        state->source().AddCallback(OuterCallback{state, index});
+      } else if (sink.is_valid()) {
+        sink.MarkFinished(*maybe_next);
+        state->active_subscriptions[index]().AddCallback(*this);
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  struct OuterCallback {
+    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
+      bool should_purge = false;
+      bool should_continue = false;
+      {
+        auto guard = state->mutex.Lock();
+        if (!maybe_next.ok() || 
IterationTraits<AsyncGenerator<T>>::IsEnd(*maybe_next)) {
+          state->source_exhausted = true;
+          if (--state->num_active_subscriptions == 0) {
+            state->finished = true;
+            should_purge = true;
+          }
+        } else {
+          state->active_subscriptions[index] = *maybe_next;
+          should_continue = true;
+        }
+      }
+      if (should_continue) {
+        (*maybe_next)().AddCallback(InnerCallback{state, index});
+      } else if (should_purge) {
+        // At this point state->finished has been marked true so no one else
+        // will be interacting with waiting_jobs and we can iterate outside 
lock
+        while (!state->waiting_jobs.empty()) {
+          state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End());
+          state->waiting_jobs.pop_front();
+        }
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that takes in a stream of generators and pulls 
from up to
+/// max_subscriptions at a time
 ///
-/// This generator is not async-reentrant
-template <typename T, typename V>
-AsyncGenerator<V> MakeAsyncGenerator(AsyncGenerator<T> generator,
-                                     Transformer<T, V> transformer) {
-  return TransformingGenerator<T, V>(generator, transformer);
+/// Note: This is the equivalent of Rx::MergeMap.  This may deliver items out 
of

Review comment:
       OK, I removed all references to Rx, and I agree the term `Map` doesn't 
really belong.  Also, the Rx version that has a mapping function is really just 
a composition of merged and map, so I don't think we'd need that.
   
   So, I followed your suggestion and renamed them to Merged and Concatenated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to