This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b6e9f0388cc Finalize aggregate intermediate results in
AggregationResultsBlock.getRows() for server-return-final (#18835)
b6e9f0388cc is described below
commit b6e9f0388ccbe12f29c8e366c39fd996da44e8a2
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Jun 23 00:16:35 2026 -0700
Finalize aggregate intermediate results in
AggregationResultsBlock.getRows() for server-return-final (#18835)
A no-group-by aggregate whose data is colocated on a single server (e.g. partition-pruned
to one partition on a strictReplicaGroup table) is planned by the v2 physical optimizer as a
single-stage AGGREGATE_DIRECT that returns final results (SERVER_RETURN_FINAL_RESULT). For
aggregations whose intermediate type differs from their final type (DISTINCTCOUNTHLLPLUS,
DISTINCTCOUNT, ...), the leaf crashed during serialization with e.g.
'HyperLogLogPlus cannot be cast to Long'.

AggregationResultsBlock.getDataSchema() reports the final column types when
isServerReturnFinalResult() is true, but getRows() returned the raw intermediate results
without finalizing (only getDataTable() finalized). The MSE LeafOperator consumes
getRows() + getDataSchema(), so an intermediate object was left in a column typed as its
final type and failed on MAILBOX_SEND serialization. getRows() now finalizes via
extractFinalResult() when isServerReturnFinalResult() is true, consistent with
getDataTable() and the group-by path (GroupByCombineOperator already finalizes the table).

The default (v1) MSE planner always splits no-group-by aggregates into LEAF+FINAL and is
unaffected; group-by DIRECT aggregates already finalize in the combine operator.
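
The schema/row mismatch described above can be reproduced without Pinot. The following is
only a minimal sketch under stated assumptions: FinalizeRowsSketch, its nested Aggregation
interface, getRow() and sampleResults() are hypothetical names made up for illustration,
and a plain HashSet stands in for a sketch object such as HyperLogLogPlus. It is not the
actual AggregationResultsBlock code; it only shows why a consumer that trusts the final
column types fails on a raw intermediate row and succeeds once each column is finalized.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FinalizeRowsSketch {
  /** Hypothetical stand-in for an aggregation function: maps an intermediate object to its final value. */
  public interface Aggregation {
    Object extractFinalResult(Object intermediate);
  }

  // SUM-like: intermediate type == final type, so finalization is a no-op.
  static final Aggregation SUM = intermediate -> intermediate;
  // DISTINCTCOUNT-like: the intermediate is a Set, the final value is its size.
  static final Aggregation DISTINCT_COUNT = intermediate -> ((Set<?>) intermediate).size();
  static final Aggregation[] FUNCTIONS = {SUM, DISTINCT_COUNT};

  /** Intermediate results for SUM(amount) and DISTINCTCOUNT(user_id) over illustrative data. */
  static List<Object> sampleResults() {
    List<Object> results = new ArrayList<>();
    results.add(100.0);
    results.add(new HashSet<>(Arrays.asList("u1", "u2")));
    return results;
  }

  /** Returns the raw row, or the finalized row when the final result is requested. */
  static Object[] getRow(List<Object> results, Aggregation[] functions, boolean returnFinalResult) {
    if (!returnFinalResult) {
      return results.toArray();
    }
    Object[] row = new Object[results.size()];
    for (int i = 0; i < row.length; i++) {
      row[i] = functions[i].extractFinalResult(results.get(i));
    }
    return row;
  }

  public static void main(String[] args) {
    // A consumer that trusts the "final" schema treats column 1 as a number.
    Object[] raw = getRow(sampleResults(), FUNCTIONS, false);
    try {
      long count = ((Number) raw[1]).longValue();
      System.out.println("unexpected: " + count);
    } catch (ClassCastException e) {
      System.out.println("raw row does not match the final schema: " + e.getMessage());
    }

    // After finalization the same read succeeds.
    Object[] finalized = getRow(sampleResults(), FUNCTIONS, true);
    System.out.println("distinct count = " + ((Number) finalized[1]).longValue());
  }
}
```

The primitive column passes through unchanged, which is the no-op case the regression test
also covers.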
---
.../blocks/results/AggregationResultsBlock.java | 15 ++++-
.../queries/DirectAggregateObjectIntermediate.json | 69 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index c28e2a8faa2..907ea1eaf4b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -97,7 +97,20 @@ public class AggregationResultsBlock extends BaseResultsBlock {
@Override
public List<Object[]> getRows() {
- return Collections.singletonList(_results.toArray());
+ if (!_queryContext.isServerReturnFinalResult()) {
+ return Collections.singletonList(_results.toArray());
+ }
+ // When the server is requested to return the final result (e.g. a single-server colocated DIRECT aggregate in the
+ // multi-stage engine), getDataSchema() reports the final column types. Finalize the intermediate results here so
+ // that the rows are consistent with the schema; otherwise an intermediate object (e.g. a HyperLogLogPlus) would be
+ // left in a column typed as its final type (e.g. LONG) and fail when the block is serialized. This mirrors the
+ // finalization done in getDataTable() and in GroupByCombineOperator for the group-by case.
+ int numColumns = _results.size();
+ Object[] row = new Object[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ row[i] = _aggregationFunctions[i].extractFinalResult(_results.get(i));
+ }
+ return Collections.singletonList(row);
}
@Override
diff --git a/pinot-query-runtime/src/test/resources/queries/DirectAggregateObjectIntermediate.json b/pinot-query-runtime/src/test/resources/queries/DirectAggregateObjectIntermediate.json
new file mode 100644
index 00000000000..1094dd1e847
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/DirectAggregateObjectIntermediate.json
@@ -0,0 +1,69 @@
+{
+ "direct_aggregate_object_intermediate": {
+ "comments": "Regression test for the HLL->Long ClassCastException. When a no-group-by aggregate is colocated on a single server, the v2 physical optimizer produces an AGGREGATE_DIRECT leaf that returns final results (SERVER_RETURN_FINAL_RESULT). Aggregations whose intermediate type differs from their final type (e.g. DISTINCTCOUNTHLLPLUS -> HyperLogLogPlus/LONG, DISTINCTCOUNT -> Set/INT) must be finalized before the leaf serializes its output. A 'replicated' table forces single-serve [...]
+ "tables": {
+ "tbl": {
+ "replicated": true,
+ "schema": [
+ {"name": "amount", "type": "INT"},
+ {"name": "user_id", "type": "STRING"}
+ ],
+ "inputs": [
+ [10, "u1"],
+ [20, "u2"],
+ [30, "u1"],
+ [40, ""]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "SUM + DISTINCTCOUNTHLLPLUS FILTER (the reported failing shape)",
+ "sql": "SELECT SUM(amount) AS s, DISTINCTCOUNTHLLPLUS(user_id) FILTER (WHERE user_id <> '') AS dc FROM {tbl}",
+ "h2Sql": "SELECT SUM(amount) AS s, COUNT(DISTINCT CASE WHEN user_id <> '' THEN user_id END) AS dc FROM {tbl}"
+ },
+ {
+ "description": "Plain DISTINCTCOUNTHLLPLUS (no filter) still hits the same DIRECT path",
+ "sql": "SELECT DISTINCTCOUNTHLLPLUS(user_id) AS dc FROM {tbl}",
+ "h2Sql": "SELECT COUNT(DISTINCT user_id) AS dc FROM {tbl}"
+ },
+ {
+ "description": "DISTINCTCOUNT (Set intermediate, INT final) - same finalize requirement",
+ "sql": "SELECT DISTINCTCOUNT(user_id) AS dc FROM {tbl}",
+ "h2Sql": "SELECT COUNT(DISTINCT user_id) AS dc FROM {tbl}"
+ },
+ {
+ "description": "Primitive-only DIRECT aggregate (intermediate type == final type) - finalize loop must be a correct no-op",
+ "sql": "SELECT SUM(amount) AS s, COUNT(*) AS c FROM {tbl}",
+ "h2Sql": "SELECT SUM(amount) AS s, COUNT(*) AS c FROM {tbl}"
+ }
+ ]
+ },
+ "direct_aggregate_object_intermediate_null_handling": {
+ "comments": "Same DIRECT path with column-based null handling enabled and FILTERs that match zero rows, so the finalized value is NULL (SUM) / 0 (COUNT-family). Exercises the null finalization path of getRows() in SERVER_RETURN_FINAL_RESULT mode.",
+ "extraProps": {
+ "enableColumnBasedNullHandling": true
+ },
+ "tables": {
+ "tbl": {
+ "replicated": true,
+ "schema": [
+ {"name": "amount", "type": "INT"},
+ {"name": "user_id", "type": "STRING"}
+ ],
+ "inputs": [
+ [10, "u1"],
+ [20, "u2"],
+ [30, "u1"]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "Zero-match FILTERs: SUM finalizes to NULL, DISTINCTCOUNTHLLPLUS/COUNT finalize to 0",
+ "sql": "SELECT SUM(amount) FILTER (WHERE amount > 1000) AS s, COUNT(*) FILTER (WHERE amount > 1000) AS c, DISTINCTCOUNTHLLPLUS(user_id) FILTER (WHERE amount > 1000) AS dc FROM {tbl}",
+ "h2Sql": "SELECT SUM(CASE WHEN amount > 1000 THEN amount END) AS s, COUNT(CASE WHEN amount > 1000 THEN 1 END) AS c, COUNT(DISTINCT CASE WHEN amount > 1000 THEN user_id END) AS dc FROM {tbl}"
+ }
+ ]
+ }
+}
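
The zero-match expectations in the null-handling test case (SUM finalizes to NULL, the
COUNT-family finalizes to 0) can be illustrated with a small standalone sketch. Everything
here is an assumption made for illustration: ZeroMatchFinalizeSketch and aggregate() are
hypothetical names, an exact HashSet replaces the approximate HyperLogLogPlus sketch, and
the filter is hard-coded as "amount > threshold". It models only the expected results, not
Pinot's aggregation functions or their null-handling implementation.

```java
import java.util.HashSet;
import java.util.Set;

public class ZeroMatchFinalizeSketch {
  /**
   * Computes {SUM(amount), COUNT(*), distinct user_id count} over rows with amount > threshold.
   * SUM stays null when no row matches; the two counts start at 0 and stay there.
   */
  static Object[] aggregate(int[] amounts, String[] userIds, int threshold) {
    Double sum = null;
    long count = 0;
    Set<String> distinctUsers = new HashSet<>();
    for (int i = 0; i < amounts.length; i++) {
      if (amounts[i] > threshold) {
        sum = (sum == null ? 0.0 : sum) + amounts[i];
        count++;
        distinctUsers.add(userIds[i]);
      }
    }
    // Finalize: the Set intermediate becomes a LONG count; SUM and COUNT pass through.
    return new Object[] {sum, count, (long) distinctUsers.size()};
  }

  public static void main(String[] args) {
    int[] amounts = {10, 20, 30};
    String[] userIds = {"u1", "u2", "u1"};
    Object[] row = aggregate(amounts, userIds, 1000);
    System.out.println("s=" + row[0] + ", c=" + row[1] + ", dc=" + row[2]);
  }
}
```

With the three input rows of the test table and a threshold of 1000, no row matches, so the
finalized row holds a null SUM next to two zero counts.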