westonpace commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r608865322
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
StopCallback&&) = 0;
};
+/// \brief An executor implementation that runs all tasks on a single thread
using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.
Blocking waits are
+/// fine but if one task needs to wait for another task it must be expressed
as an
+/// asynchronous continuation.
+class ARROW_EXPORT SerialExecutor : public Executor {
+ public:
+ template <typename T = ::arrow::detail::Empty>
+ using FinishSignal = internal::FnOnce<void(const Result<T>&)>;
+ template <typename T = ::arrow::detail::Empty>
+ using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>;
+
+ SerialExecutor();
+ ~SerialExecutor();
+
+ int GetCapacity() override { return 1; };
+ Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+ StopCallback&&) override;
+
+ /// \brief Runs the scheduler and any scheduled tasks
+ ///
+ /// The scheduler must either return an invalid status or call the finish
signal.
+ /// Failure to do this will result in a deadlock. For this reason it is
preferable (if
+ /// possible) to use the helper methods (below) RunSynchronously/RunSerially
which
+ /// delegate the responsibility onto a Future producer's existing
responsibility to
+ /// always mark a future finished (which can someday be aided by
ARROW-12207).
+ template <typename T>
+ static Result<T> RunInSerialExecutor(Scheduler<T> initial_task) {
+ auto serial_executor = std::make_shared<SerialExecutor>();
+ return serial_executor->Run<T>(std::move(initial_task));
+ }
+
+ private:
+ // State uses mutex
+ struct State;
+ std::unique_ptr<State> state_;
+
+ template <typename T>
+ Result<T> Run(Scheduler<T> initial_task) {
+ bool finished = false;
+ Result<T> final_result;
+ FinishSignal<T> finish_signal = [&](const Result<T>& res) {
+ // The finish signal could be called from an external executor callback
so we need to
+ // protect it with the mutex. Also, final_result must be set before the
call to
+ // MarkFinished here to ensure we don't try to return it before it has
finished
+ // being set
+ final_result = res;
+ MarkFinished(finished);
+ };
+ ARROW_RETURN_NOT_OK(std::move(initial_task)(this,
std::move(finish_signal)));
+ RunLoop(finished);
+ return final_result;
+ }
+ void RunLoop(const bool& finished);
+ void MarkFinished(bool& finished);
Review comment:
I moved `finished` into the internal state. The executor is not meant
to be restartable at all and should not exist outside of the scope of
`RunInSerialExecutor`. I've made the constructor for `SerialExecutor`
private to better reflect this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]