westonpace commented on a change in pull request #9892:
URL: https://github.com/apache/arrow/pull/9892#discussion_r608219924
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -262,5 +321,40 @@ class ARROW_EXPORT ThreadPool : public Executor {
// Return the process-global thread pool for CPU-bound tasks.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();
+/// \brief Runs a potentially async operation serially
+///
+/// This means that all CPU tasks spawned by the operation will run on the thread calling
+/// this method and the future will be completed before this call finishes.
+template <typename T = arrow::detail::Empty>
+Result<T> RunSerially(FnOnce<Future<T>(Executor*)> get_future) {
+ struct InnerCallback {
+ void operator()(const Result<T> res) { std::move(finish_signal)(std::move(res)); }
+ SerialExecutor::FinishSignal<T> finish_signal;
+ };
+ struct OuterCallback {
+ Status operator()(Executor* executor, SerialExecutor::FinishSignal<T> finish_signal) {
+ auto fut = std::move(get_future)(executor);
+ fut.AddCallback(InnerCallback{std::move(finish_signal)});
+ return Status::OK();
+ }
+ FnOnce<Future<T>(Executor*)> get_future;
+ };
+ return SerialExecutor::RunInSerialExecutor<T>(OuterCallback{std::move(get_future)});
+}
+
+/// \brief Potentially runs an async operation serially if use_threads is true
+/// \see RunSerially
+///
+/// If `use_threads` is false then the operation is run normally but this method will
+/// still block the calling thread until the operation has completed.
+template <typename T>
+Result<T> RunSynchronously(FnOnce<Future<T>(Executor*)> get_future, bool use_threads) {
Review comment:
I'm not sure I follow. The purpose of this code is to run asynchronous
code synchronously in the calling thread, so the function cannot return until
the future has completed.
For example, consider an operation that asynchronously reads a file from the
filesystem and parses it into a schema. When that operation is run synchronously:
* The calling thread sets up the read (this happens in the calling thread
even when run asynchronously) and then launches it with `io_executor->Submit`.
* The I/O thread performs the read. Meanwhile, the calling thread waits for
the read to finish. This differs from the standard asynchronous flow, in which
the calling thread would already have returned a future at this point.
* The I/O thread finishes the read and adds a follow-up task to the CPU
executor, which in this case is our newly created serial executor. The calling
thread wakes up and runs the follow-up task itself. In the normal asynchronous
flow this follow-up task would have been submitted to the CPU thread pool.
* The calling thread finishes the follow-up task and calls the finish
signal. It then returns all the way out of the `RunSynchronously` call. In the
normal asynchronous flow we would have returned from that call long ago, and a
CPU thread would simply be marking a future complete.
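The four steps above can be modeled with a small, self-contained sketch. `ToySerialExecutor`, `ParseResult`, `ReadAndParseSynchronously`, and the hard-coded "file contents" are hypothetical illustrations, not Arrow's `SerialExecutor` API. The sketch shows only the core idea: the I/O thread queues the follow-up task, the calling thread runs it, and the calling thread blocks until the finish signal fires.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical, simplified serial executor (not Arrow's API). Tasks may be
// submitted from any thread but only run on the thread that calls
// RunUntilFinished().
class ToySerialExecutor {
 public:
  void Spawn(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Stands in for the finish signal that InnerCallback invokes.
  void MarkFinished() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      finished_ = true;
    }
    cv_.notify_one();
  }

  // Runs queued tasks on the calling thread until the finish signal fires.
  void RunUntilFinished() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      cv_.wait(lock, [this] { return finished_ || !tasks_.empty(); });
      if (tasks_.empty()) return;  // finished and nothing left to run
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      lock.unlock();
      task();  // executes on the calling thread, with the lock released
      lock.lock();
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool finished_ = false;
};

struct ParseResult {
  std::string schema;
  bool parsed_on_calling_thread = false;
};

// Hypothetical "read a file, then parse a schema" operation, run synchronously.
ParseResult ReadAndParseSynchronously() {
  const std::thread::id caller = std::this_thread::get_id();
  ToySerialExecutor executor;
  ParseResult result;
  // Step 1: the calling thread sets up the read and hands it to an I/O thread.
  std::thread io_thread([&] {
    std::string bytes = "col_a: int32";  // pretend this was read from disk
    // Step 3: the I/O thread queues the follow-up on the "CPU executor",
    // which here is the caller's serial executor.
    executor.Spawn([&, bytes] {
      result.schema = "schema{" + bytes + "}";
      result.parsed_on_calling_thread = std::this_thread::get_id() == caller;
      executor.MarkFinished();  // Step 4: fire the finish signal
    });
  });
  // Step 2: instead of returning a future, the caller waits and runs tasks.
  executor.RunUntilFinished();
  io_thread.join();
  return result;
}
```

`ReadAndParseSynchronously()` should return `schema{col_a: int32}` with `parsed_on_calling_thread` set to true. This holds whether the I/O thread queues its follow-up before or after the caller starts waiting, because the condition-variable predicate checks the queue under the lock.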
However, there are some other choices we could make here:
* We could require that the I/O executor also be replaced with our newly
created serial executor. That way we use no I/O threads and no CPU threads
(this is how the previous implementation worked anyway). In that case we
don't need the finish condition, because we know we are done when the serial
executor runs out of tasks. We could also modify the `SpawnReal` function to
return an invalid status if the current thread is not the original calling
thread (this would safeguard against mistakenly using the I/O executor).
* We could keep an "outgoing task count" (perhaps in a thread-local) that is
incremented when we schedule tasks on other executors and decremented when
those tasks finish. In that case we also don't need the finish condition,
because we know we are finished when `outgoing_task_count == 0 &&
task_queue.empty()`.
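Here is a rough sketch of the second option. It assumes a hypothetical executor that counts outgoing work instead of waiting on a finish signal. `CountingSerialExecutor`, `SpawnOutgoing`, `RunUntilIdle`, `ReadThenParse`, and `LocalOnly` are illustrative names, not Arrow APIs, and a plain `std::thread` stands in for the I/O executor. The first option falls out as the special case in which nothing is handed to another thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical executor (not Arrow's SerialExecutor): tasks run only on the
// thread that calls RunUntilIdle(); work handed to other threads is counted
// as "outgoing" until it completes. No explicit finish signal is needed.
class CountingSerialExecutor {
 public:
  ~CountingSerialExecutor() { JoinAll(); }

  // Queue a task for the owning thread. Safe to call from any thread.
  void Spawn(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Run `work` on a separate thread (standing in for an I/O executor).
  // Call only from the owning thread. `work` must Spawn() any follow-up
  // before returning, because outgoing_ is decremented right after it.
  void SpawnOutgoing(std::function<void()> work) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++outgoing_;
    }
    threads_.emplace_back([this, work = std::move(work)] {
      work();
      {
        std::lock_guard<std::mutex> lock(mutex_);
        --outgoing_;
      }
      cv_.notify_one();
    });
  }

  // Finished when `outgoing_task_count == 0 && task_queue.empty()`.
  void RunUntilIdle() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      cv_.wait(lock, [this] { return !tasks_.empty() || outgoing_ == 0; });
      if (tasks_.empty()) break;  // and outgoing_ == 0: nothing left anywhere
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      lock.unlock();
      task();  // always runs on the calling thread
      lock.lock();
    }
    lock.unlock();
    JoinAll();
  }

 private:
  void JoinAll() {
    for (auto& t : threads_) {
      if (t.joinable()) t.join();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  int outgoing_ = 0;
  std::vector<std::thread> threads_;  // touched only by the owning thread
};

// The file/schema walkthrough: "read" on another thread, "parse" on the caller.
int ReadThenParse() {
  CountingSerialExecutor executor;
  int parsed = 0;
  executor.Spawn([&] {  // set up the read on the calling thread
    executor.SpawnOutgoing([&] {
      int bytes = 41;  // pretend this came from disk
      executor.Spawn([&parsed, bytes] { parsed = bytes + 1; });  // follow-up
    });
  });
  executor.RunUntilIdle();
  return parsed;
}

// First option: nothing leaves the calling thread, so outgoing_ stays 0 and
// the loop ends as soon as the queue drains.
int LocalOnly() {
  CountingSerialExecutor executor;
  int runs = 0;
  executor.Spawn([&] {
    ++runs;
    executor.Spawn([&runs] { ++runs; });
  });
  executor.RunUntilIdle();
  return runs;
}
```

The subtle part of this design is ordering. An outgoing task must queue its follow-up before the counter is decremented. Otherwise the caller could observe `outgoing == 0` with an empty queue in that gap and return early.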
--
This is an automated message from the Apache Git Service.