westonpace commented on a change in pull request #9643: URL: https://github.com/apache/arrow/pull/9643#discussion_r593082909
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -570,8 +616,48 @@ TEST(ReadaheadIterator, NextError) {
 // --------------------------------------------------------------------
 // Asynchronous iterator tests
+template <typename T>
+class ReentrantChecker {
+ public:
+  explicit ReentrantChecker(AsyncGenerator<T> source)
+      : state_(std::make_shared<State>(std::move(source))) {}
+
+  Future<T> operator()() {
+    if (state_->in.load()) {
+      state_->valid.store(false);
+    }
+    state_->in.store(true);
+    auto result = state_->source();
+    return result.Then(Callback{state_});
+  }
+
+  void AssertValid() {

Review comment:
I did something a little different but similar. It gives the flexibility of the latter approach but still uses RAII.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
 #include "arrow/util/functional.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
 #include "arrow/util/thread_pool.h"
 namespace arrow {
+/*

Review comment:
Done.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
 #include "arrow/util/functional.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
 #include "arrow/util/thread_pool.h"
 namespace arrow {
+/*
+The methods in this file create, modify, and utilize AsyncGenerator which is an iterator
+of futures. This allows an asynchronous source (like file input) to be run through a
+pipeline in the same way that iterators can be used to create pipelined workflows.
+
+In order to support pipeline parallelism we introduce the concept of asynchronous
+reentrancy. This is different than synchronous reentrancy. With synchronous code a
+function is reentrant if the function can be called again while a previous call to that
+function is still running. Unless otherwise called out none of these generators are

Review comment:
Done.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
 #include "arrow/util/functional.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
 #include "arrow/util/thread_pool.h"
 namespace arrow {
+/*
+The methods in this file create, modify, and utilize AsyncGenerator which is an iterator
+of futures. This allows an asynchronous source (like file input) to be run through a
+pipeline in the same way that iterators can be used to create pipelined workflows.
+
+In order to support pipeline parallelism we introduce the concept of asynchronous
+reentrancy. This is different than synchronous reentrancy. With synchronous code a
+function is reentrant if the function can be called again while a previous call to that
+function is still running. Unless otherwise called out none of these generators are
+synchronously reentrant. Care should be taken to avoid calling them in such a way (and
+the utilities Visit/Collect/Await take care to do this).
+
+Asynchronous reentrancy on the other hand means the function is called again before the
+future returned by the function completes (but after the call to get the future
+completes). Some of these generators are async-reentrant while others (e.g. those that

Review comment:
Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org