westonpace commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r608243602



##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -44,6 +44,63 @@ struct Task {
 
 }  // namespace
 
+struct SerialExecutor::State {
+  std::queue<Task> task_queue;
+  std::mutex mutex;
+  std::condition_variable wait_for_tasks;
+};
+
+SerialExecutor::SerialExecutor() : state_(new State()) {}
+SerialExecutor::~SerialExecutor() {}
+
+Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
+                                 StopToken stop_token, StopCallback&& 
stop_callback) {
+  // The serial task queue is truly serial (no mutex needed) but SpawnReal may 
be called
+  // from external threads (e.g. when transferring back from blocking I/O 
threads) so a
+  // mutex is needed
+  {
+    std::lock_guard<std::mutex> lg(state_->mutex);
+    state_->task_queue.push(
+        Task{std::move(task), std::move(stop_token), 
std::move(stop_callback)});
+  }
+  state_->wait_for_tasks.notify_one();
+  return Status::OK();
+}
+
+void SerialExecutor::MarkFinished(bool& finished) {
+  {
+    std::lock_guard<std::mutex> lk(state_->mutex);
+    finished = true;
+  }
+  state_->wait_for_tasks.notify_one();
+}
+
+void SerialExecutor::RunLoop(const bool& finished) {
+  std::unique_lock<std::mutex> lk(state_->mutex);
+
+  while (!finished) {
+    while (!state_->task_queue.empty()) {
+      Task& task = state_->task_queue.front();

Review comment:
       True, I thought I was fine since I re-lock before I pop, but I suppose 
the underlying storage could be reallocated in the meantime.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to