yashmayya opened a new pull request, #18513:
URL: https://github.com/apache/pinot/pull/18513

   ## Summary
   
   Adds a planner rule to the multi-stage engine (MSE) that pushes an 
`Aggregate` past a `UNION ALL` so that each branch aggregates locally before 
any cross-stage shuffle. This is enabled by default.
   
   For `SELECT g, SUM(x) FROM (a UNION ALL b) GROUP BY g`, the rule rewrites
   
   ```
   Aggregate(group=g, SUM(x))
     Union ALL
       a
       b
   ```
   
   into
   
   ```
   Aggregate(group=g, SUM(x))
     Union ALL
       Aggregate(group=g, SUM(x))  -- a
       Aggregate(group=g, SUM(x))  -- b
   ```
   
   After Pinot's exchange-insertion rules run, each branch then ships 
*pre-aggregated* rows (one per group key per branch) through its exchange 
instead of raw rows.
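The rewrite is safe for `SUM` because a sum of per-branch partial sums equals the sum over the raw union. The plain-Java sketch below checks this on a few in-memory rows. `PartialSumSketch`, `Row`, and the helper methods are hypothetical illustrations that model plan operators as collections. They are not Pinot's or Calcite's APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of SUM(x) GROUP BY g over UNION ALL(a, b); plain Java, not Pinot/Calcite APIs.
public class PartialSumSketch {
  /** One input row: a group key g and a value x (hypothetical type). */
  public record Row(String g, long x) {}

  /** GROUP BY g, SUM(x) over an in-memory list of rows. */
  public static Map<String, Long> sumByKey(List<Row> rows) {
    Map<String, Long> sums = new HashMap<>();
    for (Row r : rows) {
      sums.merge(r.g(), r.x(), Long::sum);
    }
    return sums;
  }

  /** Original plan Aggregate(Union ALL(a, b)): every raw row reaches the aggregate. */
  public static Map<String, Long> aggregateOverUnion(List<Row> a, List<Row> b) {
    List<Row> union = new ArrayList<>(a);
    union.addAll(b);
    return sumByKey(union);
  }

  /** Rewritten plan Aggregate(Union ALL(Aggregate(a), Aggregate(b))). */
  public static Map<String, Long> aggregateOverPartials(List<Row> a, List<Row> b) {
    List<Row> partials = new ArrayList<>();
    // Each branch emits one partial row per distinct group key instead of its raw rows.
    sumByKey(a).forEach((g, s) -> partials.add(new Row(g, s)));
    sumByKey(b).forEach((g, s) -> partials.add(new Row(g, s)));
    // The top-level SUM over the partial sums yields the same per-key totals.
    return sumByKey(partials);
  }

  public static void main(String[] args) {
    List<Row> a = List.of(new Row("us", 3), new Row("eu", 5), new Row("us", 4));
    List<Row> b = List.of(new Row("us", 1), new Row("eu", 2), new Row("eu", 6));
    System.out.println(aggregateOverUnion(a, b).equals(aggregateOverPartials(a, b))); // true
  }
}
```

In this example, six raw rows collapse to four partial rows, two per branch, before the final aggregate.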
   
   ## Why this matters for Pinot specifically
   
   In a distributed OLAP engine, cross-stage data shuffle is the dominant cost: moving bytes across the network is far more expensive than running an extra aggregate pass on a worker. With N rows per branch and C distinct group keys, the shuffle cost goes from O(N) raw rows to O(C) partial aggregates per branch, a reduction of roughly N/C in rows shipped. For low-cardinality dimensions over large fact tables, which is the common analytics shape, this is a big win.
   
   Trade-offs:
   - **When it helps a lot:** group key cardinality ≪ row count, large 
branches, or skewed branch sizes where the larger branch benefits 
disproportionately.
   - **When it doesn't help:** group keys are nearly unique (per-branch 
aggregation compresses nothing), or branches are very small (fixed aggregator 
overhead dominates).
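The following sketch makes the O(N) vs O(C) arithmetic concrete. It counts the rows one branch would send through its exchange, with and without the pushed-down aggregate, once for a low-cardinality key and once for unique keys. It assumes one partial row per distinct group key and ignores row width and serialization cost. `ShuffleVolumeSketch` and its methods are hypothetical, and the numbers are illustrative rather than Pinot measurements.

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

// Rows crossing one branch's exchange with and without a pushed-down partial aggregate.
// Simplified model: one partial row per distinct key; row width and serialization ignored.
public class ShuffleVolumeSketch {
  /** Builds a branch of n rows whose group key is keyOf(rowIndex). */
  public static int[] branch(int n, IntUnaryOperator keyOf) {
    return IntStream.range(0, n).map(keyOf).toArray();
  }

  /** Without pushdown, every raw row is shuffled: O(N). */
  public static long rowsWithoutPushdown(int[] groupKeys) {
    return groupKeys.length;
  }

  /** With pushdown, one partial aggregate per distinct group key is shuffled: O(C). */
  public static long rowsWithPushdown(int[] groupKeys) {
    return Arrays.stream(groupKeys).distinct().count();
  }

  public static void main(String[] args) {
    int n = 100_000;
    int[] lowCardinality = branch(n, i -> i % 10); // C = 10, much smaller than N
    int[] nearlyUnique = branch(n, i -> i);        // C = N, nothing to compress
    System.out.printf("low cardinality: %d -> %d rows%n",
        rowsWithoutPushdown(lowCardinality), rowsWithPushdown(lowCardinality));
    System.out.printf("unique keys:     %d -> %d rows%n",
        rowsWithoutPushdown(nearlyUnique), rowsWithPushdown(nearlyUnique));
  }
}
```

With 100,000 rows and 10 distinct keys, the branch ships 10 partial rows instead of 100,000. With unique keys, it still ships 100,000, so the extra aggregate pass is pure overhead. These two cases correspond to the two trade-off bullets above.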
   
   For the edge cases, users can opt out with `SET 
skipPlannerRules='AggregateUnionTranspose'`.
   
   This matches what most distributed OLAP planners do by default (Spark, 
BigQuery, Snowflake, Druid, ClickHouse all push aggregates through unions).
   
   ## Why a custom rule instead of Calcite's `AggregateUnionTransposeRule`
   
   Calcite ships an `AggregateUnionTransposeRule` for exactly this 
transformation. Adding it to `PinotQueryRuleSets` directly — which is the first 
thing I tried — produces no effect: the rule fires but immediately bails out 
without transforming the tree.
   
   The cause is in how the upstream rule decides which aggregate functions are 
splittable. It does a 
`SUPPORTED_AGGREGATES.containsKey(call.getAggregation().getClass())` lookup 
against a hard-coded allow-list of Calcite classes:
   
   ```
   SqlSumAggFunction, SqlMinMaxAggFunction, SqlCountAggFunction,
   SqlSumEmptyIsZeroAggFunction, SqlAnyValueAggFunction, SqlBitOpAggFunction
   ```
   
   Pinot ships its own subclasses of `SqlAggFunction` — `PinotSumFunction`, 
`PinotMinMaxFunction`, etc. — which intentionally do **not** extend Calcite's 
`SqlSumAggFunction` / `SqlMinMaxAggFunction` (see 
[PinotOperatorTable.java](https://github.com/apache/pinot/blob/master/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java#L489-L512);
 the comment there explains that these functions need to carry custom return-type inference and operand checkers that Calcite's classes don't allow overriding cleanly). So the class-equality check in the upstream rule never matches, `transformAggCalls` returns null, and the rule no-ops on essentially every Pinot query.
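The failure mode can be reproduced with a toy class hierarchy. An identity map keyed by `Class` only matches the exact classes it was seeded with, so it misses a sibling class that carries the same kind. A kind-based set matches that sibling. `SplittableLookupSketch`, `Kind`, `CalciteSum`, `PinotSum`, and `PinotAvg` are hypothetical stand-ins for `SqlKind` and the `SqlAggFunction` classes, not the real Calcite or Pinot types.

```java
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for SqlKind / SqlAggFunction; not the real Calcite or Pinot types.
public class SplittableLookupSketch {
  public enum Kind { SUM, SUM0, COUNT, MIN, MAX, ANY_VALUE, BIT_AND, BIT_OR, BIT_XOR, AVG }

  public abstract static class AggFunction {
    final Kind kind;

    AggFunction(Kind kind) {
      this.kind = kind;
    }
  }

  /** Stand-in for Calcite's SqlSumAggFunction. */
  public static class CalciteSum extends AggFunction {
    public CalciteSum() {
      super(Kind.SUM);
    }
  }

  /** Stand-in for PinotSumFunction: same kind, but a sibling class rather than a subclass. */
  public static class PinotSum extends AggFunction {
    public PinotSum() {
      super(Kind.SUM);
    }
  }

  /** A function whose kind is in neither allow-list. */
  public static class PinotAvg extends AggFunction {
    public PinotAvg() {
      super(Kind.AVG);
    }
  }

  // Upstream-style allow-list: exact Class objects, compared by identity.
  private static final Map<Class<? extends AggFunction>, Boolean> SUPPORTED_CLASSES =
      new IdentityHashMap<>();

  static {
    SUPPORTED_CLASSES.put(CalciteSum.class, Boolean.TRUE);
  }

  // Kind-based allow-list, as described for the Pinot variant.
  private static final Set<Kind> SUPPORTED_KINDS = EnumSet.of(Kind.SUM, Kind.SUM0, Kind.COUNT,
      Kind.MIN, Kind.MAX, Kind.ANY_VALUE, Kind.BIT_AND, Kind.BIT_OR, Kind.BIT_XOR);

  public static boolean splittableByClass(AggFunction f) {
    return SUPPORTED_CLASSES.containsKey(f.getClass());
  }

  public static boolean splittableByKind(AggFunction f) {
    return SUPPORTED_KINDS.contains(f.kind);
  }

  public static void main(String[] args) {
    AggFunction pinotSum = new PinotSum();
    System.out.println("by class: " + splittableByClass(pinotSum)); // false: PinotSum.class not in map
    System.out.println("by kind:  " + splittableByKind(pinotSum));  // true: its kind is SUM
  }
}
```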
   
   The Pinot variant in this PR matches on `SqlKind` (`SUM`, `SUM0`, `COUNT`, 
`MIN`, `MAX`, `ANY_VALUE`, `BIT_AND`, `BIT_OR`, `BIT_XOR`) instead. The rest of 
the logic — the `union.all` guard, the `hasUniqueKeyInAllInputs` short-circuit 
(which is what prevents the rule from looping on its own output), the `COUNT` → 
`SUM0` rollup, and the per-branch nullability re-typing — is a straight port of 
upstream's behavior. A few line-by-line differences:
   - `AggregateCall.create` is called with `aggRel` (the full Aggregate node, 
so its `(groupCols, aggCols)` row type is what the new top-of-union exchange 
exposes) instead of `aggRel.getInput()`. Upstream uses `aggRel.copy(...)` for 
the same effect; passing `aggRel` directly is slightly cheaper.
   - Pinot checkstyle bans `i++` / `++i` in `for`-loop iterators, so the loops 
use `i += 1`.
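Below is a sketch of the per-call mapping that the `SqlKind` check implies. Above the union, each supported branch-level aggregate is re-aggregated with the same function, except `COUNT`, whose partial counts roll up with `SUM0`. Any other kind, for example `AVG`, makes the rule bail. This mirrors the behavior described above under assumed names. `RollupSketch`, its `Kind` enum, `topLevelKind`, and `sum0` are hypothetical, not the PR's code.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model of how each branch-level aggregate is rolled up above the union.
public class RollupSketch {
  /** Stand-in for the SqlKind values the rule accepts, plus AVG as an unsupported example. */
  public enum Kind { SUM, SUM0, COUNT, MIN, MAX, ANY_VALUE, BIT_AND, BIT_OR, BIT_XOR, AVG }

  /**
   * Function the top-level Aggregate applies to the per-branch partials, or empty when the
   * call cannot be split (in which case the rule bails and leaves the plan unchanged).
   */
  public static Optional<Kind> topLevelKind(Kind branchKind) {
    return switch (branchKind) {
      // Partial counts are added up; SUM0 returns 0 rather than NULL when there is nothing to sum.
      case COUNT -> Optional.of(Kind.SUM0);
      // These functions re-aggregate their own partial results with the same function.
      case SUM, SUM0, MIN, MAX, ANY_VALUE, BIT_AND, BIT_OR, BIT_XOR -> Optional.of(branchKind);
      default -> Optional.empty();
    };
  }

  /** SUM0 semantics: sum of the inputs, and 0 (not NULL) for an empty input. */
  public static long sum0(List<Long> partials) {
    long total = 0L;
    for (long p : partials) {
      total += p;
    }
    return total;
  }

  public static void main(String[] args) {
    // One group key seen twice in branch a and once in branch b.
    List<Long> partialCounts = List.of(2L, 1L);
    System.out.println(topLevelKind(Kind.COUNT).orElseThrow() + " -> " + sum0(partialCounts)); // SUM0 -> 3
    System.out.println(topLevelKind(Kind.AVG).isPresent()); // false: the rule would bail
  }
}
```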
   
   If/when Pinot ever switches to using Calcite's stock `SqlSumAggFunction` 
family, the upstream rule becomes a drop-in replacement and the Pinot variant 
can be deleted.
   
   ## Why default-on, and how it interacts with `AggregateUnionAggregateRule`
   
   `AggregateUnionTransposeRule` and `AggregateUnionAggregateRule` are roughly 
inverse operations:
   - **Transpose:** `Agg(Union(A, B))` → `Agg(Union(Agg(A), Agg(B)))` (push 
aggregates into branches).
   - **UnionAggregate:** `Agg(Union(Agg(A), B))` → `Agg(Union(A, B))` (collapse 
pre-aggregation back up).
   
   For Pinot's cost profile the right defaults are asymmetric:
   - Transpose is **on** by default — shuffle-reducing.
   - UnionAggregate stays **off** by default — shuffle-increasing, since 
collapsing the branch-level aggregate means the union's exchange now carries 
raw rows.
   
   The two rules sit in separate `HepInstruction`s, with 
`AggregateUnionAggregateRule` registered first and 
`PinotAggregateUnionTransposeRule` second. Each instruction runs its rule to 
fixpoint and then control moves on, so there is no oscillation. If a user opts 
into `AggregateUnionAggregateRule`, Transpose still runs afterward and 
effectively undoes the merge — that combination is rarely useful, and 
`QueryPlannerRuleOptionsTest.testAggregateUnionAggregateEnabled` now documents 
this by explicitly skipping Transpose so it can verify the merge behavior in 
isolation.
   
   A rationale comment in `PinotQueryRuleSets` captures all of the above so 
future maintainers don't flip the defaults.
   
   ## Self-recursion safety
   
   Transpose's output `Agg(Union(Agg(A), Agg(B)))` still matches its own 
pattern `Agg(Union(...))`. Without a guard the rule would loop within its own 
`HepInstruction`. The guard is the `hasUniqueKeyInAllInputs` check: after one 
application, every union input is an `Aggregate` and is therefore unique on the 
group keys, so the next attempt bails. This is the same mechanism Calcite's 
upstream rule uses.
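The HepInstruction ordering and the guard can be modeled together with a toy plan tree. Each "instruction" applies one rule at the root until the rule stops matching, and merge runs before transpose. Transpose leaves inputs that are already aggregates alone and bails when every input is one. The node records, `mergeRule`, `transposeRule`, and `runToFixpoint` are hypothetical simplifications, not Calcite's API. Real `HepPlanner` matching walks the whole tree and uses metadata to decide uniqueness.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Toy model of two HepInstructions run in order, each to fixpoint. Rules only match at the
// root, and "unique on the group keys" is approximated as "input is already an Agg".
public class HepOrderingSketch {
  public interface Node {}

  public record Scan(String table) implements Node {}

  public record Agg(Node input) implements Node {}

  public record Union(List<Node> inputs) implements Node {}

  /** Stand-in for AggregateUnionAggregateRule: Agg(Union(..Agg(X)..)) -> Agg(Union(..X..)). */
  public static Optional<Node> mergeRule(Node plan) {
    if (plan instanceof Agg top && top.input() instanceof Union union
        && union.inputs().stream().anyMatch(in -> in instanceof Agg)) {
      List<Node> stripped = union.inputs().stream()
          .map(in -> in instanceof Agg inner ? inner.input() : in)
          .toList();
      return Optional.of(new Agg(new Union(stripped)));
    }
    return Optional.empty();
  }

  /** Stand-in for the transpose rule, including a hasUniqueKeyInAllInputs-style guard. */
  public static Optional<Node> transposeRule(Node plan) {
    if (plan instanceof Agg top && top.input() instanceof Union union) {
      boolean hasUniqueKeyInAllInputs = union.inputs().stream().allMatch(in -> in instanceof Agg);
      if (hasUniqueKeyInAllInputs) {
        return Optional.empty(); // nothing left to push down, so the rule does not re-fire
      }
      // Already-unique inputs are kept as-is; the others get a branch-local Agg.
      List<Node> pushed = union.inputs().stream()
          .map(in -> in instanceof Agg ? in : new Agg(in))
          .toList();
      return Optional.of(new Agg(new Union(pushed)));
    }
    return Optional.empty();
  }

  /** Applies one rule repeatedly until it stops matching, like a single HepInstruction. */
  public static Node runToFixpoint(Node plan, Function<Node, Optional<Node>> rule) {
    Node current = plan;
    for (int i = 0; i < 100; i += 1) { // safety cap for the toy; a self-looping rule would hit it
      Optional<Node> next = rule.apply(current);
      if (next.isEmpty()) {
        return current;
      }
      current = next.get();
    }
    throw new IllegalStateException("rule never reached a fixpoint");
  }

  /** Runs the two instructions in order: merge first, then transpose. */
  public static Node runProgram(Node plan) {
    Node afterMerge = runToFixpoint(plan, HepOrderingSketch::mergeRule);
    return runToFixpoint(afterMerge, HepOrderingSketch::transposeRule);
  }

  public static void main(String[] args) {
    Node preAggregated = new Agg(new Union(List.of(new Agg(new Scan("a")), new Agg(new Scan("b")))));
    // Merge strips the branch aggregates, then transpose pushes them back down and stops.
    System.out.println(runProgram(preAggregated));
  }
}
```

Starting from `Agg(Union(Agg(a), Agg(b)))`, the merge step strips the branch aggregates. The transpose step then pushes them back down and stops on its own output. This matches the "Transpose effectively undoes the merge" behavior described above, with no oscillation.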
   
   ## Test plan
   
   - [x] New EXPLAIN PLAN fixtures in `SetOpPlans.json`:
     - `SELECT col2, SUM(col3) FROM (a UNION ALL b) GROUP BY col2` — confirms 
the partial aggregate is pushed into each branch.
     - `SELECT col2, COUNT(*) FROM (a UNION ALL b) GROUP BY col2` — confirms 
per-branch `COUNT` rolls up via top-level `SUM0`.
     - `SET skipPlannerRules='AggregateUnionTranspose'` variant — confirms the 
opt-out keeps the un-pushed plan.
   - [x] Updated existing fixtures whose plans now show the pushed-down 
aggregates:
     - `SetOpPlans.json` — "UNION from three tables" (UNION DISTINCT, because 
`UnionToDistinctRule` introduces an `Aggregate` that Transpose then pushes down 
recursively).
     - `PhysicalOptimizerPlans.json` — "Union, distinct, etc. but still 
maximally identity exchange".
   - [x] Updated 
`QueryPlannerRuleOptionsTest.testAggregateUnionAggregateEnabled` and 
`testDisablePruneEmptyUnion` to also disable Transpose so they continue to 
exercise their target rule in isolation. Added a small 
`explainQueryWithRules(query, enable, disable)` helper to support 
enable+disable in one call.
   - [x] `./mvnw -pl pinot-query-planner test` — 1151 tests, 0 failures.
   - [x] `./mvnw -pl pinot-query-runtime test` — 4230 tests, 0 failures (6 
pre-existing skips).
   - [x] Spotless / checkstyle / license-format / license-check all clean.

