pitrou commented on a change in pull request #9533: URL: https://github.com/apache/arrow/pull/9533#discussion_r591503004
########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,94 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } + + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.IsEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + + auto next_ptr = state_->readahead_queue.FrontPtr(); + auto next = std::move(**next_ptr); + state_->readahead_queue.PopFront(); + + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + ARROW_RETURN_NOT_OK(state_->Pump(state_)); + } + return next; + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) + : first(true), + source(std::move(source_)), + finished(false), + spaces_available(max_readahead), + readahead_queue(max_readahead) {} + + Status Pump(const std::shared_ptr<State>& self) { + // Can't do readahead_queue.write(source().Then(Callback{self})) because then the + // callback might run immediately and add itself to the queue before this gets added + // to the queue messing up the order + auto next_slot = std::make_shared<Future<T>>(); + auto written = readahead_queue.Write(next_slot); + if (!written) { + return Status::UnknownError("Could not write to readahead_queue"); + } + *next_slot = source().Then(Callback{self}); + return Status::OK(); + } + + // Only accessed by the consumer end + bool first; + // Accessed by both threads + AsyncGenerator<T> source; + std::atomic<bool> finished; + std::atomic<uint32_t> spaces_available; + util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue; + }; + + struct Callback { + Result<T> operator()(const Result<T>& maybe_next) { + if (!maybe_next.ok()) { + state_->finished.store(true); + return maybe_next; + } + const auto& next = *maybe_next; + if (next == IterationTraits<T>::End()) { + state_->finished = true; Review comment: Nit, but would be better to use the same convention consistently (either `= true` or `.store(true)`). ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,94 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } + + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.IsEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + + auto next_ptr = state_->readahead_queue.FrontPtr(); + auto next = std::move(**next_ptr); + state_->readahead_queue.PopFront(); + + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + ARROW_RETURN_NOT_OK(state_->Pump(state_)); + } + return next; + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) + : first(true), + source(std::move(source_)), + finished(false), + spaces_available(max_readahead), + readahead_queue(max_readahead) {} + + Status Pump(const std::shared_ptr<State>& self) { + // Can't do readahead_queue.write(source().Then(Callback{self})) because then the + // callback might run immediately and add itself to the queue before this gets added + // to the queue messing up the order + auto next_slot = std::make_shared<Future<T>>(); + auto written = readahead_queue.Write(next_slot); + if (!written) { + return Status::UnknownError("Could not write to readahead_queue"); + } + *next_slot = source().Then(Callback{self}); + return Status::OK(); + } + + // Only accessed by the consumer end + bool first; + // Accessed by both threads + AsyncGenerator<T> source; + std::atomic<bool> finished; + std::atomic<uint32_t> spaces_available; + util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue; + }; + + struct Callback { + Result<T> operator()(const Result<T>& maybe_next) { + if (!maybe_next.ok()) { + state_->finished.store(true); + return maybe_next; + } + const auto& next = *maybe_next; + if (next == IterationTraits<T>::End()) { + state_->finished = true; + return maybe_next; + } + auto last_available = state_->spaces_available.fetch_sub(1); + if (last_available > 1) { + ARROW_RETURN_NOT_OK(state_->Pump(state_)); + } + return maybe_next; + } + + std::shared_ptr<State> state_; Review comment: Do we want to make this a `weak_ptr`? If for some reason the `SerialReadaheadGenerator` doesn't exist anymore, we don't want to keep pumping under the hood. As a separate concern, it would be nice to start thinking whether generators like this need to take a StopToken. ########## File path: cpp/src/arrow/vendored/ProducerConsumerQueue.h ########## @@ -123,7 +123,7 @@ struct ProducerConsumerQueue { return false; } - // move the value at the front of the queue to given variable + // move (or copy) the value at the front of the queue to given variable Review comment: Note I had changed this comment because, from reading the code, it seems it would always move the value (which is a gotcha actually). Are there cases where the value wouldn't be moved? ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,94 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } + + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.IsEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + + auto next_ptr = state_->readahead_queue.FrontPtr(); + auto next = std::move(**next_ptr); + state_->readahead_queue.PopFront(); + + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + ARROW_RETURN_NOT_OK(state_->Pump(state_)); + } + return next; + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) Review comment: Nit: the naming convention here is reversed: `source_` should be the instance variable and `source` the local variable and/or parameter. ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,94 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } + + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.IsEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + + auto next_ptr = state_->readahead_queue.FrontPtr(); + auto next = std::move(**next_ptr); + state_->readahead_queue.PopFront(); + + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it Review comment: When does this happen? ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,94 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } + + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.IsEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + + auto next_ptr = state_->readahead_queue.FrontPtr(); + auto next = std::move(**next_ptr); + state_->readahead_queue.PopFront(); + + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + ARROW_RETURN_NOT_OK(state_->Pump(state_)); + } + return next; + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) + : first(true), + source(std::move(source_)), + finished(false), + spaces_available(max_readahead), + readahead_queue(max_readahead) {} + + Status Pump(const std::shared_ptr<State>& self) { + // Can't do readahead_queue.write(source().Then(Callback{self})) because then the + // callback might run immediately and add itself to the queue before this gets added + // to the queue messing up the order + auto next_slot = std::make_shared<Future<T>>(); + auto written = readahead_queue.Write(next_slot); + if (!written) { + return Status::UnknownError("Could not write to readahead_queue"); Review comment: Well, doesn't this simply mean that the queue is full? Why is this an error? ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,94 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } + + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.IsEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + + auto next_ptr = state_->readahead_queue.FrontPtr(); + auto next = std::move(**next_ptr); + state_->readahead_queue.PopFront(); + + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + ARROW_RETURN_NOT_OK(state_->Pump(state_)); + } + return next; + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) + : first(true), + source(std::move(source_)), + finished(false), + spaces_available(max_readahead), + readahead_queue(max_readahead) {} + + Status Pump(const std::shared_ptr<State>& self) { + // Can't do readahead_queue.write(source().Then(Callback{self})) because then the + // callback might run immediately and add itself to the queue before this gets added + // to the queue messing up the order + auto next_slot = std::make_shared<Future<T>>(); + auto written = readahead_queue.Write(next_slot); + if (!written) { + return Status::UnknownError("Could not write to readahead_queue"); + } + *next_slot = source().Then(Callback{self}); + return Status::OK(); + } + + // Only accessed by the consumer end + bool first; + // Accessed by both threads + AsyncGenerator<T> source; + std::atomic<bool> finished; + std::atomic<uint32_t> spaces_available; + util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue; Review comment: `Future` is already basically a `shared_ptr`. Why do we need to wrap it in another one? ########## File path: cpp/src/arrow/util/iterator_test.cc ########## @@ -726,6 +730,138 @@ TEST(TestAsyncUtil, CompleteBackgroundStressTest) { } } +template <typename T> +class ReentrantChecker { + public: + explicit ReentrantChecker(AsyncGenerator<T> source) + : state_(std::make_shared<State>(std::move(source))) {} + + Future<T> operator()() { + if (state_->in.load()) { + state_->valid.store(false); + } + state_->in.store(true); + auto result = state_->source(); + return result.Then(Callback{state_}); + } + + void AssertValid() { + EXPECT_EQ(true, state_->valid.load()) + << "The generator was accessed in a reentrant manner"; + } + + private: + struct State { + explicit State(AsyncGenerator<T> source_) + : source(std::move(source_)), in(false), valid(true) {} + + AsyncGenerator<T> source; + std::atomic<bool> in; + std::atomic<bool> valid; + }; + struct Callback { + Future<T> operator()(const Result<T>& result) { + state_->in.store(false); + return result; + } + std::shared_ptr<State> state_; + }; + + std::shared_ptr<State> state_; +}; + +TEST(TestAsyncUtil, SerialReadaheadSlowProducer) { + AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt({1, 2, 3, 4, 5}); + ReentrantChecker<TestInt> checker(std::move(it)); + SerialReadaheadGenerator<TestInt> serial_readahead(checker, 2); + AssertAsyncGeneratorMatch({1, 2, 3, 4, 5}, + static_cast<AsyncGenerator<TestInt>>(serial_readahead)); + checker.AssertValid(); +} + +TEST(TestAsyncUtil, SerialReadaheadSlowConsumer) { + int num_delivered = 0; + auto source = [&num_delivered]() { + if (num_delivered < 5) { + return Future<TestInt>::MakeFinished(num_delivered++); + } else { + return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End()); + } + }; + SerialReadaheadGenerator<TestInt> serial_readahead(std::move(source), 3); + SleepABit(); + ASSERT_EQ(0, num_delivered); Review comment: Hmm, this may already have been discussed, but why doesn't the readahead generator start pumping as soon as you instantiate it? ########## File path: cpp/src/arrow/util/iterator_test.cc ########## @@ -726,6 +730,138 @@ TEST(TestAsyncUtil, CompleteBackgroundStressTest) { } } +template <typename T> +class ReentrantChecker { + public: + explicit ReentrantChecker(AsyncGenerator<T> source) + : state_(std::make_shared<State>(std::move(source))) {} + + Future<T> operator()() { + if (state_->in.load()) { + state_->valid.store(false); + } + state_->in.store(true); + auto result = state_->source(); + return result.Then(Callback{state_}); + } + + void AssertValid() { + EXPECT_EQ(true, state_->valid.load()) + << "The generator was accessed in a reentrant manner"; + } + + private: + struct State { + explicit State(AsyncGenerator<T> source_) + : source(std::move(source_)), in(false), valid(true) {} + + AsyncGenerator<T> source; + std::atomic<bool> in; + std::atomic<bool> valid; + }; + struct Callback { + Future<T> operator()(const Result<T>& result) { + state_->in.store(false); + return result; + } + std::shared_ptr<State> state_; + }; + + std::shared_ptr<State> state_; +}; + +TEST(TestAsyncUtil, SerialReadaheadSlowProducer) { + AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt({1, 2, 3, 4, 5}); + ReentrantChecker<TestInt> checker(std::move(it)); + SerialReadaheadGenerator<TestInt> serial_readahead(checker, 2); + AssertAsyncGeneratorMatch({1, 2, 3, 4, 5}, + static_cast<AsyncGenerator<TestInt>>(serial_readahead)); + checker.AssertValid(); +} + +TEST(TestAsyncUtil, SerialReadaheadSlowConsumer) { + int num_delivered = 0; + auto source = [&num_delivered]() { + if (num_delivered < 5) { + return Future<TestInt>::MakeFinished(num_delivered++); + } else { + return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End()); Review comment: Since this appears in various places, I think it would be nice to be able to write something like: ```c++ return AsyncGeneratorEnd<TestInt>(); ``` ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,94 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } + + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.IsEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + + auto next_ptr = state_->readahead_queue.FrontPtr(); + auto next = std::move(**next_ptr); + state_->readahead_queue.PopFront(); + + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + ARROW_RETURN_NOT_OK(state_->Pump(state_)); + } + return next; + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) + : first(true), + source(std::move(source_)), + finished(false), + spaces_available(max_readahead), + readahead_queue(max_readahead) {} + + Status Pump(const std::shared_ptr<State>& self) { + // Can't do readahead_queue.write(source().Then(Callback{self})) because then the + // callback might run immediately and add itself to the queue before this gets added + // to the queue messing up the order + auto next_slot = std::make_shared<Future<T>>(); + auto written = readahead_queue.Write(next_slot); + if (!written) { + return Status::UnknownError("Could not write to readahead_queue"); + } + *next_slot = source().Then(Callback{self}); + return Status::OK(); + } + + // Only accessed by the consumer end + bool first; + // Accessed by both threads + AsyncGenerator<T> source; + std::atomic<bool> finished; + std::atomic<uint32_t> spaces_available; Review comment: Can you add a comment explaining what this is? The name isn't very descriptive. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org