westonpace commented on a change in pull request #11285: URL: https://github.com/apache/arrow/pull/11285#discussion_r721792741
########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -1645,6 +1667,47 @@ AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopToken stop_token return CancellableGenerator<T>{std::move(source), std::move(stop_token)}; } +template <typename T> +struct PauseableGenerator { + public: + PauseableGenerator(AsyncGenerator<T> source, std::shared_ptr<util::AsyncToggle> toggle) + : state_(std::make_shared<PauseableGeneratorState>(std::move(source), + std::move(toggle))) {} + + Future<T> operator()() { return (*state_)(); } + + private: + struct PauseableGeneratorState + : public std::enable_shared_from_this<PauseableGeneratorState> { + PauseableGeneratorState(AsyncGenerator<T> source, + std::shared_ptr<util::AsyncToggle> toggle) + : source_(std::move(source)), toggle_(std::move(toggle)) {} + + Future<T> operator()() { + std::shared_ptr<PauseableGeneratorState> self = this->shared_from_this(); + return toggle_->WhenOpen().Then([self] { Review comment: I cleaned it up a little bit which should eliminate most reordering but I could not escape the case: Thread A calls Open Thread D calls WhenOpen Thread B calls Close Thread C calls Open Thread D calls WhenOpen It is possible for callbacks on the second call to WhenOpen to finish before callbacks on the first call. So what I did was go ahead and change the comments for MakePauseable to state that it is not async reentrant. This should prevent any possible reordering because the second call to WhenOpen will not happen until the future returned from the first call to WhenOpen has completed. The only spot we are using MakePauseable does not need async reentrancy so this should be ok. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org