westonpace commented on a change in pull request #9643:
URL: https://github.com/apache/arrow/pull/9643#discussion_r589800180



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -235,29 +505,213 @@ class ReadaheadGenerator {
 /// The source generator must be async-reentrant
 ///
 /// This generator itself is async-reentrant.
+///
+/// This generator may queue up to max_readahead instances of T
 template <typename T>
 AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
                                          int max_readahead) {
   return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
 }
 
-/// \brief Transforms an async generator using a transformer function returning a new
-/// AsyncGenerator
+/// \brief Creates a generator that will yield finished futures from a vector
 ///
-/// The transform function here behaves exactly the same as the transform function in
-/// MakeTransformedIterator and you can safely use the same transform function to
-/// transform both synchronous and asynchronous streams.
+/// This generator is async-reentrant
+template <typename T>
+AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
+  struct State {
+    explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
+
+    std::vector<T> vec;
+    std::atomic<std::size_t> vec_idx;
+  };
+
+  auto state = std::make_shared<State>(std::move(vec));
+  return [state]() {
+    auto idx = state->vec_idx.fetch_add(1);
+    if (idx >= state->vec.size()) {
+      return Future<T>::MakeFinished(IterationTraits<T>::End());
+    }
+    return Future<T>::MakeFinished(state->vec[idx]);
+  };
+}
+
+template <typename T>
+class MergeMapGenerator {
+ public:
+  explicit MergeMapGenerator(AsyncGenerator<AsyncGenerator<T>> source,
+                             int max_subscriptions)
+      : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}
+
+  Future<T> operator()() {
+    Future<T> waiting_future;
+    std::shared_ptr<DeliveredJob> delivered_job;
+    {
+      auto guard = state_->mutex.Lock();
+      if (!state_->delivered_jobs.empty()) {
+        delivered_job = std::move(state_->delivered_jobs.front());
+        state_->delivered_jobs.pop_front();
+      } else if (state_->finished) {
+        return IterationTraits<T>::End();
+      } else {
+        waiting_future = Future<T>::Make();
+        state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
+      }
+    }
+    if (delivered_job) {
+      delivered_job->deliverer().AddCallback(InnerCallback{state_, delivered_job->index});
+      return std::move(delivered_job->value);
+    }
+    if (state_->first) {
+      state_->first = false;
+      for (std::size_t i = 0; i < state_->active_subscriptions.size(); i++) {
+        state_->source().AddCallback(OuterCallback{state_, i});
+      }
+    }
+    return waiting_future;
+  }
+
+ private:
+  struct DeliveredJob {
+    explicit DeliveredJob(AsyncGenerator<T> deliverer_, T value_, std::size_t index_)
+        : deliverer(deliverer_), value(value_), index(index_) {}
+
+    AsyncGenerator<T> deliverer;
+    T value;
+    std::size_t index;
+  };
+
+  struct State {
+    State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
+        : source(std::move(source)),
+          active_subscriptions(max_subscriptions),
+          delivered_jobs(),
+          waiting_jobs(),
+          mutex(),
+          first(true),
+          source_exhausted(false),
+          finished(false),
+          num_active_subscriptions(max_subscriptions) {}
+
+    AsyncGenerator<AsyncGenerator<T>> source;
+    // active_subscriptions and delivered_jobs will be bounded by max_subscriptions
+    std::vector<AsyncGenerator<T>> active_subscriptions;
+    std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
+    // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
+    // backpressure
+    std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
+    util::Mutex mutex;
+    bool first;
+    bool source_exhausted;
+    bool finished;
+    int num_active_subscriptions;
+  };
+
+  struct InnerCallback {
+    void operator()(const Result<T>& maybe_next) {
+      bool finished = false;
+      Future<T> sink;
+      if (maybe_next.ok()) {
+        finished = IterationTraits<T>::IsEnd(*maybe_next);
+        {
+          auto guard = state->mutex.Lock();
+          if (!finished) {
+            if (state->waiting_jobs.empty()) {
+              state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
+                  state->active_subscriptions[index], *maybe_next, index));
+            } else {
+              sink = std::move(*state->waiting_jobs.front());
+              state->waiting_jobs.pop_front();
+            }
+          }
+        }
+      } else {
+        finished = true;
+      }
+      if (finished) {
+        state->source().AddCallback(OuterCallback{state, index});
+      } else if (sink.is_valid()) {
+        sink.MarkFinished(*maybe_next);
+        state->active_subscriptions[index]().AddCallback(*this);
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  struct OuterCallback {
+    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
+      bool should_purge = false;
+      bool should_continue = false;
+      {
+        auto guard = state->mutex.Lock();
+        if (!maybe_next.ok() || IterationTraits<AsyncGenerator<T>>::IsEnd(*maybe_next)) {
+          state->source_exhausted = true;
+          if (--state->num_active_subscriptions == 0) {
+            state->finished = true;
+            should_purge = true;
+          }
+        } else {
+          state->active_subscriptions[index] = *maybe_next;
+          should_continue = true;
+        }
+      }
+      if (should_continue) {
+        (*maybe_next)().AddCallback(InnerCallback{state, index});
+      } else if (should_purge) {
+        // At this point state->finished has been marked true so no one else
+        // will be interacting with waiting_jobs and we can iterate outside lock
+        while (!state->waiting_jobs.empty()) {
+          state->waiting_jobs.front()->MarkFinished(IterationTraits<T>::End());
+          state->waiting_jobs.pop_front();
+        }
+      }
+    }
+    std::shared_ptr<State> state;
+    std::size_t index;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+/// \brief Creates a generator that takes in a stream of generators and pulls from up to
+/// max_subscriptions at a time
 ///
-/// This generator is not async-reentrant
-template <typename T, typename V>
-AsyncGenerator<V> MakeAsyncGenerator(AsyncGenerator<T> generator,
-                                     Transformer<T, V> transformer) {
-  return TransformingGenerator<T, V>(generator, transformer);
+/// Note: This is the equivalent of Rx::MergeMap.  This may deliver items out of
+/// sequence. For example, items from the third AsyncGenerator generated by the source
+/// may be emitted before some items from the first AsyncGenerator generated by the
+/// source.
+///
+/// This generator expects source to be async-reentrant regardless of whether this
+/// generator is async-reentrant or not (unless max_readahead is 1)
+/// This generator will not pull from the individual subscriptions reentrantly.  Add
+/// readahead to the individual subscriptions if that is desired.
+/// This generator is async-reentrant
+///
+/// This generator may queue up to max_readahead instances of T
+template <typename T>
+AsyncGenerator<T> MakeMergeMapGenerator(AsyncGenerator<AsyncGenerator<T>> source,
+                                        int max_readahead) {
+  return MergeMapGenerator<T>(std::move(source), max_readahead);
 }
 
-/// \brief Transfers execution of the generator onto the given executor
+/// \brief Creates a generator that takes in a stream of generators and pulls from each
+/// one in sequence.
+///
+/// Note: This is the equivalent of Rx::ConcatMap.  One could conceivably create the
+/// equivalent of Rx::MergeMap by pulling from `source` before the last emitted
+/// generator is exhausted.  Then ConcatMap folds into the special case of MergeMap with
+/// one max subscription (which is how its handled in Rx)  TODO(ARROW-11800).  Deferring
+/// that now for simplicity.

Review comment:
       Good catch.  I removed the comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to