westonpace commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r608243632
##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -44,6 +44,63 @@ struct Task {
} // namespace
+struct SerialExecutor::State {
+ std::queue<Task> task_queue;
+ std::mutex mutex;
+ std::condition_variable wait_for_tasks;
+};
+
+SerialExecutor::SerialExecutor() : state_(new State()) {}
+SerialExecutor::~SerialExecutor() {}
+
+Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
+ StopToken stop_token, StopCallback&&
stop_callback) {
+ // The serial task queue is truly serial (no mutex needed) but SpawnReal may be called
+ // from external threads (e.g. when transferring back from blocking I/O threads) so a
+ // mutex is needed
+ {
+ std::lock_guard<std::mutex> lg(state_->mutex);
+ state_->task_queue.push(
+ Task{std::move(task), std::move(stop_token),
std::move(stop_callback)});
+ }
+ state_->wait_for_tasks.notify_one();
+ return Status::OK();
+}
+
+void SerialExecutor::MarkFinished(bool& finished) {
+ {
+ std::lock_guard<std::mutex> lk(state_->mutex);
+ finished = true;
+ }
+ state_->wait_for_tasks.notify_one();
+}
+
+void SerialExecutor::RunLoop(const bool& finished) {
+ std::unique_lock<std::mutex> lk(state_->mutex);
+
+ while (!finished) {
+ while (!state_->task_queue.empty()) {
+ Task& task = state_->task_queue.front();
Review comment:
Fixed.
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
StopCallback&&) = 0;
};
+/// \brief An executor implementation that runs all tasks on a single thread using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor. Blocking waits are
+/// fine but if one task needs to wait for another task it must be expressed as an
+/// asynchronous continuation.
+class ARROW_EXPORT SerialExecutor : public Executor {
+ public:
+ template <typename T = ::arrow::detail::Empty>
+ using FinishSignal = internal::FnOnce<void(const Result<T>&)>;
+ template <typename T = ::arrow::detail::Empty>
+ using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>;
Review comment:
Renamed to TopLevelTask
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]