kosiew opened a new pull request, #19285:
URL: https://github.com/apache/datafusion/pull/19285

   ## Which issue does this PR close?
   
   * Closes #19219.
   
   ## Rationale for this change
   
   Queries that group by text columns and apply an `ORDER BY ... LIMIT` can 
currently fail with an execution error such as `Can't group type: Utf8View` 
when the TopK aggregation optimization is enabled. The same queries work 
without `LIMIT`, which indicates the issue is in the TopK-specific execution 
path rather than in the core aggregation logic.
   
   This is particularly confusing for users because:
   
   * The failure only appears when the optimizer decides to use TopK 
aggregation.
   * Disabling `datafusion.optimizer.enable_topk_aggregation` works around the 
problem, but it costs performance and requires a non-obvious configuration change.
   
   The goal of this PR is to:
   
   * Ensure TopK aggregation correctly supports UTF-8 key types (including 
`Utf8View`) where possible.
   * Ensure TopK gracefully declines to optimize unsupported type combinations 
instead of panicking or returning confusing errors.
   * Preserve or improve performance for supported cases while restoring 
correctness for queries that previously errored.
   
   ## What changes are included in this PR?
   
   This PR makes the following changes:
   
   ### 1. Centralized TopK type compatibility checks
   
   * Introduces `topk_types_supported(key_type: &DataType, value_type: 
&DataType) -> bool` in `datafusion_physical_plan::aggregates` to express which 
key/value type combinations are supported by TopK aggregation.
   * TopK support is now defined in terms of two internal helpers:
   
     * `is_supported_hash_key_type(&DataType) -> bool` in `topk::hash_table` 
(grouping key type support), and
     * `is_supported_heap_type(&DataType) -> bool` in `topk::heap` (aggregate 
value type support).
   * Supported types include Arrow primitive numeric/temporal/decimal/interval 
types and UTF-8 string variants (`Utf8`, `LargeUtf8`, `Utf8View`).
   * Explicitly disallows unsupported combinations (for example, binary key 
types or non-primitive/non-string value types).
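
As a rough illustration of how the two helpers compose, here is a minimal sketch. It assumes a simplified local `DataType` enum in place of Arrow's real `DataType`, models only a handful of variants, and uses a hypothetical shared helper `is_primitive_or_utf8`; the actual functions in the PR may differ in detail.

```rust
/// Simplified stand-in for Arrow's `DataType` (assumption: only a few
/// representative variants are modeled, not the real Arrow enum).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Float64,
    Utf8,
    LargeUtf8,
    Utf8View,
    Binary,
}

/// Hypothetical shared check: primitive types and UTF-8 string variants.
fn is_primitive_or_utf8(data_type: &DataType) -> bool {
    matches!(
        data_type,
        DataType::Int64
            | DataType::Float64
            | DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Utf8View
    )
}

/// Grouping-key side of the check (name taken from the PR description).
fn is_supported_hash_key_type(key_type: &DataType) -> bool {
    is_primitive_or_utf8(key_type)
}

/// Aggregate-value side of the check (name taken from the PR description).
fn is_supported_heap_type(value_type: &DataType) -> bool {
    is_primitive_or_utf8(value_type)
}

/// TopK is considered only when both the key and the value type qualify.
fn topk_types_supported(key_type: &DataType, value_type: &DataType) -> bool {
    is_supported_hash_key_type(key_type) && is_supported_heap_type(value_type)
}
```

In this sketch, `topk_types_supported(&DataType::Utf8View, &DataType::Int64)` is `true`, while any pair involving `DataType::Binary` is `false`.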
   
   ### 2. Extend TopK heap implementation to support string aggregate values
   
   * Adds a new `StringHeap` implementation of `ArrowHeap` that stores owned 
`String` values and uses lexicographic ordering for comparisons.
   * Supports all three UTF-8 string representations: `Utf8`, `LargeUtf8`, and 
`Utf8View`.
   * Introduces a small helper `extract_string_value` to consistently extract a 
`String` from an `ArrayRef` for these types.
   * Updates `new_heap` to:
   
     * Dispatch to `StringHeap` for UTF-8 value types.
     * Continue to use `PrimitiveHeap` for primitive types.
     * Return a clearer error message (`"Unsupported TopK aggregate value 
type"`) for unsupported types instead of the previous generic grouping error.
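
To make the idea of a string heap concrete, the following sketch keeps the `limit` lexicographically largest strings using the standard library's `BinaryHeap`. It is not the PR's `StringHeap` (which implements `ArrowHeap` and tracks values per group); the type `TopKStrings` and its methods are hypothetical names used only for illustration.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: retains the `limit` lexicographically largest
/// strings seen so far, storing owned `String` values.
struct TopKStrings {
    limit: usize,
    // `Reverse` turns the max-heap into a min-heap, so the smallest
    // retained value is always at the top and cheap to evict.
    heap: BinaryHeap<Reverse<String>>,
}

impl TopKStrings {
    fn new(limit: usize) -> Self {
        Self {
            limit,
            heap: BinaryHeap::with_capacity(limit),
        }
    }

    fn insert(&mut self, value: &str) {
        if self.limit == 0 {
            return;
        }
        if self.heap.len() < self.limit {
            self.heap.push(Reverse(value.to_owned()));
            return;
        }
        // Heap is full: replace the current minimum only if the new
        // value is strictly larger.
        let replace = match self.heap.peek() {
            Some(Reverse(smallest)) => value > smallest.as_str(),
            None => false,
        };
        if replace {
            self.heap.pop();
            self.heap.push(Reverse(value.to_owned()));
        }
    }

    /// Consumes the heap and returns the retained values, largest first.
    fn into_sorted_desc(self) -> Vec<String> {
        // Ascending order of `Reverse<String>` is descending order of `String`.
        self.heap
            .into_sorted_vec()
            .into_iter()
            .map(|Reverse(s)| s)
            .collect()
    }
}
```

Feeding `"10"`, `"9"`, `"123"`, `"4"` into `TopKStrings::new(2)` leaves `"9"` and `"4"`, since comparison is lexicographic rather than numeric.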
   
   ### 3. Reuse AggregateExec when pushing down LIMIT for TopK
   
   * Adds `AggregateExec::with_new_limit(&self, limit: Option<usize>) -> Self`, 
which clones an `AggregateExec` while overriding only the `limit` field.
   * Refactors the `TopKAggregation` optimizer rule to:
   
     * Use `topk_types_supported` to gate TopK-based rewrite decisions.
     * Use `with_new_limit` instead of manually reconstructing `AggregateExec`.
   * This reduces duplication, keeps the optimizer and execution layers 
consistent, and avoids the risk of forgetting future `AggregateExec` fields in 
the optimizer code.
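
The clone-and-override pattern behind `with_new_limit` can be sketched with Rust's struct update syntax. The struct below, `AggregateSketch`, is a hypothetical, heavily reduced stand-in for `AggregateExec`; the real type has many more fields, and its implementation may not use this exact form.

```rust
/// Hypothetical, heavily reduced stand-in for `AggregateExec`.
#[derive(Debug, Clone, PartialEq)]
struct AggregateSketch {
    group_by: Vec<String>,
    aggregates: Vec<String>,
    limit: Option<usize>,
}

impl AggregateSketch {
    /// Clones the node and overrides only `limit`. Because every other
    /// field comes from `self.clone()`, fields added to the struct later
    /// are carried over without touching this method.
    fn with_new_limit(&self, limit: Option<usize>) -> Self {
        Self {
            limit,
            ..self.clone()
        }
    }
}
```

Calling `with_new_limit(Some(1))` on a node with `limit: None` yields a copy whose only difference is the limit, and the original node is left unchanged.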
   
   ### 4. Safer TopK stream selection in AggregateExec
   
   * Extends `AggregateExec`'s stream selection logic to consult a new helper:
   
     * `GroupedTopKAggregateStream::can_use_topk(&AggregateExec) -> 
Result<bool>`.
   * `can_use_topk` performs structural validation (single grouping key, single 
min/max aggregate, presence of a limit) and also validates types via 
`topk_types_supported`.
   * The TopK execution path is now only used when *both* of the following are 
true:
   
     * The query structure matches TopK expectations (min/max with a single 
group key and limit), and
     * The key and aggregate value types are supported by the underlying TopK 
data structures.
   * This ensures that if the optimizer ever fails to filter out an unsupported 
combination, the execution plan will still fall back to the standard 
aggregation path instead of attempting to construct a 
`GroupedTopKAggregateStream` that cannot handle the types.
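
The two-stage gate described above (structure first, then types) might look roughly like the sketch below. All types here (`ValueKind`, `AggFn`, `PlanShape`) are hypothetical simplifications, and the function returns a plain `bool` rather than the `Result<bool>` of the real `can_use_topk`.

```rust
/// Hypothetical coarse type classification (not Arrow's `DataType`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ValueKind {
    Numeric,
    Utf8,
    Binary,
}

/// Hypothetical aggregate function tag.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum AggFn {
    Min,
    Max,
    Sum,
}

/// Hypothetical summary of the parts of an aggregate plan that matter here.
struct PlanShape {
    group_key_types: Vec<ValueKind>,
    aggregates: Vec<(AggFn, ValueKind)>,
    limit: Option<usize>,
}

/// Stand-in for `topk_types_supported`: only binary is rejected in this sketch.
fn types_supported(key: ValueKind, value: ValueKind) -> bool {
    key != ValueKind::Binary && value != ValueKind::Binary
}

fn can_use_topk(plan: &PlanShape) -> bool {
    // Structural check 1: a limit must be present.
    if plan.limit.is_none() {
        return false;
    }
    // Structural check 2: exactly one grouping key and one aggregate.
    let (key, (func, value)) =
        match (plan.group_key_types.as_slice(), plan.aggregates.as_slice()) {
            ([key], [agg]) => (*key, *agg),
            _ => return false,
        };
    // Structural check 3: the aggregate must be min or max.
    if !matches!(func, AggFn::Min | AggFn::Max) {
        return false;
    }
    // Type check: key and value types must both be supported.
    types_supported(key, value)
}
```

When `can_use_topk` returns `false`, the caller would choose the standard aggregation stream, which is the fallback behavior this section describes.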
   
   ### 5. Improved hash-table type support for TopK keys
   
   * Introduces `is_supported_hash_key_type` in `topk::hash_table` and uses it 
to define which types can be used as grouping keys for TopK.
   * For now, this matches the previous behavior (primitive 
numeric/temporal/decimal/interval types plus UTF-8 variants) but centralizes 
the knowledge in a single function for reuse by both the optimizer and runtime.
   
   ### 6. Tests and regressions
   
   This PR includes several new tests to validate behavior and prevent 
regressions:
   
   #### Rust tests
   
   * `utf8_grouping_min_max_limit_fallbacks` in `aggregate_statistics.rs`:
   
     * Builds an in-memory table with a UTF-8 grouping key and both string and 
numeric value columns.
     * Verifies that:
   
       * A **supported** query (`GROUP BY g, max(val_num) ORDER BY max(val_num) 
DESC LIMIT 1`) returns the correct row and can use TopK.
       * An **unsupported** combination (`GROUP BY g, max(val_str) ORDER BY 
max(val_str) DESC LIMIT 1`) executes successfully but does **not** use 
`GroupedTopKAggregateStream` in the physical plan (falls back to standard 
aggregation without panicking).
     * Uses `displayable` to validate that `GroupedTopKAggregateStream` does 
not appear in the plan for the unsupported case.
   
   * New tests in `topk::priority_map`:
   
     * `should_track_lexicographic_min_utf8_value`
     * `should_track_lexicographic_max_utf8_value_desc`
     * `should_track_large_utf8_values`
     * `should_track_utf8_view_values`
     * These tests exercise lexicographic min/max behavior and verify that 
`PriorityMap` works with `Utf8`, `LargeUtf8`, and `Utf8View` value types in 
both ascending and descending modes.
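
For readers unfamiliar with what "lexicographic min/max" implies for numeric-looking strings, here is a small standalone sketch using Rust's built-in `str` ordering, which compares byte by byte. The function names are hypothetical and are not part of the PR's tests.

```rust
/// Largest string under byte-wise lexicographic ordering, if any.
fn lexicographic_max<'a>(values: &[&'a str]) -> Option<&'a str> {
    values.iter().copied().max()
}

/// Smallest string under byte-wise lexicographic ordering, if any.
fn lexicographic_min<'a>(values: &[&'a str]) -> Option<&'a str> {
    values.iter().copied().min()
}
```

For the inputs `"10"`, `"9"`, `"123"`, the maximum is `"9"` and the minimum is `"10"`, which differs from the numeric ordering of the same digits.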
   
   #### SQL logic tests
   
   * Extends `aggregates_topk.slt` with string-focused TopK scenarios:
   
     * Creates a `string_topk` table and a `string_topk_view` using 
`arrow_cast(..., 'Utf8View')` to exercise both regular `Utf8` and `Utf8View` 
paths.
     * Verifies `GROUP BY category, max(val)` with `ORDER BY max(val) DESC 
LIMIT 2` returns correct results for both table and view.
     * Asserts that the physical plan uses TopK on the aggregate 
(`AggregateExec` with `lim=[2]` and `SortExec: TopK(fetch=2)`).
   
   * Adds regression tests for string `max` over the `traces.trace_id` column 
with `ORDER BY max_trace DESC LIMIT 2` to ensure schema stability and correct 
plan selection.
   
   ### 7. Error message polish
   
   * When a TopK heap cannot be constructed for a given value type, the error 
now reports `"Unsupported TopK aggregate value type: ..."` instead of the more 
confusing generic grouping error.
   * This should make it clearer to users when they hit a genuine type 
limitation versus a bug.
   
   ## Are these changes tested?
   
   ```
   DataFusion CLI v51.0.0
   > create table t(a text, b text) as values ('123', '4'), ('1', '3');
   0 row(s) fetched.
   Elapsed 0.094 seconds.
   
   > select a, max(b) as y_axis from t group by a order by y_axis desc;
   +-----+--------+
   | a   | y_axis |
   +-----+--------+
   | 123 | 4      |
   | 1   | 3      |
   +-----+--------+
   2 row(s) fetched.
   Elapsed 0.046 seconds.
   
   > select a, max(b) as y_axis from t group by a order by y_axis desc limit 1;
   +-----+--------+
   | a   | y_axis |
   +-----+--------+
   | 123 | 4      |
   +-----+--------+
   1 row(s) fetched.
   Elapsed 0.020 seconds.
   ```
   
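   Yes. The CLI session above shows `max` over a text column, grouped by a text 
column, returning results both with and without `LIMIT`.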
   
   * New unit tests have been added for:
   
     * `GroupedTopKAggregateStream` behavior with UTF-8 grouping and both 
numeric and string aggregate values.
     * `PriorityMap` handling of `Utf8`, `LargeUtf8`, and `Utf8View` value 
types and lexicographic min/max behavior.
   * New sqllogictest cases have been added to `aggregates_topk.slt` to cover:
   
     * `GROUP BY` + `ORDER BY ... LIMIT` with string keys and values.
     * Regression coverage for `Utf8View`-backed queries to prevent 
reintroducing the `"Can't group type: Utf8View"` error.
   * Existing test suites (unit tests and SQL logic tests) are expected to 
continue to pass, ensuring backwards compatibility for existing queries and 
plans.
   
   ## Are there any user-facing changes?
   
   Yes, there are user-visible behavior changes, but no breaking changes to SQL 
semantics:
   
   * Queries that group by UTF-8 (including `Utf8View`) text columns and use 
`ORDER BY` on a `MIN`/`MAX` aggregate with `LIMIT` will now:
   
     * Execute successfully, and
     * Use the TopK aggregation optimization when the key and aggregate value 
types are supported.
   * Queries that involve unsupported TopK type combinations will:
   
     * Fall back to the standard aggregation path, and
     * Avoid panics or confusing `"Can't group type"` errors.
   * Error messages for unsupported TopK value types are more explicit 
(`"Unsupported TopK aggregate value type: ..."`).
   
   From an API surface perspective:
   
   * `datafusion_physical_plan::aggregates` now exposes 
`topk_types_supported(&DataType, &DataType) -> bool` as a helper for 
determining whether TopK can be used for a particular key/value type pair.
   * `AggregateExec` gains a new `with_new_limit(&self, limit: Option<usize>) 
-> Self` helper that clones the node while overriding only its limit. This is 
additive and backwards-compatible.
   
   No configuration changes are required for users. The existing 
`datafusion.optimizer.enable_topk_aggregation` setting continues to control 
whether TopK is considered at all; the main difference is that enabling it is 
now safe for the UTF-8 grouping queries that previously errored.
   
   ## LLM-generated code disclosure
   
   This PR includes LLM-generated code, tests, and comments. All LLM-generated 
content has been reviewed and tested.
   



