westonpace commented on a change in pull request #9643:
URL: https://github.com/apache/arrow/pull/9643#discussion_r593082909
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -570,8 +616,48 @@ TEST(ReadaheadIterator, NextError) {
// --------------------------------------------------------------------
// Asynchronous iterator tests
+template <typename T>
+class ReentrantChecker {
+ public:
+ explicit ReentrantChecker(AsyncGenerator<T> source)
+ : state_(std::make_shared<State>(std::move(source))) {}
+
+ Future<T> operator()() {
+ if (state_->in.load()) {
+ state_->valid.store(false);
+ }
+ state_->in.store(true);
+ auto result = state_->source();
+ return result.Then(Callback{state_});
+ }
+
+ void AssertValid() {
Review comment:
I did something a little different but similar. It gives the
flexibility of the latter approach but still uses RAII.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
+/*
Review comment:
Done.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
+/*
+The methods in this file create, modify, and utilize AsyncGenerator which is
an iterator
+of futures. This allows an asynchronous source (like file input) to be run
through a
+pipeline in the same way that iterators can be used to create pipelined
workflows.
+
+In order to support pipeline parallelism we introduce the concept of
asynchronous
+reentrancy. This is different than synchronous reentrancy. With synchronous
code a
+function is reentrant if the function can be called again while a previous
call to that
+function is still running. Unless otherwise called out, none of these
generators are
Review comment:
Done.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -21,23 +21,58 @@
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
+#include "arrow/util/queue.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
+/*
+The methods in this file create, modify, and utilize AsyncGenerator which is
an iterator
+of futures. This allows an asynchronous source (like file input) to be run
through a
+pipeline in the same way that iterators can be used to create pipelined
workflows.
+
+In order to support pipeline parallelism we introduce the concept of
asynchronous
+reentrancy. This is different than synchronous reentrancy. With synchronous
code a
+function is reentrant if the function can be called again while a previous
call to that
+function is still running. Unless otherwise called out, none of these
generators are
+synchronously reentrant. Care should be taken to avoid calling them in such a
way (and
+the utilities Visit/Collect/Await take care to do this).
+
+Asynchronous reentrancy, on the other hand, means the function is called again
before the
+future returned by the function completes (but after the call to get the future
+completes). Some of these generators are async-reentrant while others (e.g.
those that
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]