pitrou commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r607892026



##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -44,6 +44,63 @@ struct Task {
 
 }  // namespace
 
+struct SerialExecutor::State {
+  std::queue<Task> task_queue;
+  std::mutex mutex;
+  std::condition_variable wait_for_tasks;
+};
+
+SerialExecutor::SerialExecutor() : state_(new State()) {}
+SerialExecutor::~SerialExecutor() {}
+
+Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
+                                 StopToken stop_token, StopCallback&& 
stop_callback) {
+  // The serial task queue is truly serial (no mutex needed) but SpawnReal may 
be called
+  // from external threads (e.g. when transferring back from blocking I/O 
threads) so a
+  // mutex is needed
+  {
+    std::lock_guard<std::mutex> lg(state_->mutex);
+    state_->task_queue.push(
+        Task{std::move(task), std::move(stop_token), 
std::move(stop_callback)});
+  }
+  state_->wait_for_tasks.notify_one();
+  return Status::OK();
+}
+
+void SerialExecutor::MarkFinished(bool& finished) {

Review comment:
       `bool* finished`, since it's mutable.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
                            StopCallback&&) = 0;
 };
 
+/// \brief An executor implementation that runs all tasks on a single thread 
using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.  
Blocking waits are
+/// fine but if one task needs to wait for another task it must be expressed 
as an
+/// asynchronous continuation.
+class ARROW_EXPORT SerialExecutor : public Executor {
+ public:
+  template <typename T = ::arrow::detail::Empty>
+  using FinishSignal = internal::FnOnce<void(const Result<T>&)>;
+  template <typename T = ::arrow::detail::Empty>
+  using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>;

Review comment:
       "Scheduler" is confusing. "TopLevelTask" perhaps?

##########
File path: cpp/src/arrow/dataset/scanner_internal.h
##########
@@ -29,10 +29,12 @@
 #include "arrow/dataset/partition.h"
 #include "arrow/dataset/scanner.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/thread_pool.h"

Review comment:
       Is this required?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -123,6 +123,125 @@ class AddTester {
   std::vector<int> outs_;
 };
 
+template <typename T = arrow::detail::Empty>
+struct TerminalCallback {
+  void operator()() {
+    auto result = std::move(callback)();
+    std::move(finish_signal)(result);
+  }
+
+  FnOnce<Result<T>()> callback;
+  SerialExecutor::FinishSignal<T> finish_signal;
+};
+
+template <>
+struct TerminalCallback<arrow::detail::Empty> {
+  void operator()() {
+    auto st = std::move(callback)();
+    if (!st.ok()) {
+      std::move(finish_signal)(st);
+    } else {
+      std::move(finish_signal)(arrow::detail::Empty());
+    }
+  }
+
+  FnOnce<Status()> callback;
+  SerialExecutor::FinishSignal<> finish_signal;
+};
+
+TEST(TestSerialExecutor, Create) {
+  bool task_ran = false;
+  SerialExecutor::Scheduler<> task = [&](Executor* executor,
+                                         SerialExecutor::FinishSignal<> 
finish_signal) {
+    EXPECT_TRUE(executor != nullptr);
+    task_ran = true;
+    std::move(finish_signal)(arrow::detail::Empty());
+    return Status::OK();
+  };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(task)));
+  EXPECT_TRUE(task_ran);
+}
+
+TEST(TestSerialExecutor, SpawnNested) {
+  bool nested_ran = false;
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        return executor->Spawn(TerminalCallback<>{[&] {
+                                                    nested_ran = true;
+                                                    return Status::OK();
+                                                  },
+                                                  std::move(finish_signal)});
+      };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+  EXPECT_TRUE(nested_ran);
+}
+
+TEST(TestSerialExecutor, WithResult) {
+  SerialExecutor::Scheduler<int> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<int> finish_signal) 
{
+        return executor->Spawn(
+            TerminalCallback<int>{[] { return 42; }, 
std::move(finish_signal)});
+      };
+  ASSERT_OK_AND_EQ(42, 
SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+}
+
+TEST(TestSerialExecutor, StopToken) {
+  bool nested_ran = false;
+  StopSource stop_source;
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        RETURN_NOT_OK(executor->Spawn([&] { nested_ran = true; }, 
stop_source.token()));
+        RETURN_NOT_OK(executor->Spawn(
+            TerminalCallback<>{[&] { return Status::OK(); }, 
std::move(finish_signal)}));
+        stop_source.RequestStop(Status::Invalid("XYZ"));
+        return Status::OK();
+      };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+  EXPECT_FALSE(nested_ran);
+}
+
+TEST(TestSerialExecutor, ContinueAfterExternal) {
+  bool continuation_ran = false;
+  EXPECT_OK_AND_ASSIGN(auto mockIoPool, ThreadPool::Make(1));
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        struct Callback {
+          void operator()(const Result<arrow::detail::Empty>& emp) {
+            continuation_ran = true;
+            std::move(finish_signal)(emp);
+          }
+          SerialExecutor::FinishSignal<> finish_signal;
+          bool& continuation_ran;
+        };
+        executor
+            ->Transfer(DeferNotOk(mockIoPool->Submit([&] {
+              SleepABit();
+              return Status::OK();
+            })))
+            .AddCallback(Callback{std::move(finish_signal), continuation_ran});
+        return Status::OK();
+      };
+  ASSERT_OK(SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+  EXPECT_TRUE(continuation_ran);
+}
+
+TEST(TestSerialExecutor, SchedulerAbort) {
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        return Status::Invalid("XYZ");
+      };
+  ASSERT_RAISES(Invalid, 
SerialExecutor::RunInSerialExecutor(std::move(scheduler)));
+}
+
+TEST(TestSerialExecutor, PropagatedError) {
+  SerialExecutor::Scheduler<> scheduler =
+      [&](Executor* executor, SerialExecutor::FinishSignal<> finish_signal) {
+        std::move(finish_signal)(Status::Invalid("XYZ"));
+        return Status::OK();
+      };
+  ASSERT_RAISES(Invalid, 
SerialExecutor::RunInSerialExecutor(std::move(scheduler)));

Review comment:
       Hmm... instead of testing `RunInSerialExecutor` explicitly, can all 
these tests call `RunSynchronously`, so that you can run the same tests for 
different `use_threads`?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
 // Return the process-global thread pool for CPU-bound tasks.
 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
+/// \brief Runs a potentially async operation serially
+///
+/// This means that all CPU tasks spawned by the operation will run on the 
thread calling
+/// this method and the future will be completed before this call finishes.
+template <typename T = arrow::detail::Empty>
+Result<T> RunSerially(FnOnce<Future<T>(Executor*)> get_future) {
+  struct InnerCallback {
+    void operator()(const Result<T> res) { 
std::move(finish_signal)(std::move(res)); }
+    SerialExecutor::FinishSignal<T> finish_signal;
+  };
+  struct OuterCallback {
+    Status operator()(Executor* executor, SerialExecutor::FinishSignal<T> 
finish_signal) {
+      auto fut = std::move(get_future)(executor);
+      fut.AddCallback(InnerCallback{std::move(finish_signal)});
+      return Status::OK();
+    }
+    FnOnce<Future<T>(Executor*)> get_future;
+  };
+  return 
SerialExecutor::RunInSerialExecutor<T>(OuterCallback{std::move(get_future)});
+}
+
+/// \brief Potentially runs an async operation serially if use_threads is true

Review comment:
       This isn't very descriptive. Please use something such as:
   ```c++
   /// \brief Spawn a future and any dependent tasks on an executor, wait for 
completion
   ///
   /// If `use_threads` is true, the global CPU executor is used.
   /// If `use_threads` is false, a temporary SerialExecutor is used.
   /// `get_future` is called (from this thread) with the chosen executor and 
must
   /// return a future that will eventually finish. This function returns once 
the
   /// future has finished.
   ```
   

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
 // Return the process-global thread pool for CPU-bound tasks.
 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
+/// \brief Runs a potentially async operation serially

Review comment:
       "Run"

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
                            StopCallback&&) = 0;
 };
 
