westonpace commented on a change in pull request #9533: URL: https://github.com/apache/arrow/pull/9533#discussion_r592268811
########## File path: cpp/src/arrow/util/iterator_test.cc ########## @@ -726,6 +730,138 @@ TEST(TestAsyncUtil, CompleteBackgroundStressTest) { } } +template <typename T> +class ReentrantChecker { + public: + explicit ReentrantChecker(AsyncGenerator<T> source) + : state_(std::make_shared<State>(std::move(source))) {} + + Future<T> operator()() { + if (state_->in.load()) { + state_->valid.store(false); + } + state_->in.store(true); + auto result = state_->source(); + return result.Then(Callback{state_}); + } + + void AssertValid() { + EXPECT_EQ(true, state_->valid.load()) + << "The generator was accessed in a reentrant manner"; + } + + private: + struct State { + explicit State(AsyncGenerator<T> source_) + : source(std::move(source_)), in(false), valid(true) {} + + AsyncGenerator<T> source; + std::atomic<bool> in; + std::atomic<bool> valid; + }; + struct Callback { + Future<T> operator()(const Result<T>& result) { + state_->in.store(false); + return result; + } + std::shared_ptr<State> state_; + }; + + std::shared_ptr<State> state_; +}; + +TEST(TestAsyncUtil, SerialReadaheadSlowProducer) { + AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt({1, 2, 3, 4, 5}); + ReentrantChecker<TestInt> checker(std::move(it)); + SerialReadaheadGenerator<TestInt> serial_readahead(checker, 2); + AssertAsyncGeneratorMatch({1, 2, 3, 4, 5}, + static_cast<AsyncGenerator<TestInt>>(serial_readahead)); + checker.AssertValid(); +} + +TEST(TestAsyncUtil, SerialReadaheadSlowConsumer) { + int num_delivered = 0; + auto source = [&num_delivered]() { + if (num_delivered < 5) { + return Future<TestInt>::MakeFinished(num_delivered++); + } else { + return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End()); + } + }; + SerialReadaheadGenerator<TestInt> serial_readahead(std::move(source), 3); + SleepABit(); + ASSERT_EQ(0, num_delivered); Review comment: Keeping it lazy gives us flexibility and doesn't really cost anything. 
Imagine you scan a dataset and find 100 files, so you convert each file into an AsyncGenerator<RecordBatch>. You probably don't want to start reading from all 100 files at once. If you really want to start reading right away, just call the generator as soon as you create it. Creating a generator should be trivial, so you shouldn't lose any real time. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org