westonpace commented on code in PR #13143:
URL: https://github.com/apache/arrow/pull/13143#discussion_r918299723


##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.
+  Status AddFuture(Future<> fut);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+
+  /// \brief Adds a single function as a task to the plan's task group.

Review Comment:
   ```suggestion
     /// \brief Add a single function as a task to the plan's task group.
   ```



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.
+  Status AddFuture(Future<> fut);
+
+  /// \brief Adds a single function as a task to the plan's task group.

Review Comment:
   ```suggestion
     /// \brief Add a single function as a task to the plan's task group.
   ```



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks

Review Comment:
   Can you add this as part of the `///` comment block?  Just put it after 
`ExecNodes.`



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.
+  Status AddFuture(Future<> fut);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
+  // - The task function takes the thread index and the index of the task
+  // - The on_finished function takes the thread index
+  // Returns an integer ID that will be used to reference the task group in
+  // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
+  // you'd like the task to be executed. The need to register a task group before use will
+  // be removed after we rewrite the scheduler.
+  /// \brief Registers a "parallel for" task group with the scheduler

Review Comment:
   ```suggestion
     /// \brief Register a "parallel for" task group with the scheduler
   ```



##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -196,17 +186,16 @@ struct SourceNode : ExecNode {
   void StopProducing() override {
     std::unique_lock<std::mutex> lock(mutex_);
     stop_requested_ = true;
+    if (!started_) finished_.MarkFinished();

Review Comment:
   If `started_` is false wouldn't that have to mean that someone called 
`StopProducing` before they called `StartProducing`?  What kind of situation 
are you trying to guard against here?
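
One ordering where this could matter (an assumption for illustration, not something confirmed in this PR) is a plan that is stopped before this node is ever started. Below is a minimal sketch of that ordering. `ToySource` and `FinishedAfterStop` are hypothetical names, and `finished_` is a plain flag standing in for the real future:

```cpp
#include <mutex>

// Hypothetical stand-in for a source node; NOT the real SourceNode.
// `finished_` is a plain flag here instead of a Future<>.
class ToySource {
 public:
  void StartProducing() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (stop_requested_) return;  // stopped before start: nothing to run
    started_ = true;
  }

  void StopProducing() {
    std::lock_guard<std::mutex> lock(mutex_);
    stop_requested_ = true;
    // If the node never started, no producing loop exists that could mark
    // it finished later, so mark it finished here.
    if (!started_) finished_ = true;
  }

  bool finished() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return finished_;
  }

 private:
  mutable std::mutex mutex_;
  bool started_ = false;
  bool stop_requested_ = false;
  bool finished_ = false;
};

// Returns whether the node reports finished right after StopProducing().
bool FinishedAfterStop(bool start_first) {
  ToySource source;
  if (start_first) source.StartProducing();
  source.StopProducing();
  return source.finished();
}
```

In this sketch, stopping without starting leaves the node finished immediately, while stopping after starting leaves that to whatever loop was started.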



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.
+  Status AddFuture(Future<> fut);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);

Review Comment:
   Do these have the same caveats as the future-task adding overload (e.g. use 
sparingly, use for connecting with asynchronous code)?  If so, we probably 
shouldn't repeat ourselves but maybe should reference the primary overload.



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.

Review Comment:
   ```suggestion
     /// \brief Add a future to the plan's task group.
   ```
   



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.
+  Status AddFuture(Future<> fut);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
+  // - The task function takes the thread index and the index of the task
+  // - The on_finished function takes the thread index
+  // Returns an integer ID that will be used to reference the task group in
+  // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
+  // you'd like the task to be executed. The need to register a task group before use will
+  // be removed after we rewrite the scheduler.
+  /// \brief Registers a "parallel for" task group with the scheduler
+  ///
+  /// \param task The function implementing the task. Takes the thread_index and
+  ///             the task index.
+  /// \param on_finished The function that gets run once all tasks have been completed.
+  /// Takes
+  ///                    the thread_index.
+  ///
+  /// Must be called inside of ExecNode::Init.
+  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                        std::function<Status(size_t)> on_finished);
+
+  /// \brief Starts the task group with the specified ID

Review Comment:
   ```suggestion
     /// \brief Start the task group with the specified ID
   ```



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.
+  Status AddFuture(Future<> fut);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
+  // - The task function takes the thread index and the index of the task
+  // - The on_finished function takes the thread index
+  // Returns an integer ID that will be used to reference the task group in
+  // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
+  // you'd like the task to be executed. The need to register a task group before use will
+  // be removed after we rewrite the scheduler.
+  /// \brief Registers a "parallel for" task group with the scheduler
+  ///
+  /// \param task The function implementing the task. Takes the thread_index and
+  ///             the task index.
+  /// \param on_finished The function that gets run once all tasks have been completed.
+  /// Takes
+  ///                    the thread_index.
+  ///
+  /// Must be called inside of ExecNode::Init.

Review Comment:
   Can you fix the formatting here?



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.

Review Comment:
   ```suggestion
     /// prefer ScheduleTask/StartTaskGroup inside of ExecNodes.
   ```



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  /// \brief Returns the index of the current thread.
+  size_t GetThreadIndex();
+  /// \brief Returns the maximum number of threads that the plan could use.
+  ///
+  /// GetThreadIndex will always return something less than this, so it is safe to
+  /// e.g. make an array of thread-locals off this.
+  size_t max_concurrency() const;
+
+  // The below API interfaces with the scheduler to add tasks to the task group. Tasks
+  // should be added sparingly! Prefer just doing the work immediately rather than adding
+  // a task for it. Tasks are used in pipeline breakers that may output many more rows
+  // than they received (such as a full outer join).
+  //
+  //
+  /// \brief Adds a future to the plan's task group.
+  ///
+  /// \param fut The future to add
+  ///
+  /// Use this when interfacing with anything that returns a future (such as IO), but
+  /// prefer ScheduleTask/StartTaskGroup inside of
+  /// ExecNodes.
+  Status AddFuture(Future<> fut);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes no arguments and returns a Status.
+  Status ScheduleTask(std::function<Status()> fn);
+
+  /// \brief Adds a single function as a task to the plan's task group.
+  ///
+  /// \param fn The task to run. Takes the thread index and returns a Status.
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
+  // - The task function takes the thread index and the index of the task
+  // - The on_finished function takes the thread index
+  // Returns an integer ID that will be used to reference the task group in
+  // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
+  // you'd like the task to be executed. The need to register a task group before use will
+  // be removed after we rewrite the scheduler.
+  /// \brief Registers a "parallel for" task group with the scheduler
+  ///
+  /// \param task The function implementing the task. Takes the thread_index and
+  ///             the task index.
+  /// \param on_finished The function that gets run once all tasks have been completed.
+  /// Takes
+  ///                    the thread_index.
+  ///
+  /// Must be called inside of ExecNode::Init.
+  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                        std::function<Status(size_t)> on_finished);
+
+  /// \brief Starts the task group with the specified ID
+  ///
+  /// \param task_group_id The ID  of the task group to run
+  /// \param num_tasks The number of times to run the task
+  Status StartTaskGroup(int task_group_id, int64_t num_tasks);

Review Comment:
   Can this be called more than once per registered task group? (I wouldn't 
think so, but it might not hurt to mention this explicitly.)
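
To make the question concrete, here is a minimal sketch of the register/start "parallel for" pattern described in the comment block. `ToyScheduler` and `SumTaskIndices` are hypothetical stand-ins, not the real `ExecPlan`: the sketch uses `bool` in place of `Status`, runs tasks serially on a single thread index, and simply assumes that a group may be started at most once, which is the behaviour the documentation would need to confirm or correct.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for the plan's scheduler; NOT the real ExecPlan.
// Everything runs serially on "thread 0" to keep the sketch simple.
class ToyScheduler {
 public:
  using Task = std::function<bool(std::size_t, std::int64_t)>;
  using OnFinished = std::function<bool(std::size_t)>;

  // Registers a task group and returns the ID used to start it later.
  int RegisterTaskGroup(Task task, OnFinished on_finished) {
    groups_.push_back({std::move(task), std::move(on_finished), false});
    return static_cast<int>(groups_.size()) - 1;
  }

  // Runs `task` num_tasks times, then `on_finished` once.
  // Assumption: a group may be started at most once; a second call fails.
  bool StartTaskGroup(int id, std::int64_t num_tasks) {
    if (id < 0 || static_cast<std::size_t>(id) >= groups_.size()) return false;
    Group& group = groups_[static_cast<std::size_t>(id)];
    if (group.started) return false;
    group.started = true;
    const std::size_t thread_index = 0;
    for (std::int64_t i = 0; i < num_tasks; ++i) {
      if (!group.task(thread_index, i)) return false;
    }
    return group.on_finished(thread_index);
  }

 private:
  struct Group {
    Task task;
    OnFinished on_finished;
    bool started;
  };
  std::vector<Group> groups_;
};

// Usage: sums the task indices 0..n-1 through one task group.
std::int64_t SumTaskIndices(std::int64_t n) {
  ToyScheduler scheduler;
  std::int64_t sum = 0;
  bool finished = false;
  int id = scheduler.RegisterTaskGroup(
      [&](std::size_t, std::int64_t i) { sum += i; return true; },
      [&](std::size_t) { finished = true; return true; });
  bool ok = scheduler.StartTaskGroup(id, n);
  assert(ok && finished);
  // A second start of the same group is rejected in this sketch.
  assert(!scheduler.StartTaskGroup(id, n));
  return sum;
}
```

Whether the real scheduler rejects, ignores, or permits a second start is exactly what the doc comment should state.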



##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -894,25 +898,42 @@ class HashJoinNode : public ExecNode {
     }
   }
 
-  Status PrepareToProduce() override {
+  Status Init() override {
+    RETURN_NOT_OK(ExecNode::Init());
     bool use_sync_execution = !(plan_->exec_context()->executor());
     // TODO(ARROW-15732)
     // Each side of join might have an IO thread being called from. Once this is fixed
     // we will change it back to just the CPU's thread pool capacity.
     size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);

Review Comment:
   Should this be a call to `max_concurrency()` instead?
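
For context, the doc comment on `max_concurrency()` says `GetThreadIndex` always returns something smaller, so per-thread state can be sized from it. A minimal sketch of that idea follows. `PerThreadCounters` and `TotalRows` are hypothetical names, and the thread indices are passed in explicitly rather than obtained from a real plan:

```cpp
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical per-thread accumulator; not Arrow code.
// One slot per possible thread index, so no locking is needed as long as
// each thread only touches its own slot.
class PerThreadCounters {
 public:
  explicit PerThreadCounters(std::size_t max_concurrency)
      : counts_(max_concurrency, 0) {}

  // `thread_index` must be less than the max_concurrency given above;
  // at() throws std::out_of_range otherwise.
  void Add(std::size_t thread_index, std::int64_t n) {
    counts_.at(thread_index) += n;
  }

  // Combines the per-thread slots once all work is done.
  std::int64_t Total() const {
    return std::accumulate(counts_.begin(), counts_.end(), std::int64_t{0});
  }

 private:
  std::vector<std::int64_t> counts_;
};

// Usage: each entry of `thread_indices` simulates one batch of
// `rows_per_batch` rows handled by the thread with that index.
std::int64_t TotalRows(std::size_t max_concurrency,
                       const std::vector<std::size_t>& thread_indices,
                       std::int64_t rows_per_batch) {
  PerThreadCounters counters(max_concurrency);
  for (std::size_t thread_index : thread_indices) {
    counters.Add(thread_index, rows_per_batch);
  }
  return counters.Total();
}
```

If the join sized its per-thread state this way, the hand-computed `num_threads` would no longer need to be kept in sync with the plan's own bound.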