+/// \brief An executor implementation that runs all tasks on a single thread 
using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.  
Blocking waits are

Review comment:
       This is true of a parallel executor as well, no? 

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -166,14 +174,15 @@ class CsvScanTask : public ScanTask {
         source_(fragment->source()) {}
 
   Result<RecordBatchIterator> Execute() override {
-    ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
+    ARROW_ASSIGN_OR_RAISE(auto gen, 
ExecuteAsync(internal::GetCpuThreadPool()));

Review comment:
       Shouldn't you look up `ScanOptions::use_threads`? Or am I missing 
something?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
 // Return the process-global thread pool for CPU-bound tasks.
 ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
+/// \brief Runs a potentially async operation serially
+///
+/// This means that all CPU tasks spawned by the operation will run on the 
thread calling
+/// this method and the future will be completed before this call finishes.
+template <typename T = arrow::detail::Empty>
+Result<T> RunSerially(FnOnce<Future<T>(Executor*)> get_future) {
+  struct InnerCallback {
+    void operator()(const Result<T> res) { 
std::move(finish_signal)(std::move(res)); }
+    SerialExecutor::FinishSignal<T> finish_signal;
+  };
+  struct OuterCallback {
+    Status operator()(Executor* executor, SerialExecutor::FinishSignal<T> 
finish_signal) {
+      auto fut = std::move(get_future)(executor);
+      fut.AddCallback(InnerCallback{std::move(finish_signal)});
+      return Status::OK();
+    }
+    FnOnce<Future<T>(Executor*)> get_future;
+  };
+  return 
SerialExecutor::RunInSerialExecutor<T>(OuterCallback{std::move(get_future)});
+}
+
+/// \brief Potentially runs an async operation serially if use_threads is true
+/// \see RunSerially
+///
+/// If `use_threads` is false then the operation is run normally but this 
method will
+/// still block the calling thread until the operation has completed.
+template <typename T>
+Result<T> RunSynchronously(FnOnce<Future<T>(Executor*)> get_future, bool 
use_threads) {

Review comment:
       Why not return a `Future<T>` instead? It seems that would at least 
remove the `FinishSignal` complication.

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -123,6 +123,125 @@ class AddTester {
   std::vector<int> outs_;
 };
 
+template <typename T = arrow::detail::Empty>

Review comment:
       Add comments for this?

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -32,6 +32,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/async_generator.h"
+#include "arrow/util/thread_pool.h"

Review comment:
       This include isn't needed: `internal::Executor` is already declared in 
`arrow/util/type_fwd.h`.
   Also, can you try to remove `async_generator.h`?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -123,6 +123,125 @@ class AddTester {
   std::vector<int> outs_;
 };
 
+template <typename T = arrow::detail::Empty>
+struct TerminalCallback {
+  void operator()() {
+    auto result = std::move(callback)();
+    std::move(finish_signal)(result);
+  }
+
+  FnOnce<Result<T>()> callback;
+  SerialExecutor::FinishSignal<T> finish_signal;
+};
+
+template <>
+struct TerminalCallback<arrow::detail::Empty> {
+  void operator()() {
+    auto st = std::move(callback)();
+    if (!st.ok()) {
+      std::move(finish_signal)(st);
+    } else {
+      std::move(finish_signal)(arrow::detail::Empty());
+    }
+  }
+
+  FnOnce<Status()> callback;
+  SerialExecutor::FinishSignal<> finish_signal;
+};
+
+TEST(TestSerialExecutor, Create) {
+  bool task_ran = false;
+  SerialExecutor::Scheduler<> task = [&](Executor* executor,
+                                         SerialExecutor::FinishSignal<> 
finish_signal) {
+    EXPECT_TRUE(executor != nullptr);

Review comment:
       `ASSERT_NE`

##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -44,6 +44,63 @@ struct Task {
 
 }  // namespace
 
+struct SerialExecutor::State {
+  std::queue<Task> task_queue;
+  std::mutex mutex;
+  std::condition_variable wait_for_tasks;
+};
+
+SerialExecutor::SerialExecutor() : state_(new State()) {}
+SerialExecutor::~SerialExecutor() {}
+
+Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
+                                 StopToken stop_token, StopCallback&& 
stop_callback) {
+  // The serial task queue is truly serial (no mutex needed) but SpawnReal may 
be called
+  // from external threads (e.g. when transferring back from blocking I/O 
threads) so a
+  // mutex is needed
+  {
+    std::lock_guard<std::mutex> lg(state_->mutex);
+    state_->task_queue.push(
+        Task{std::move(task), std::move(stop_token), 
std::move(stop_callback)});
+  }
+  state_->wait_for_tasks.notify_one();
+  return Status::OK();
+}
+
+void SerialExecutor::MarkFinished(bool& finished) {
+  {
+    std::lock_guard<std::mutex> lk(state_->mutex);
+    finished = true;
+  }
+  state_->wait_for_tasks.notify_one();
+}
+
+void SerialExecutor::RunLoop(const bool& finished) {
+  std::unique_lock<std::mutex> lk(state_->mutex);
+
+  while (!finished) {
+    while (!state_->task_queue.empty()) {
+      Task& task = state_->task_queue.front();

Review comment:
       The `task_queue` can be mutated from another thread, and I don't think 
there are any guarantees about reference stability.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -189,6 +190,64 @@ class ARROW_EXPORT Executor {
                            StopCallback&&) = 0;
 };
 
+/// \brief An executor implementation that runs all tasks on a single thread 
using an
+/// event loop.
+///
+/// Note: Any sort of nested parallelism will deadlock this executor.  
Blocking waits are
+/// fine but if one task needs to wait for another task it must be expressed 
as an
+/// asynchronous continuation.
+class ARROW_EXPORT SerialExecutor : public Executor {
+ public:
+  template <typename T = ::arrow::detail::Empty>
+  using FinishSignal = internal::FnOnce<void(const Result<T>&)>;
+  template <typename T = ::arrow::detail::Empty>
+  using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>;
+
+  SerialExecutor();
+  ~SerialExecutor();
+
+  int GetCapacity() override { return 1; };
+  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+                   StopCallback&&) override;
+
+  /// \brief Runs the scheduler and any scheduled tasks
+  ///
+  /// The scheduler must either return an invalid status or call the finish 
signal.
+  /// Failure to do this will result in a deadlock.  For this reason it is 
preferable (if
+  /// possible) to use the helper methods (below) RunSynchronously/RunSerially 
which
+  /// delegates the responsiblity onto a Future producer's existing 
responsibility to
+  /// always mark a future finished (which can someday be aided by 
ARROW-12207).
+  template <typename T>
+  static Result<T> RunInSerialExecutor(Scheduler<T> initial_task) {
+    auto serial_executor = std::make_shared<SerialExecutor>();

Review comment:
       Nit, but this can probably be `SerialExecutor serial_executor;`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to