kosiew opened a new pull request, #19285:
URL: https://github.com/apache/datafusion/pull/19285
## Which issue does this PR close?
* Closes #19219.
## Rationale for this change
Queries that group by text columns and apply an `ORDER BY ... LIMIT` can
currently fail with an execution error such as `Can't group type: Utf8View`
when the TopK aggregation optimization is enabled. The same queries work
without `LIMIT`, which indicates the issue is in the TopK-specific execution
path rather than in the core aggregation logic.
This is particularly confusing for users because:
* The failure only appears when the optimizer decides to use TopK aggregation.
* Disabling `datafusion.optimizer.enable_topk_aggregation` works around the problem, but it costs performance and relies on a non-obvious configuration setting.
The goal of this PR is to:
* Ensure TopK aggregation correctly supports UTF-8 key types (including
`Utf8View`) where possible.
* Ensure TopK gracefully declines to optimize unsupported type combinations
instead of panicking or returning confusing errors.
* Preserve or improve performance for supported cases while restoring
correctness for queries that previously errored.
## What changes are included in this PR?
This PR makes the following changes:
### 1. Centralized TopK type compatibility checks
* Introduces `topk_types_supported(key_type: &DataType, value_type:
&DataType) -> bool` in `datafusion_physical_plan::aggregates` to express which
key/value type combinations are supported by TopK aggregation.
* TopK support is now defined in terms of two internal helpers:
  * `is_supported_hash_key_type(&DataType) -> bool` in `topk::hash_table` (grouping key type support), and
  * `is_supported_heap_type(&DataType) -> bool` in `topk::heap` (aggregate value type support).
* Supported types include Arrow primitive numeric/temporal/decimal/interval
types and UTF-8 string variants (`Utf8`, `LargeUtf8`, `Utf8View`).
* Explicitly disallows unsupported combinations (for example, binary key
types or non-primitive/non-string value types).
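To make the composition concrete, here is a minimal sketch of how the two helper checks could combine into `topk_types_supported`. It assumes a simplified, hypothetical stand-in `DataType` enum covering only a handful of Arrow variants (the real `arrow_schema::DataType` has many more, including interval types), so it illustrates the gating logic rather than reproducing the PR's code.

```rust
/// Hypothetical stand-in for a few `arrow_schema::DataType` variants.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Float64,
    Date32,
    Decimal128(u8, i8),
    Utf8,
    LargeUtf8,
    Utf8View,
    Binary,
    List,
}

/// Primitive numeric/temporal/decimal types (only a subset is modeled here).
fn is_primitive(dt: &DataType) -> bool {
    matches!(
        dt,
        DataType::Int64 | DataType::Float64 | DataType::Date32 | DataType::Decimal128(_, _)
    )
}

/// The three UTF-8 string representations.
fn is_utf8(dt: &DataType) -> bool {
    matches!(dt, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View)
}

/// Grouping-key check, modeled on `is_supported_hash_key_type`.
pub fn is_supported_hash_key_type(dt: &DataType) -> bool {
    is_primitive(dt) || is_utf8(dt)
}

/// Aggregate-value check, modeled on `is_supported_heap_type`.
pub fn is_supported_heap_type(dt: &DataType) -> bool {
    is_primitive(dt) || is_utf8(dt)
}

/// TopK is allowed only when both the key type and the value type are supported.
pub fn topk_types_supported(key_type: &DataType, value_type: &DataType) -> bool {
    is_supported_hash_key_type(key_type) && is_supported_heap_type(value_type)
}
```

Keeping the two checks separate lets the optimizer and the runtime share one definition while still allowing key and value support to diverge later.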
### 2. Extend TopK heap implementation to support string aggregate values
* Adds a new `StringHeap` implementation of `ArrowHeap` that stores owned
`String` values and uses lexicographic ordering for comparisons.
* Supports all three UTF-8 string representations: `Utf8`, `LargeUtf8`, and
`Utf8View`.
* Introduces a small helper `extract_string_value` to consistently extract a
`String` from an `ArrayRef` for these types.
* Updates `new_heap` to:
  * Dispatch to `StringHeap` for UTF-8 value types.
  * Continue to use `PrimitiveHeap` for primitive types.
  * Return a clearer error message (`"Unsupported TopK aggregate value type"`) for unsupported types instead of the previous generic grouping error.
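The core idea behind a string-valued TopK heap is a bounded heap ordered lexicographically over owned `String`s. The sketch below uses `std::collections::BinaryHeap` and a hypothetical `StringTopK` type; unlike the PR's `StringHeap`, it does not track group indices or implement `ArrowHeap`, and it only shows how ascending (min) and descending (max) modes keep the best `limit` values.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical bounded TopK over owned strings, ordered lexicographically.
pub struct StringTopK {
    limit: usize,
    desc: bool,
    /// Used when `desc` is true: a min-heap holding the `limit` largest values.
    largest: BinaryHeap<Reverse<String>>,
    /// Used when `desc` is false: a max-heap holding the `limit` smallest values.
    smallest: BinaryHeap<String>,
}

impl StringTopK {
    pub fn new(limit: usize, desc: bool) -> Self {
        Self { limit, desc, largest: BinaryHeap::new(), smallest: BinaryHeap::new() }
    }

    /// Insert a value, evicting the weakest retained value if the heap exceeds `limit`.
    pub fn insert(&mut self, value: &str) {
        if self.desc {
            self.largest.push(Reverse(value.to_owned()));
            if self.largest.len() > self.limit {
                // The min-heap top is the smallest retained value.
                self.largest.pop();
            }
        } else {
            self.smallest.push(value.to_owned());
            if self.smallest.len() > self.limit {
                // The max-heap top is the largest retained value.
                self.smallest.pop();
            }
        }
    }

    /// Consume the heap and return the retained values, best first.
    pub fn into_sorted(self) -> Vec<String> {
        if self.desc {
            // Ascending order of `Reverse<String>` is descending order of `String`.
            self.largest.into_sorted_vec().into_iter().map(|Reverse(s)| s).collect()
        } else {
            self.smallest.into_sorted_vec()
        }
    }
}
```

In descending mode the heap is a min-heap (via `Reverse`), so the weakest retained value sits at the top and is the one evicted once the heap grows past `limit`. Rust's `String` ordering is byte-wise, which for valid UTF-8 matches code point order.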
### 3. Reuse AggregateExec when pushing down LIMIT for TopK
* Adds `AggregateExec::with_new_limit(&self, limit: Option<usize>) -> Self`,
which clones an `AggregateExec` while overriding only the `limit` field.
* Refactors the `TopKAggregation` optimizer rule to:
  * Use `topk_types_supported` to gate TopK-based rewrite decisions.
  * Use `with_new_limit` instead of manually reconstructing `AggregateExec`.
* This reduces duplication, keeps the optimizer and execution layers consistent, and removes the risk that hand-written reconstruction in the optimizer silently drops fields added to `AggregateExec` in the future.
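One way to see why a `with_new_limit`-style helper guards against forgotten fields is Rust's struct update syntax. The sketch uses a hypothetical, heavily simplified `AggregateExecSketch` with placeholder `String` fields; the real `AggregateExec` carries expressions, schemas, and cached plan properties, so this only illustrates the cloning pattern.

```rust
/// Hypothetical, heavily simplified stand-in for `AggregateExec`.
#[derive(Debug, Clone, PartialEq)]
pub struct AggregateExecSketch {
    pub mode: String,
    pub group_by: Vec<String>,
    pub aggr_exprs: Vec<String>,
    pub limit: Option<usize>,
}

impl AggregateExecSketch {
    /// Clone `self`, overriding only `limit`.
    pub fn with_new_limit(&self, limit: Option<usize>) -> Self {
        // `..self.clone()` copies every other field, so a field added later is
        // carried over automatically instead of being dropped by hand-written code.
        Self { limit, ..self.clone() }
    }
}
```

Because only `limit` is named explicitly, adding a new field to the struct requires no change to this helper.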
### 4. Safer TopK stream selection in AggregateExec
* Extends `AggregateExec`'s stream selection logic to consult a new helper:
  * `GroupedTopKAggregateStream::can_use_topk(&AggregateExec) -> Result<bool>`.
* `can_use_topk` performs structural validation (single grouping key, single
min/max aggregate, presence of a limit) and also validates types via
`topk_types_supported`.
* The TopK execution path is now only used when *both* of the following are true:
  * The query structure matches TopK expectations (min/max with a single group key and limit), and
  * The key and aggregate value types are supported by the underlying TopK data structures.
* This ensures that if the optimizer ever fails to filter out an unsupported
combination, the execution plan will still fall back to the standard
aggregation path instead of attempting to construct a
`GroupedTopKAggregateStream` that cannot handle the types.
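The two-stage gate (structure first, then types) can be sketched as a pure function over a simplified description of the aggregate. Everything here (`AggregateShape`, `Ty`, `AggKind`, `type_supported`, `select_stream`) is hypothetical and stands in for the real inspection of `AggregateExec`; it returns a plain `bool` rather than `Result<bool>`, and the stream names are used only as labels (`GroupedHashAggregateStream` is DataFusion's standard grouped aggregation stream).

```rust
/// Hypothetical, simplified column types.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Ty { Int64, Utf8, Utf8View, Binary }

/// Hypothetical aggregate function kinds.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggKind { Min, Max, Sum }

/// Hypothetical summary of an aggregate plan node.
pub struct AggregateShape {
    pub group_key_types: Vec<Ty>,
    pub aggregates: Vec<(AggKind, Ty)>,
    pub limit: Option<usize>,
}

/// Stand-in for the key/value type check (binary is rejected here).
fn type_supported(t: Ty) -> bool {
    !matches!(t, Ty::Binary)
}

pub fn can_use_topk(shape: &AggregateShape) -> bool {
    // Structural check: exactly one grouping key and exactly one MIN/MAX aggregate.
    let (key, value) = match (shape.group_key_types.as_slice(), shape.aggregates.as_slice()) {
        ([key], [(AggKind::Min | AggKind::Max, value)]) => (*key, *value),
        _ => return false,
    };
    // A limit must be present, and both types must be supported.
    shape.limit.is_some() && type_supported(key) && type_supported(value)
}

/// Fall back to the standard path whenever the TopK gate rejects the plan.
pub fn select_stream(shape: &AggregateShape) -> &'static str {
    if can_use_topk(shape) {
        "GroupedTopKAggregateStream"
    } else {
        "GroupedHashAggregateStream"
    }
}
```

Checking types at this point as well as in the optimizer means that a plan the optimizer should have rejected still takes the standard path instead of failing at execution time.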
### 5. Improved hash-table type support for TopK keys
* Introduces `is_supported_hash_key_type` in `topk::hash_table` and uses it
to define which types can be used as grouping keys for TopK.
* For now, this matches the previous behavior (primitive
numeric/temporal/decimal/interval types plus UTF-8 variants) but centralizes
the knowledge in a single function for reuse by both the optimizer and runtime.
### 6. Tests and regressions
This PR includes several new tests to validate behavior and prevent
regressions:
#### Rust tests
* `utf8_grouping_min_max_limit_fallbacks` in `aggregate_statistics.rs`:
  * Builds an in-memory table with a UTF-8 grouping key and both string and numeric value columns.
  * Verifies that:
    * A **supported** query (`GROUP BY g, max(val_num) ORDER BY max(val_num) DESC LIMIT 1`) returns the correct row and can use TopK.
    * An **unsupported** combination (`GROUP BY g, max(val_str) ORDER BY max(val_str) DESC LIMIT 1`) executes successfully but does **not** use `GroupedTopKAggregateStream` in the physical plan; it falls back to standard aggregation without panicking.
  * Uses `displayable` to validate that `GroupedTopKAggregateStream` does not appear in the plan for the unsupported case.
* New tests in `topk::priority_map`:
  * `should_track_lexicographic_min_utf8_value`
  * `should_track_lexicographic_max_utf8_value_desc`
  * `should_track_large_utf8_values`
  * `should_track_utf8_view_values`
* These tests exercise lexicographic min/max behavior and verify that `PriorityMap` works with `Utf8`, `LargeUtf8`, and `Utf8View` value types in both ascending and descending modes.
#### SQL logic tests
* Extends `aggregates_topk.slt` with string-focused TopK scenarios:
  * Creates a `string_topk` table and a `string_topk_view` using `arrow_cast(..., 'Utf8View')` to exercise both regular `Utf8` and `Utf8View` paths.
  * Verifies `GROUP BY category, max(val)` with `ORDER BY max(val) DESC LIMIT 2` returns correct results for both table and view.
  * Asserts that the physical plan uses TopK on the aggregate (`AggregateExec` with `lim=[2]` and `SortExec: TopK(fetch=2)`).
  * Adds regression tests for string `max` over the `traces.trace_id` column with `ORDER BY max_trace DESC LIMIT 2` to ensure schema stability and correct plan selection.
### 7. Error message polish
* When a TopK heap cannot be constructed for a given value type, the error
now reports `"Unsupported TopK aggregate value type: ..."` instead of the more
confusing generic grouping error.
* This should make it clearer to users whether they have hit a genuine type limitation or a bug.
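A minimal sketch of the dispatch-plus-error pattern described for `new_heap`, assuming a hypothetical stand-in `DataType` enum and a hypothetical `HeapKind` result in place of real heap construction; only the error text follows the message quoted in this PR.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for a few Arrow `DataType` variants.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType { Int64, Float64, Utf8, LargeUtf8, Utf8View, Binary }

/// Which (hypothetical) heap implementation would be constructed.
#[derive(Debug, PartialEq)]
pub enum HeapKind { Primitive, Text }

/// Hypothetical error type carrying the user-facing message.
#[derive(Debug, PartialEq)]
pub struct TopKError(pub String);

impl fmt::Display for TopKError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Error for TopKError {}

/// Dispatch on the aggregate value type, mirroring the shape of `new_heap`.
pub fn new_heap_kind(value_type: &DataType) -> Result<HeapKind, TopKError> {
    match value_type {
        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Ok(HeapKind::Text),
        DataType::Int64 | DataType::Float64 => Ok(HeapKind::Primitive),
        // Name the offending type instead of reporting a generic grouping error.
        other => Err(TopKError(format!("Unsupported TopK aggregate value type: {other:?}"))),
    }
}
```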
## Are these changes tested?
Yes. A manual check in the DataFusion CLI shows that the query now succeeds with and without `LIMIT`:
```
DataFusion CLI v51.0.0
> create table t(a text, b text) as values ('123', '4'), ('1', '3');
0 row(s) fetched.
Elapsed 0.094 seconds.
> select a, max(b) as y_axis from t group by a order by y_axis desc;
+-----+--------+
| a | y_axis |
+-----+--------+
| 123 | 4 |
| 1 | 3 |
+-----+--------+
2 row(s) fetched.
Elapsed 0.046 seconds.
> select a, max(b) as y_axis from t group by a order by y_axis desc limit 1;
+-----+--------+
| a | y_axis |
+-----+--------+
| 123 | 4 |
+-----+--------+
1 row(s) fetched.
Elapsed 0.020 seconds.
```
* New unit tests have been added for:
  * `GroupedTopKAggregateStream` behavior with UTF-8 grouping and both numeric and string aggregate values.
  * `PriorityMap` handling of `Utf8`, `LargeUtf8`, and `Utf8View` value types and lexicographic min/max behavior.
* New sqllogictest cases have been added to `aggregates_topk.slt` to cover:
  * `GROUP BY` + `ORDER BY ... LIMIT` with string keys and values.
  * Regression coverage for `Utf8View`-backed queries to prevent reintroducing the `"Can't group type: Utf8View"` error.
* Existing test suites (unit tests and SQL logic tests) are expected to
continue to pass, ensuring backwards compatibility for existing queries and
plans.
## Are there any user-facing changes?
Yes, there are user-visible behavior changes, but no breaking changes to SQL
semantics:
* Queries that group by UTF-8 (including `Utf8View`) text columns and use `ORDER BY` on a `MIN`/`MAX` aggregate with `LIMIT` will now:
  * Execute successfully, and
  * Use the TopK aggregation optimization when the key and aggregate value types are supported.
* Queries that involve unsupported TopK type combinations will:
  * Fall back to the standard aggregation path, and
  * Avoid panics or confusing `"Can't group type"` errors.
* Error messages for unsupported TopK value types are more explicit
(`"Unsupported TopK aggregate value type: ..."`).
From an API surface perspective:
* `datafusion_physical_plan::aggregates` now exposes
`topk_types_supported(&DataType, &DataType) -> bool` as a helper for
determining whether TopK can be used for a particular key/value type pair.
* `AggregateExec` gains a new `with_new_limit(&self, limit: Option<usize>)
-> Self` helper to cheaply clone with a different limit hint. This is additive
and backwards-compatible.
No configuration changes are required for users. The existing
`datafusion.optimizer.enable_topk_aggregation` setting continues to control
whether TopK is considered at all; the main difference is that enabling it is
now safe for UTF-8 grouping/query patterns that previously errored.
## LLM-generated code disclosure
This PR includes LLM-generated code, tests, and comments. All LLM-generated
content has been reviewed and tested.