This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new f74d7473a7 GH-35383: [C++] Prefer max_concurrency over executor
capacity to avoid segmentation fault (#35384)
f74d7473a7 is described below
commit f74d7473a79f3c7b1e6221272509037c7fe421cb
Author: Weston Pace <[email protected]>
AuthorDate: Tue May 2 10:30:39 2023 -0700
GH-35383: [C++] Prefer max_concurrency over executor capacity to avoid
segmentation fault (#35384)
### Rationale for this change
The recent change (#34912) calculates the max concurrency using
`plan->query_context()->executor()->GetCapacity()`. This is later used to
initialize the kernel states. However, this is different from what we used to
use. The previous method was `plan->query_context()->max_concurrency()`,
which is slightly different (if the aggregate node IS run in parallel then we
initialize one state for each CPU thread, one for each I/O thread, and one for
the calling user thread).
This is unfortunately a bit complicated as `max_concurrency` would not be a
good indicator to use when determining if the plan is running in parallel or
not. So we need to query both properties and use them in their respective
spots.
### What changes are included in this PR?
Now, `max_concurrency` is used to figure out how many thread-local states
need to be initialized and `GetCapacity` is used to figure out if there are
multiple CPU threads or not.
### Are these changes tested?
The bug was caught by the benchmarks, which is a bit concerning. Most of
the CI environments have a very small number of CPU threads and don't experience
much concurrency, so I think we just didn't see this pattern. Or possibly, this
pattern is only experienced in the legacy way that pyarrow launches exec plans.
### Are there any user-facing changes?
No.
* Closes: #35383
Authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/acero/aggregate_node.cc | 36 ++++++++++++++++++-----------------
1 file changed, 19 insertions(+), 17 deletions(-)
diff --git a/cpp/src/arrow/acero/aggregate_node.cc
b/cpp/src/arrow/acero/aggregate_node.cc
index 0de59899b4..d9120ffa0d 100644
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ b/cpp/src/arrow/acero/aggregate_node.cc
@@ -291,7 +291,7 @@ class ScalarAggregateNode : public ExecNode, public
TracedNode {
static Result<AggregateNodeArgs<ScalarAggregateKernel>>
MakeAggregateNodeArgs(
const std::shared_ptr<Schema>& input_schema, const
std::vector<FieldRef>& keys,
const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>&
aggs,
- ExecContext* exec_ctx, size_t concurrency) {
+ ExecContext* exec_ctx, size_t concurrency, bool is_cpu_parallel) {
// Copy (need to modify options pointer below)
std::vector<Aggregate> aggregates(aggs);
std::vector<int> segment_field_ids(segment_keys.size());
@@ -352,7 +352,7 @@ class ScalarAggregateNode : public ExecNode, public
TracedNode {
function->DispatchExact(kernel_intypes[i]));
const ScalarAggregateKernel* agg_kernel =
static_cast<const ScalarAggregateKernel*>(kernel);
- if (concurrency > 1 && agg_kernel->ordered) {
+ if (is_cpu_parallel && agg_kernel->ordered) {
return Status::NotImplemented(
"Using ordered aggregator in multiple threaded execution is not
supported");
}
@@ -398,13 +398,14 @@ class ScalarAggregateNode : public ExecNode, public
TracedNode {
auto aggregates = aggregate_options.aggregates;
const auto& keys = aggregate_options.keys;
const auto& segment_keys = aggregate_options.segment_keys;
- const auto concurreny =
- plan->query_context()->exec_context()->executor()->GetCapacity();
+ const auto concurrency = plan->query_context()->max_concurrency();
+ // We can't use concurrency == 1 because that include I/O concurrency
+ const bool is_cpu_parallel =
plan->query_context()->executor()->GetCapacity() > 1;
if (keys.size() > 0) {
return Status::Invalid("Scalar aggregation with some key");
}
- if (concurreny > 1 && segment_keys.size() > 0) {
+ if (is_cpu_parallel && segment_keys.size() > 0) {
return Status::NotImplemented("Segmented aggregation in a multi-threaded
plan");
}
@@ -412,11 +413,10 @@ class ScalarAggregateNode : public ExecNode, public
TracedNode {
auto exec_ctx = plan->query_context()->exec_context();
ARROW_ASSIGN_OR_RAISE(
- auto args,
- MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggregates,
exec_ctx,
- /*concurrency=*/concurreny));
+ auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys,
aggregates,
+ exec_ctx, concurrency,
is_cpu_parallel));
- if (concurreny > 1) {
+ if (is_cpu_parallel) {
for (auto& kernel : args.kernels) {
if (kernel->ordered) {
return Status::NotImplemented(
@@ -443,6 +443,7 @@ class ScalarAggregateNode : public ExecNode, public
TracedNode {
aggs_[i].options ? aggs_[i].options->ToString() :
"<NULLPTR>"},
{"function.kind", std::string(kind_name()) +
"::Consume"}});
KernelContext batch_ctx{plan()->query_context()->exec_context()};
+ DCHECK_LT(thread_index, states_[i].size());
batch_ctx.SetState(states_[i][thread_index].get());
std::vector<ExecValue> column_values;
@@ -616,7 +617,7 @@ class GroupByNode : public ExecNode, public TracedNode {
static Result<AggregateNodeArgs<HashAggregateKernel>> MakeAggregateNodeArgs(
const std::shared_ptr<Schema>& input_schema, const
std::vector<FieldRef>& keys,
const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>&
aggs,
- ExecContext* ctx, const int concurrency) {
+ ExecContext* ctx, const bool is_cpu_parallel) {
// Find input field indices for key fields
std::vector<int> key_field_ids(keys.size());
for (size_t i = 0; i < keys.size(); ++i) {
@@ -673,7 +674,7 @@ class GroupByNode : public ExecNode, public TracedNode {
// Construct aggregates
ARROW_ASSIGN_OR_RAISE(auto agg_kernels, GetKernels(ctx, aggs,
agg_src_types));
- if (concurrency > 1) {
+ if (is_cpu_parallel) {
if (segment_keys.size() > 0) {
return Status::NotImplemented(
"Segmented aggregation in a multi-threaded execution context");
@@ -734,13 +735,13 @@ class GroupByNode : public ExecNode, public TracedNode {
const auto& keys = aggregate_options.keys;
const auto& segment_keys = aggregate_options.segment_keys;
auto aggs = aggregate_options.aggregates;
- auto concurrency =
plan->query_context()->exec_context()->executor()->GetCapacity();
+ bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() >
1;
const auto& input_schema = input->output_schema();
auto exec_ctx = plan->query_context()->exec_context();
ARROW_ASSIGN_OR_RAISE(
auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys,
aggs, exec_ctx,
- concurrency));
+ is_cpu_parallel));
return input->plan()->EmplaceNode<GroupByNode>(
input, std::move(args.output_schema),
std::move(args.grouping_key_field_ids),
@@ -1063,14 +1064,15 @@ Result<std::shared_ptr<Schema>> MakeOutputSchema(
const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>&
aggregates,
ExecContext* exec_ctx) {
if (keys.empty()) {
- ARROW_ASSIGN_OR_RAISE(auto args,
ScalarAggregateNode::MakeAggregateNodeArgs(
- input_schema, keys, segment_keys,
aggregates,
- exec_ctx, /*concurrency=*/1));
+ ARROW_ASSIGN_OR_RAISE(auto args,
+ ScalarAggregateNode::MakeAggregateNodeArgs(
+ input_schema, keys, segment_keys, aggregates,
exec_ctx,
+ /*concurrency=*/1, /*is_cpu_parallel=*/false));
return std::move(args.output_schema);
} else {
ARROW_ASSIGN_OR_RAISE(auto args, GroupByNode::MakeAggregateNodeArgs(
input_schema, keys, segment_keys,
aggregates,
- exec_ctx, /*concurrency=*/1));
+ exec_ctx, /*is_cpu_parallel=*/false));
return std::move(args.output_schema);
}
}