This is an automated email from the ASF dual-hosted git repository. lgbo-ustc pushed a commit to branch bug_group_limit_empty_offsets in repository https://gitbox.apache.org/repos/asf/gluten.git
commit 2e134bb09d31f88e956a0e1d64d716358bcdd9a1 Author: lgbo-ustc <[email protected]> AuthorDate: Thu May 28 20:01:51 2026 +0800 [CH] Fix group limit first array result offset RowNumGroupArraySorted writes aggregate results into a newly created ColumnArray. For the first output row the array offsets vector can be empty, but insertResultInto read result_array_offsets.back() before appending the first offset. That is undefined behavior and can crash when aggregate top-k writes its first result. Treat an empty offsets vector as having previous offset 0 before appending the next cumulative offset. Add a ClickHouse backend regression test that forces row_number top-k through the aggregate group limit path and validates the first array result row against vanilla Spark. --- .../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 29 ++++++++++++++++++++++ .../AggregateFunctions/GroupLimitFunctions.cpp | 3 ++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index 0539f721e6..9a305cc883 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -3045,6 +3045,35 @@ class GlutenClickHouseTPCHSaltNullParquetSuite } + test("row number aggregate topk handles first array result offset") { + withSQLConf( + (CHConfig.runtimeSettings("enable_window_group_limit_to_aggregate"), "true"), + (CHConfig.runtimeSettings("window.aggregate_topk_high_cardinality_threshold"), "2.0") + ) { + spark.sql("drop table if exists test_win_top_first_offset") + spark.sql("create table test_win_top_first_offset (a string, b int) using parquet") + spark.sql("insert into test_win_top_first_offset values ('a', 2), ('a', 1)") + + compareResultsAgainstVanillaSpark( + """ + |select * from ( + | select a, b, row_number() over (partition by a order by b) as r + | from test_win_top_first_offset + |) where r <= 1 + |""".stripMargin, + compareResult = true, + df => { + val aggregateGroupLimit = collectWithSubqueries(df.queryExecution.executedPlan) { + case e: CHAggregateGroupLimitExecTransformer => e + } + assert(aggregateGroupLimit.nonEmpty) + } + ) + + spark.sql("drop table if exists test_win_top_first_offset") + } + } + test("GLUTEN-7905 get topk of window by window") { withSQLConf( (CHConfig.runtimeSettings("enable_window_group_limit_to_aggregate"), "true"), diff --git a/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp b/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp index 20ef632ee6..032bf6f686 100644 --- a/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp +++ b/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp @@ -142,7 +142,8 @@ public: sortAndLimit(max_elements, sort_orders); - result_array_offsets.push_back(result_array_offsets.back() + values.size()); + const auto previous_offset = result_array_offsets.empty() ? 0 : result_array_offsets.back(); + result_array_offsets.push_back(previous_offset + values.size()); if (values.empty()) return; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
