yashmayya opened a new issue, #18592:
URL: https://github.com/apache/pinot/issues/18592

   ## Summary
   
   In the multi-stage query engine, when the **physical optimizer** is enabled 
(`SET usePhysicalOptimizer=true`), the rule that pushes a `LIMIT` down into the 
leaf/intermediate aggregate for group-trim **ignores the query `OFFSET`**. It 
pushes `limit = fetch` instead of `limit = offset + fetch`, so the leaf trims 
away rows that the outer `OFFSET … FETCH` window still needs. Paginated queries 
that hit group-trim then return **too few rows** (and the wrong window for 
ordered queries). The results are silently wrong — no error is raised.
   
   The default/legacy multi-stage planner is **not affected** (explained below).
   
   ## Scope
   
   Only when `usePhysicalOptimizer=true` (currently off by default — 
`DEFAULT_USE_PHYSICAL_OPTIMIZER = false` in `CommonConstants.java`). It affects 
**all** group-trim paths on that engine, not just one query shape:
   
   - `SELECT DISTINCT col … LIMIT n OFFSET m` and no-aggregate `GROUP BY col … 
LIMIT n OFFSET m` — group-trim is on by default for these on the 
physical-optimizer path; and
   - aggregate queries that opt into group-trim via the hint, e.g. `/*+ 
aggOptions(is_enable_group_trim='true') */ … GROUP BY … ORDER BY <agg> LIMIT n 
OFFSET m`.
   
   ## Reproduction
   
   ```sql
   SET usePhysicalOptimizer=true;
   EXPLAIN PLAN FOR SELECT DISTINCT col1 FROM mytable LIMIT 5 OFFSET 10;
   ```
   
   ```
   PhysicalSort(offset=[10], fetch=[5])
     PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])
       PhysicalSort(fetch=[15])                                        <-- 
correct: offset + fetch
         PhysicalAggregate(group=[{0}], aggType=[FINAL], limit=[5])    <-- BUG: 
should be 15
           PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])
             PhysicalAggregate(group=[{0}], aggType=[LEAF], limit=[5]) <-- BUG: 
should be 15
               PhysicalTableScan(table=[[default, mytable]])
   ```
   
   The inner `PhysicalSort` correctly carries `fetch=15` (= offset + fetch), 
but the `PhysicalAggregate` nodes get `limit=5`. Each server therefore keeps 
only 5 groups; after the outer `OFFSET 10` there are fewer than 5 (often 0) 
rows left. On a table with more than 15 distinct `col1` values, this query 
returns too few rows instead of 5; an ordered variant returns the wrong page.
   
   ## Root cause
   
   In `PinotLogicalAggregateRule` (the physical-optimizer aggregate-exchange 
rule), both sub-rules derive the pushed-down limit from `fetch` only and never 
add `offset`:
   
   - `SortProjectAggregate.onMatch` — fetch extraction at 
`pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java:93`
   - `SortAggregate.onMatch` — fetch extraction at the same file, `:130`
   
   ```java
   int limit = 0;
   if (sortRel.fetch != null) {
     limit = RexLiteral.intValue(sortRel.fetch);   // <-- ignores sortRel.offset
   }
   if (limit <= 0) {
     return;
   }
   PinotLogicalAggregate newAggRel = createPlan(aggRel, collations, limit);
   ```
   
   ### Why the legacy path is not affected
   
   On the default (non-physical-optimizer) path, 
`PinotSortExchangeNodeInsertRule` and `PinotSortExchangeCopyRule` run 
**before** the aggregate rules (`PinotQueryRuleSets.POST_LOGICAL_RULES`) and 
fold the offset into the copied inner sort's fetch — 
`PinotSortExchangeCopyRule` computes `fetch + offset` and the copied inner sort 
has its offset stripped to `null`. So the legacy aggregate rule matches an 
inner sort whose `fetch` already equals `offset + fetch`. `PINOT_POST_RULES_V2` 
contains neither sort-exchange rule, so the physical-optimizer rule matches the 
user's outer `Sort` directly and sees the raw `fetch`.
   
   (Confirmed empirically: on the legacy path, `EXPLAIN PLAN FOR SELECT 
DISTINCT col1 FROM mytable LIMIT 5 OFFSET 10` yields leaf/final `limit=[15]`.)
   
   ## Proposed fix
   
   In both sub-rules, after the `if (limit <= 0) return;` guard, add the offset 
to the pushed-down limit (clamped to avoid `int` overflow):
   
   ```java
   if (sortRel.offset != null) {
     limit = (int) Math.min(Integer.MAX_VALUE, (long) limit + 
RexLiteral.intValue(sortRel.offset));
   }
   ```
   
   This makes the leaf/intermediate keep `offset + fetch` groups so the outer 
offset window is fully covered. `limit == Integer.MAX_VALUE` is safe downstream 
— `GroupByUtils.getTableCapacity` uses `long` arithmetic. (The same clamp is a 
no-op on the legacy path, since the matched inner sort there already has its 
offset stripped to `null`.)
   
   ## Suggested tests
   
   - Plan-level (`PhysicalOptimizerPlans.json`): no-aggregate `DISTINCT … LIMIT 
n OFFSET m` and a hinted aggregate `… ORDER BY <agg> … LIMIT n OFFSET m`, 
asserting leaf/final `limit=[offset+fetch]`.
   - Integration (`usePhysicalOptimizer=true`): assert full page cardinality 
for both the no-aggregate and hinted-aggregate paginated paths.
   
   ## Importance / timing
   
   This is latent today because the physical optimizer is off by default. It 
should be fixed **before** `DEFAULT_USE_PHYSICAL_OPTIMIZER` is flipped to 
`true`, otherwise it becomes a silent correctness regression (wrong results, no 
error) for paginated `DISTINCT` / `GROUP BY` / group-trim queries on the 
default path.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to