westonpace commented on a change in pull request #9714:
URL: https://github.com/apache/arrow/pull/9714#discussion_r594597313



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,68 @@ class ReadaheadGenerator {
   std::queue<Future<T>> readahead_queue_;
 };
 
+template <typename T>
+class PushGenerator {

Review comment:
       This would require some kind of producer running on a separate thread, 
right?  Did you take a look at `BackgroundGenerator`?  It's a bit of a 
different model, but it is intended to solve the same sort of problem.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,68 @@ class ReadaheadGenerator {
   std::queue<Future<T>> readahead_queue_;
 };
 
+template <typename T>
+class PushGenerator {
+  struct State {
+    util::Mutex mutex;
+    std::deque<Result<T>> result_q;
+    util::optional<Future<T>> consumer_fut;
+    bool finished = false;
+  };
+
+  struct Generator {
+    const std::shared_ptr<State> state_;
+
+    Future<T> operator()() {
+      auto lock = state_->mutex.Lock();
+      assert(!state_->consumer_fut.has_value());  // Non-reentrant
+      if (!state_->result_q.empty()) {
+        auto fut = 
Future<T>::MakeFinished(std::move(state_->result_q.front()));
+        state_->result_q.pop_front();
+        return fut;
+      }
+      if (state_->finished) {
+        return AsyncGeneratorEnd<T>();
+      }
+      auto fut = Future<T>::Make();
+      state_->consumer_fut = fut;
+      return fut;
+    }
+  };
+
+ public:
+  PushGenerator() : state_(std::make_shared<State>()) {}
+
+  void Push(Result<T> result) {
+    auto lock = state_->mutex.Lock();
+    if (state_->consumer_fut.has_value()) {
+      auto fut = std::move(state_->consumer_fut.value());

Review comment:
       Is this really a move?  I think you need to do...
   ```
   auto fut = std::move(state_->consumer_fut).value();
   ```
   ...also, then you shouldn't need the reset on the next line.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,68 @@ class ReadaheadGenerator {
   std::queue<Future<T>> readahead_queue_;
 };
 
+template <typename T>
+class PushGenerator {

Review comment:
       One issue with this model is that there is no back-pressure.  The 
other generators have bounded queuing, so a slow consumer / fast producer cannot 
end up filling up RAM (the producer will be stopped while the 
consumer catches up).  Depending on the use case this may not be a problem (e.g. I think 
you're using this for very small file info objects which all fit in memory 
easily).

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,68 @@ class ReadaheadGenerator {
   std::queue<Future<T>> readahead_queue_;
 };
 
+template <typename T>
+class PushGenerator {
+  struct State {
+    util::Mutex mutex;
+    std::deque<Result<T>> result_q;
+    util::optional<Future<T>> consumer_fut;
+    bool finished = false;
+  };
+
+  struct Generator {
+    const std::shared_ptr<State> state_;
+
+    Future<T> operator()() {
+      auto lock = state_->mutex.Lock();
+      assert(!state_->consumer_fut.has_value());  // Non-reentrant
+      if (!state_->result_q.empty()) {
+        auto fut = 
Future<T>::MakeFinished(std::move(state_->result_q.front()));
+        state_->result_q.pop_front();
+        return fut;
+      }
+      if (state_->finished) {
+        return AsyncGeneratorEnd<T>();
+      }
+      auto fut = Future<T>::Make();
+      state_->consumer_fut = fut;
+      return fut;
+    }
+  };
+
+ public:
+  PushGenerator() : state_(std::make_shared<State>()) {}
+
+  void Push(Result<T> result) {

Review comment:
       You could maybe check whether `result` is not ok and, if so, set `finished` to true 
(potentially even clearing out the result queue), and then on future pushes simply 
return immediately if `finished` is true.  I can see where your question on Zulip 
came from now.  The only disadvantage I can see to this approach is potentially 
wasted memory from keeping around blocks that are invalid.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -16,12 +16,16 @@
 // under the License.
 
 #pragma once
+
+#include <cassert>

Review comment:
       Is this used?

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,68 @@ class ReadaheadGenerator {
   std::queue<Future<T>> readahead_queue_;
 };
 
+template <typename T>
+class PushGenerator {
+  struct State {
+    util::Mutex mutex;
+    std::deque<Result<T>> result_q;
+    util::optional<Future<T>> consumer_fut;
+    bool finished = false;
+  };
+
+  struct Generator {
+    const std::shared_ptr<State> state_;
+
+    Future<T> operator()() {
+      auto lock = state_->mutex.Lock();
+      assert(!state_->consumer_fut.has_value());  // Non-reentrant
+      if (!state_->result_q.empty()) {
+        auto fut = 
Future<T>::MakeFinished(std::move(state_->result_q.front()));
+        state_->result_q.pop_front();
+        return fut;
+      }
+      if (state_->finished) {
+        return AsyncGeneratorEnd<T>();
+      }
+      auto fut = Future<T>::Make();
+      state_->consumer_fut = fut;
+      return fut;
+    }
+  };
+
+ public:
+  PushGenerator() : state_(std::make_shared<State>()) {}
+
+  void Push(Result<T> result) {
+    auto lock = state_->mutex.Lock();
+    if (state_->consumer_fut.has_value()) {
+      auto fut = std::move(state_->consumer_fut.value());
+      state_->consumer_fut.reset();
+      lock.Unlock();  // unlock before potentially invoking a callback
+      fut.MarkFinished(std::move(result));
+      return;
+    }
+    state_->result_q.push_back(std::move(result));
+  }
+
+  void Close() {
+    auto lock = state_->mutex.Lock();
+    state_->finished = true;
+    if (state_->consumer_fut.has_value()) {
+      auto fut = std::move(state_->consumer_fut.value());
+      state_->consumer_fut.reset();
+      lock.Unlock();  // unlock before potentially invoking a callback
+      fut.MarkFinished(IterationTraits<T>::End());
+    }
+  }
+
+  /// Return a non-reentrant async generator
+  Generator generator() { return Generator{state_}; }

Review comment:
       All of the other generators have a corresponding `MakeXyz` function.  
It's not strictly necessary, but it keeps the public API to simply 
`std::function`.  That wouldn't work here, though, because the producer needs access 
to the generator.  However, could you add a statement about queuing?  
Specifically, that it might queue an unbounded number of results depending on 
how fast the producer runs.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -336,6 +345,68 @@ class ReadaheadGenerator {
   std::queue<Future<T>> readahead_queue_;
 };
 
+template <typename T>
+class PushGenerator {
+  struct State {
+    util::Mutex mutex;
+    std::deque<Result<T>> result_q;
+    util::optional<Future<T>> consumer_fut;
+    bool finished = false;
+  };
+
+  struct Generator {
+    const std::shared_ptr<State> state_;
+
+    Future<T> operator()() {
+      auto lock = state_->mutex.Lock();
+      assert(!state_->consumer_fut.has_value());  // Non-reentrant
+      if (!state_->result_q.empty()) {
+        auto fut = 
Future<T>::MakeFinished(std::move(state_->result_q.front()));
+        state_->result_q.pop_front();
+        return fut;
+      }
+      if (state_->finished) {
+        return AsyncGeneratorEnd<T>();
+      }
+      auto fut = Future<T>::Make();
+      state_->consumer_fut = fut;
+      return fut;
+    }
+  };
+
+ public:
+  PushGenerator() : state_(std::make_shared<State>()) {}
+
+  void Push(Result<T> result) {
+    auto lock = state_->mutex.Lock();
+    if (state_->consumer_fut.has_value()) {
+      auto fut = std::move(state_->consumer_fut.value());
+      state_->consumer_fut.reset();
+      lock.Unlock();  // unlock before potentially invoking a callback
+      fut.MarkFinished(std::move(result));
+      return;
+    }
+    state_->result_q.push_back(std::move(result));
+  }
+
+  void Close() {
+    auto lock = state_->mutex.Lock();
+    state_->finished = true;
+    if (state_->consumer_fut.has_value()) {
+      auto fut = std::move(state_->consumer_fut.value());

Review comment:
       Same as above: `std::move(state_->consumer_fut).value()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to