This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c2561d4c63e Include OFFSET in physical optimizer group-trim limit pushdown (#18600)
c2561d4c63e is described below
commit c2561d4c63e7b94a8f2172ed71bad1a4d24624fe
Author: Yash Mayya <[email protected]>
AuthorDate: Wed May 27 17:55:05 2026 -0700
Include OFFSET in physical optimizer group-trim limit pushdown (#18600)
---
.../tests/custom/GroupByOptionsTest.java | 48 ++++++++++++++++++++++
.../rel/rules/PinotLogicalAggregateRule.java | 32 +++++++++++----
.../resources/queries/PhysicalOptimizerPlans.json | 35 ++++++++++++++++
3 files changed, 107 insertions(+), 8 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
index 8d3c8d2672e..4a97cc014dd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
@@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -341,6 +343,52 @@ public class GroupByOptionsTest extends CustomDataQueryClusterIntegrationTest {
);
}
+  @Test
+  public void testGroupTrimWithOffsetReturnsFullPageOnPhysicalOptimizer()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    // On the physical-optimizer path, group trim must push down 'offset + fetch' to the leaf/final aggregate.
+    // Without it, the aggregate keeps only 'fetch' groups (there are 40 distinct (i, j) groups), so the outer
+    // OFFSET drops everything and the page comes back short. We query without ORDER BY so the runtime trims to
+    // exactly the pushed-down limit (see AggregateOperator), making the cardinality bug deterministic.
+    String physicalOpt = "SET usePhysicalOptimizer=true; ";
+
+    // No-aggregate DISTINCT path: group trim is on by default here.
+    JsonNode distinct = postV2Query(physicalOpt
+        + " select distinct i, j from " + getTableName() + " limit 5 offset 10");
+    assertFullPageOfGroups(distinct, -1);
+
+    // Hinted aggregate path with group trim enabled. Every (i, j) group has exactly 2 rows in the test data.
+    JsonNode aggregated = postV2Query(physicalOpt
+        + " select /*+ aggOptions(is_enable_group_trim='true') */ i, j, count(*) as cnt from " + getTableName()
+        + " group by i, j limit 5 offset 10");
+    assertFullPageOfGroups(aggregated, 2);
+  }
+
+  /**
+   * Asserts the result is a full page of 5 distinct, in-domain (i, j) groups. The rows are not ordered (we query
+   * without ORDER BY for deterministic trimming), so we validate the page size and group validity rather than exact
+   * values. When {@code expectedCount >= 0}, also asserts each group's COUNT(*).
+   *
+   * @param mainNode query response JSON (as returned by postV2Query) to validate
+   * @param expectedCount expected COUNT(*) in the third column of every row, or a negative value to skip that check
+   *                      (e.g. for DISTINCT queries, which have no aggregate column)
+   */
+  private static void assertFullPageOfGroups(JsonNode mainNode, int expectedCount) {
+    JsonNode resultTable = mainNode.get(RESULT_TABLE);
+    Assert.assertNotNull(resultTable, toResultStr(mainNode));
+    JsonNode rows = resultTable.get("rows");
+    // Fail with the full response text rather than an NPE if the rows field is missing.
+    Assert.assertNotNull(rows, toResultStr(mainNode));
+    Assert.assertEquals(rows.size(), 5, toResultStr(mainNode));
+    Set<String> seenGroups = new HashSet<>();
+    for (JsonNode row : rows) {
+      int i = row.get(0).intValue();
+      int j = row.get(1).intValue();
+      Assert.assertTrue(i >= 0 && i < FILES_NO, "i out of range: " + i);
+      Assert.assertTrue(j >= 0 && j < 10, "j out of range: " + j);
+      Assert.assertTrue(seenGroups.add(i + "," + j), "duplicate group: (" + i + ", " + j + ")");
+      if (expectedCount >= 0) {
+        Assert.assertEquals(row.get(2).intValue(), expectedCount, toResultStr(mainNode));
+      }
+    }
+  }
+
@Test
public void testOrderByByKeysAndValuesIsPushedToFinalAggregationStage()
throws Exception {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java
index 5ae5e3a1c23..83299279a09 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotLogicalAggregateRule.java
@@ -88,10 +88,7 @@ public class PinotLogicalAggregateRule {
return;
}
}
- int limit = 0;
- if (sortRel.fetch != null) {
- limit = RexLiteral.intValue(sortRel.fetch);
- }
+ int limit = getGroupTrimLimit(sortRel);
if (limit <= 0) {
// Cannot enable group trim when there is no limit.
return;
@@ -125,10 +122,7 @@ public class PinotLogicalAggregateRule {
Sort sortRel = call.rel(0);
List<RelFieldCollation> collations =
sortRel.getCollation().getFieldCollations();
- int limit = 0;
- if (sortRel.fetch != null) {
- limit = RexLiteral.intValue(sortRel.fetch);
- }
+ int limit = getGroupTrimLimit(sortRel);
if (limit <= 0) {
// Cannot enable group trim when there is no limit.
return;
@@ -162,6 +156,28 @@ public class PinotLogicalAggregateRule {
return createPlan(aggRel, null, 0);
}
+  /**
+   * Returns the limit to push down into the aggregate for group trim, or 0 if group trim should not be applied.
+   * The pushed-down limit is {@code offset + fetch} so that the aggregate(s) below the sort (e.g. the LEAF and FINAL
+   * aggregates in the physical plan) retain enough groups to cover the outer {@code OFFSET ... FETCH} window.
+   * Pushing down only {@code fetch} would trim away groups that the offset window still needs, returning short (or
+   * empty) pages for paginated queries.
+   *
+   * @param sortRel the sort above the aggregate; its {@code fetch} and {@code offset}, when present, are read via
+   *                {@link RexLiteral#intValue} and so must be literals
+   * @return {@code offset + fetch} clamped to {@link Integer#MAX_VALUE}, or 0 when there is no positive fetch
+   */
+  private static int getGroupTrimLimit(Sort sortRel) {
+    if (sortRel.fetch == null) {
+      return 0;
+    }
+    int limit = RexLiteral.intValue(sortRel.fetch);
+    if (limit <= 0) {
+      return 0;
+    }
+    if (sortRel.offset != null) {
+      // Clamp to avoid int overflow. Integer.MAX_VALUE is safe downstream: GroupByUtils.getTableCapacity uses long
+      // arithmetic.
+      limit = (int) Math.min(Integer.MAX_VALUE, (long) limit + RexLiteral.intValue(sortRel.offset));
+    }
+    return limit;
+  }
+
private static PinotLogicalAggregate createPlan(Aggregate aggRel, @Nullable
List<RelFieldCollation> collations,
int limit) {
Map<String, String> hintOptions =
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index e7656c04108..7d637c1d5e8 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -634,6 +634,41 @@
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
+ },
+ {
+ "description": "SQL hint based distinct optimization with group trim
enabled and offset pushes down offset + fetch",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_enable_group_trim='true') */ DISTINCT col1, col2 FROM a WHERE
col3 >= 0 LIMIT 10 OFFSET 5",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(offset=[5], fetch=[10])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(fetch=[15])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL],
limit=[15])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF],
limit=[15])",
+ "\n PhysicalFilter(condition=[>=($2, 0)])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "SQL hint based group by optimization with group trim
enabled and offset pushes down offset + fetch",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a
WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10 OFFSET 5",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(sort0=[$0], dir0=[DESC], offset=[5], fetch=[10])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[15])",
+ "\n PhysicalProject(cnt=[$1])",
+ "\n PhysicalAggregate(group=[{0}],
agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], limit=[15])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+ "\n PhysicalAggregate(group=[{0}],
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[15])",
+ "\n PhysicalFilter(condition=[>=($2, 0)])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
}
]
},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]