westonpace commented on code in PR #13143: URL: https://github.com/apache/arrow/pull/13143#discussion_r918299723
########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. + size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. + /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. + Status AddFuture(Future<> fut); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes no arguments and returns a Status. + Status ScheduleTask(std::function<Status()> fn); + + /// \brief Adds a single function as a task to the plan's task group. Review Comment: ```suggestion /// \brief Add a single function as a task to the plan's task group. ``` ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. 
+ size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. + /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. + Status AddFuture(Future<> fut); + + /// \brief Adds a single function as a task to the plan's task group. Review Comment: ```suggestion /// \brief Add a single function as a task to the plan's task group. ``` ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. + size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks Review Comment: Can you add this as part of the `///` comment block? Just put it after `ExecNodes.` ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. 
+ size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. + /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. + Status AddFuture(Future<> fut); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes no arguments and returns a Status. + Status ScheduleTask(std::function<Status()> fn); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes the thread index and returns a Status. + Status ScheduleTask(std::function<Status(size_t)> fn); + // Register/Start TaskGroup is a way of performing a "Parallel For" pattern: + // - The task function takes the thread index and the index of the task + // - The on_finished function takes the thread index + // Returns an integer ID that will be used to reference the task group in + // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times + // you'd like the task to be executed. The need to register a task group before use will + // be removed after we rewrite the scheduler. 
+ /// \brief Registers a "parallel for" task group with the scheduler Review Comment: ```suggestion /// \brief Register a "parallel for" task group with the scheduler ``` ########## cpp/src/arrow/compute/exec/source_node.cc: ########## @@ -196,17 +186,16 @@ struct SourceNode : ExecNode { void StopProducing() override { std::unique_lock<std::mutex> lock(mutex_); stop_requested_ = true; + if (!started_) finished_.MarkFinished(); Review Comment: If `started_` is false wouldn't that have to mean that someone called `StopProducing` before they called `StartProducing`? What kind of situation are you trying to guard against here? ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. + size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. + /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. + Status AddFuture(Future<> fut); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes no arguments and returns a Status. 
+ Status ScheduleTask(std::function<Status()> fn); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes the thread index and returns a Status. + Status ScheduleTask(std::function<Status(size_t)> fn); Review Comment: Do these have the same caveats as the future-task adding overload (e.g. use sparingly, use for connecting with asynchronous code)? If so, we probably shouldn't repeat ourselves but maybe should reference the primary overload. ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. + size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. Review Comment: ```suggestion /// \brief Add a future to the plan's task group. ``` ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. 
+ size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. + /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. + Status AddFuture(Future<> fut); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes no arguments and returns a Status. + Status ScheduleTask(std::function<Status()> fn); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes the thread index and returns a Status. + Status ScheduleTask(std::function<Status(size_t)> fn); + // Register/Start TaskGroup is a way of performing a "Parallel For" pattern: + // - The task function takes the thread index and the index of the task + // - The on_finished function takes the thread index + // Returns an integer ID that will be used to reference the task group in + // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times + // you'd like the task to be executed. The need to register a task group before use will + // be removed after we rewrite the scheduler. + /// \brief Registers a "parallel for" task group with the scheduler + /// + /// \param task The function implementing the task. Takes the thread_index and + /// the task index. + /// \param on_finished The function that gets run once all tasks have been completed. + /// Takes + /// the thread_index. + /// + /// Must be called inside of ExecNode::Init. 
+ int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task, + std::function<Status(size_t)> on_finished); + + /// \brief Starts the task group with the specified ID Review Comment: ```suggestion /// \brief Start the task group with the specified ID ``` ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. + size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. + /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. + Status AddFuture(Future<> fut); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes no arguments and returns a Status. + Status ScheduleTask(std::function<Status()> fn); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes the thread index and returns a Status. 
+ Status ScheduleTask(std::function<Status(size_t)> fn); + // Register/Start TaskGroup is a way of performing a "Parallel For" pattern: + // - The task function takes the thread index and the index of the task + // - The on_finished function takes the thread index + // Returns an integer ID that will be used to reference the task group in + // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times + // you'd like the task to be executed. The need to register a task group before use will + // be removed after we rewrite the scheduler. + /// \brief Registers a "parallel for" task group with the scheduler + /// + /// \param task The function implementing the task. Takes the thread_index and + /// the task index. + /// \param on_finished The function that gets run once all tasks have been completed. + /// Takes + /// the thread_index. + /// + /// Must be called inside of ExecNode::Init. Review Comment: Can you fix the formatting here? ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. + size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. 
+ /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. Review Comment: ```suggestion /// prefer ScheduleTask/StartTaskGroup inside of ExecNodes. ``` ########## cpp/src/arrow/compute/exec/exec_plan.h: ########## @@ -61,6 +61,63 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { return out; } + /// \brief Returns the index of the current thread. + size_t GetThreadIndex(); + /// \brief Returns the maximum number of threads that the plan could use. + /// + /// GetThreadIndex will always return something less than this, so it is safe to + /// e.g. make an array of thread-locals off this. + size_t max_concurrency() const; + + // The below API interfaces with the scheduler to add tasks to the task group. Tasks + // should be added sparingly! Prefer just doing the work immediately rather than adding + // a task for it. Tasks are used in pipeline breakers that may output many more rows + // than they received (such as a full outer join). + // + // + /// \brief Adds a future to the plan's task group. + /// + /// \param fut The future to add + /// + /// Use this when interfacing with anything that returns a future (such as IO), but + /// prefer ScheduleTask/StartTaskGroup inside of + /// ExecNodes. + Status AddFuture(Future<> fut); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes no arguments and returns a Status. + Status ScheduleTask(std::function<Status()> fn); + + /// \brief Adds a single function as a task to the plan's task group. + /// + /// \param fn The task to run. Takes the thread index and returns a Status. 
+ Status ScheduleTask(std::function<Status(size_t)> fn); + // Register/Start TaskGroup is a way of performing a "Parallel For" pattern: + // - The task function takes the thread index and the index of the task + // - The on_finished function takes the thread index + // Returns an integer ID that will be used to reference the task group in + // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times + // you'd like the task to be executed. The need to register a task group before use will + // be removed after we rewrite the scheduler. + /// \brief Registers a "parallel for" task group with the scheduler + /// + /// \param task The function implementing the task. Takes the thread_index and + /// the task index. + /// \param on_finished The function that gets run once all tasks have been completed. + /// Takes + /// the thread_index. + /// + /// Must be called inside of ExecNode::Init. + int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task, + std::function<Status(size_t)> on_finished); + + /// \brief Starts the task group with the specified ID + /// + /// \param task_group_id The ID of the task group to run + /// \param num_tasks The number of times to run the task + Status StartTaskGroup(int task_group_id, int64_t num_tasks); Review Comment: Can this be called more than once per registered task group? (I wouldn't think so but it might not hurt to mention this explicitly) ########## cpp/src/arrow/compute/exec/hash_join_node.cc: ########## @@ -894,25 +898,42 @@ class HashJoinNode : public ExecNode { } } - Status PrepareToProduce() override { + Status Init() override { + RETURN_NOT_OK(ExecNode::Init()); bool use_sync_execution = !(plan_->exec_context()->executor()); // TODO(ARROW-15732) // Each side of join might have an IO thread being called from. Once this is fixed // we will change it back to just the CPU's thread pool capacity.
size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1); Review Comment: Should this be a call to max_concurrency? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org