This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new f74d7473a7 GH-35383: [C++] Prefer max_concurrency over executor 
capacity to avoid segmentation fault (#35384)
f74d7473a7 is described below

commit f74d7473a79f3c7b1e6221272509037c7fe421cb
Author: Weston Pace <[email protected]>
AuthorDate: Tue May 2 10:30:39 2023 -0700

    GH-35383: [C++] Prefer max_concurrency over executor capacity to avoid 
segmentation fault (#35384)
    
    ### Rationale for this change
    
    The recent change (#34912) calculates the max concurrency using 
`plan->query_context()->executor()->GetCapacity()`.  This is later used to 
initialize the kernel states.  However, this is different than what we used to 
use.  The previous method used was `plan->query_context()->max_concurrency()` 
which is slightly different(if the aggregate node IS run in parallel then we 
initialize one state for each CPU thread, one for each I/O thread, and one for 
the calling user thread).
    
    This is unfortunately a bit complicated as `max_concurrency` would not be a 
good indicator to use when determining if the plan is running in parallel or 
not.  So we need to query both properties and use them in their respective 
spots.
    
    ### What changes are included in this PR?
    
    Now, `max_concurrency` is used to figure out how many thread local states 
need to be initialized and `GetCapacity` is used to figure out if there are 
multiple CPU threads or not.
    
    ### Are these changes tested?
    
    The bug was caught by the benchmarks which is a bit concerning.  Most of 
the CI have a very small number of CPU threads and don't experience much 
concurrency and so I think we just didn't see this pattern. Or possibly, this 
pattern is only experienced in the legacy way that pyarrow launches exec plans.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #35383
    
    Authored-by: Weston Pace <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/acero/aggregate_node.cc | 36 ++++++++++++++++++-----------------
 1 file changed, 19 insertions(+), 17 deletions(-)

diff --git a/cpp/src/arrow/acero/aggregate_node.cc 
b/cpp/src/arrow/acero/aggregate_node.cc
index 0de59899b4..d9120ffa0d 100644
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ b/cpp/src/arrow/acero/aggregate_node.cc
@@ -291,7 +291,7 @@ class ScalarAggregateNode : public ExecNode, public 
TracedNode {
   static Result<AggregateNodeArgs<ScalarAggregateKernel>> 
MakeAggregateNodeArgs(
       const std::shared_ptr<Schema>& input_schema, const 
std::vector<FieldRef>& keys,
       const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& 
aggs,
-      ExecContext* exec_ctx, size_t concurrency) {
+      ExecContext* exec_ctx, size_t concurrency, bool is_cpu_parallel) {
     // Copy (need to modify options pointer below)
     std::vector<Aggregate> aggregates(aggs);
     std::vector<int> segment_field_ids(segment_keys.size());
@@ -352,7 +352,7 @@ class ScalarAggregateNode : public ExecNode, public 
TracedNode {
                             function->DispatchExact(kernel_intypes[i]));
       const ScalarAggregateKernel* agg_kernel =
           static_cast<const ScalarAggregateKernel*>(kernel);
-      if (concurrency > 1 && agg_kernel->ordered) {
+      if (is_cpu_parallel && agg_kernel->ordered) {
         return Status::NotImplemented(
             "Using ordered aggregator in multiple threaded execution is not 
supported");
       }
@@ -398,13 +398,14 @@ class ScalarAggregateNode : public ExecNode, public 
TracedNode {
     auto aggregates = aggregate_options.aggregates;
     const auto& keys = aggregate_options.keys;
     const auto& segment_keys = aggregate_options.segment_keys;
-    const auto concurreny =
-        plan->query_context()->exec_context()->executor()->GetCapacity();
+    const auto concurrency = plan->query_context()->max_concurrency();
+    // We can't use concurrency == 1 because that include I/O concurrency
+    const bool is_cpu_parallel = 
plan->query_context()->executor()->GetCapacity() > 1;
 
     if (keys.size() > 0) {
       return Status::Invalid("Scalar aggregation with some key");
     }
-    if (concurreny > 1 && segment_keys.size() > 0) {
+    if (is_cpu_parallel && segment_keys.size() > 0) {
       return Status::NotImplemented("Segmented aggregation in a multi-threaded 
plan");
     }
 
@@ -412,11 +413,10 @@ class ScalarAggregateNode : public ExecNode, public 
TracedNode {
     auto exec_ctx = plan->query_context()->exec_context();
 
     ARROW_ASSIGN_OR_RAISE(
-        auto args,
-        MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggregates, 
exec_ctx,
-                              /*concurrency=*/concurreny));
+        auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, 
aggregates,
+                                         exec_ctx, concurrency, 
is_cpu_parallel));
 
-    if (concurreny > 1) {
+    if (is_cpu_parallel) {
       for (auto& kernel : args.kernels) {
         if (kernel->ordered) {
           return Status::NotImplemented(
@@ -443,6 +443,7 @@ class ScalarAggregateNode : public ExecNode, public 
TracedNode {
                            aggs_[i].options ? aggs_[i].options->ToString() : 
"<NULLPTR>"},
                           {"function.kind", std::string(kind_name()) + 
"::Consume"}});
       KernelContext batch_ctx{plan()->query_context()->exec_context()};
+      DCHECK_LT(thread_index, states_[i].size());
       batch_ctx.SetState(states_[i][thread_index].get());
 
       std::vector<ExecValue> column_values;
@@ -616,7 +617,7 @@ class GroupByNode : public ExecNode, public TracedNode {
   static Result<AggregateNodeArgs<HashAggregateKernel>> MakeAggregateNodeArgs(
       const std::shared_ptr<Schema>& input_schema, const 
std::vector<FieldRef>& keys,
       const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& 
aggs,
-      ExecContext* ctx, const int concurrency) {
+      ExecContext* ctx, const bool is_cpu_parallel) {
     // Find input field indices for key fields
     std::vector<int> key_field_ids(keys.size());
     for (size_t i = 0; i < keys.size(); ++i) {
@@ -673,7 +674,7 @@ class GroupByNode : public ExecNode, public TracedNode {
     // Construct aggregates
     ARROW_ASSIGN_OR_RAISE(auto agg_kernels, GetKernels(ctx, aggs, 
agg_src_types));
 
-    if (concurrency > 1) {
+    if (is_cpu_parallel) {
       if (segment_keys.size() > 0) {
         return Status::NotImplemented(
             "Segmented aggregation in a multi-threaded execution context");
@@ -734,13 +735,13 @@ class GroupByNode : public ExecNode, public TracedNode {
     const auto& keys = aggregate_options.keys;
     const auto& segment_keys = aggregate_options.segment_keys;
     auto aggs = aggregate_options.aggregates;
-    auto concurrency = 
plan->query_context()->exec_context()->executor()->GetCapacity();
+    bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() > 
1;
 
     const auto& input_schema = input->output_schema();
     auto exec_ctx = plan->query_context()->exec_context();
     ARROW_ASSIGN_OR_RAISE(
         auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, 
aggs, exec_ctx,
-                                         concurrency));
+                                         is_cpu_parallel));
 
     return input->plan()->EmplaceNode<GroupByNode>(
         input, std::move(args.output_schema), 
std::move(args.grouping_key_field_ids),
@@ -1063,14 +1064,15 @@ Result<std::shared_ptr<Schema>> MakeOutputSchema(
     const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& 
aggregates,
     ExecContext* exec_ctx) {
   if (keys.empty()) {
-    ARROW_ASSIGN_OR_RAISE(auto args, 
ScalarAggregateNode::MakeAggregateNodeArgs(
-                                         input_schema, keys, segment_keys, 
aggregates,
-                                         exec_ctx, /*concurrency=*/1));
+    ARROW_ASSIGN_OR_RAISE(auto args,
+                          ScalarAggregateNode::MakeAggregateNodeArgs(
+                              input_schema, keys, segment_keys, aggregates, 
exec_ctx,
+                              /*concurrency=*/1, /*is_cpu_parallel=*/false));
     return std::move(args.output_schema);
   } else {
     ARROW_ASSIGN_OR_RAISE(auto args, GroupByNode::MakeAggregateNodeArgs(
                                          input_schema, keys, segment_keys, 
aggregates,
-                                         exec_ctx, /*concurrency=*/1));
+                                         exec_ctx, /*is_cpu_parallel=*/false));
     return std::move(args.output_schema);
   }
 }

Reply via email to