This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a7622546633 Push limit to leaf stage by default for DISTINCT /
no-aggregate GROUP BY (#18598)
a7622546633 is described below
commit a7622546633ca8868bb8125ebac1145ade1beae6
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 18:25:36 2026 -0700
Push limit to leaf stage by default for DISTINCT / no-aggregate GROUP BY
(#18598)
---
.../tests/custom/GroupByOptionsTest.java | 62 ++++++++++++++
.../integration/tests/custom/JsonPathTest.java | 6 +-
.../PinotAggregateExchangeNodeInsertRule.java | 22 ++++-
.../src/test/resources/queries/GroupByPlans.json | 94 ++++++++++++++++++++++
.../resources/queries/PhysicalOptimizerPlans.json | 17 ++++
5 files changed, 193 insertions(+), 8 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
index 4a97cc014dd..4194e53ca9c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
@@ -389,6 +389,68 @@ public class GroupByOptionsTest extends
CustomDataQueryClusterIntegrationTest {
}
}
+ @Test
+ public void testDistinctWithLimitAndOffsetReturnsFullCardinality()
+ throws Exception {
+ // Default-on leaf-limit pushdown for no-aggregate DISTINCT must still
honor OFFSET. The planner pushes
+ // offset + fetch down (the sort-exchange-copy folds offset into the inner
sort's fetch), so a paginated DISTINCT
+ // returns the full requested page, not fetch - offset rows. 'j' has 10
distinct values (0..9), well above n + m.
+ setUseMultiStageQueryEngine(true);
+ String table = getTableName();
+
+ // Ordered: the returned rows are the global ranks (m+1)..(m+n), i.e. the
3rd, 4th, 5th smallest distinct
+ // values => 2, 3, 4.
+ Assert.assertEquals(
+ toResultStr(postV2Query("select distinct j from " + table + " order by
j limit 3 offset 2")),
+ "\"j\"[\"LONG\"]\n2\n3\n4");
+ // Control with offset 0 => 0, 1, 2.
+ Assert.assertEquals(
+ toResultStr(postV2Query("select distinct j from " + table + " order by
j limit 3")),
+ "\"j\"[\"LONG\"]\n0\n1\n2");
+
+ // Unordered: the result set is arbitrary, but the cardinality must be
exactly the requested page size (3), and
+ // every value must be a valid distinct 'j'. Without accounting for the
offset this would undercount.
+ JsonNode rows = postV2Query("select distinct j from " + table + " limit 3
offset 2").get(RESULT_TABLE).get("rows");
+ Assert.assertEquals(rows.size(), 3, "DISTINCT with LIMIT 3 OFFSET 2 must
return a full page of 3 rows");
+ for (JsonNode row : rows) {
+ long value = row.get(0).asLong();
+ Assert.assertTrue(value >= 0 && value <= 9, "unexpected distinct value:
" + value);
+ }
+ }
+
+ @Test
+ public void testGroupByNoAggregateWithLimitOffsetAndTrimEquivalence()
+ throws Exception {
+ // Covers the no-aggregate GROUP BY (non-DISTINCT) path with the
default-on leaf-limit pushdown, plus
+ // result-equivalence between default-on group trim and the explicit
opt-out when trim is a no-op.
+ setUseMultiStageQueryEngine(true);
+ String table = getTableName();
+
+ // No-aggregate GROUP BY col with LIMIT/OFFSET must return the full
requested page (same trim machinery as
+ // DISTINCT). 'j' has 10 distinct values; ordered page (m+1)..(m+n) => 2,
3, 4.
+ Assert.assertEquals(
+ toResultStr(postV2Query("select j from " + table + " group by j order
by j limit 3 offset 2")),
+ "\"j\"[\"LONG\"]\n2\n3\n4");
+
+ // Unordered no-aggregate GROUP BY: cardinality must be exactly the page
size (3), every value a valid 'j'.
+ JsonNode rows = postV2Query("select j from " + table + " group by j limit
3 offset 2")
+ .get(RESULT_TABLE).get("rows");
+ Assert.assertEquals(rows.size(), 3, "GROUP BY without aggregate with LIMIT
3 OFFSET 2 must return a full page");
+ for (JsonNode row : rows) {
+ long value = row.get(0).asLong();
+ Assert.assertTrue(value >= 0 && value <= 9, "unexpected group key: " +
value);
+ }
+
+ // When the total number of distinct values ('i' has 4) is below the
limit, leaf/final trim is a no-op, so the
+ // default-on behavior must return exactly the same rows as the explicit
opt-out. Order by the key for a
+ // deterministic comparison.
+ String defaultOn = toResultStr(postV2Query("select distinct i from " +
table + " order by i limit 100"));
+ String optedOut = toResultStr(postV2Query(
+ "select /*+ aggOptions(is_enable_group_trim='false') */ distinct i
from " + table + " order by i limit 100"));
+ Assert.assertEquals(defaultOn, optedOut, "default-on trim must match the
opt-out when total distinct < limit");
+ Assert.assertEquals(defaultOn, "\"i\"[\"INT\"]\n0\n1\n2\n3");
+ }
+
@Test
public void testOrderByByKeysAndValuesIsPushedToFinalAggregationStage()
throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index bb90cf7313d..94af87e0bb7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -929,11 +929,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
assertEquals(extractOrderedDistinctValues(baselineResponse).size(), 5);
assertEquals(extractOrderedDistinctValues(optimizedResponse).size(), 5);
-
- // TODO: Fix LIMIT push down for MSE
- if (!useMultiStageQueryEngine) {
-
assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asInt(), 5 *
getNumAvroFiles());
- }
+ assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asInt(),
5 * getNumAvroFiles());
}
/// Cross-path 5-arg form: filter on `$.k2`, extract `$.k1`.
`getMatchingFlattenedDocsMap` applies the filter
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 39e3988fe64..b6f7ea56257 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -132,7 +132,7 @@ public class PinotAggregateExchangeNodeInsertRule {
Map<String, String> hintOptions =
PinotHintStrategyTable.getHintOptions(aggRel.getHints(),
PinotHintOptions.AGGREGATE_HINT_OPTIONS);
- if (!isGroupTrimmingEnabled(call, hintOptions)) {
+ if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
return;
} else if (hintOptions == null) {
hintOptions = Collections.emptyMap();
@@ -186,7 +186,7 @@ public class PinotAggregateExchangeNodeInsertRule {
Map<String, String> hintOptions =
PinotHintStrategyTable.getHintOptions(aggRel.getHints(),
PinotHintOptions.AGGREGATE_HINT_OPTIONS);
- if (!isGroupTrimmingEnabled(call, hintOptions)) {
+ if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
return;
} else if (hintOptions == null) {
hintOptions = Collections.emptyMap();
@@ -479,14 +479,30 @@ public class PinotAggregateExchangeNodeInsertRule {
return null;
}
- private static boolean isGroupTrimmingEnabled(RelOptRuleCall call,
Map<String, String> hintOptions) {
+ private static boolean isGroupTrimmingEnabled(RelOptRuleCall call,
Map<String, String> hintOptions,
+ Aggregate aggRel) {
if (hintOptions != null) {
String option =
hintOptions.get(PinotHintOptions.AggregateOptions.IS_ENABLE_GROUP_TRIM);
if (option != null) {
+ // Explicit hint always wins (true or false), for aggregates with AND
without aggregate functions.
return Boolean.parseBoolean(option);
}
}
+ // Group-by WITHOUT aggregate functions (DISTINCT or `GROUP BY col` with
no agg calls) can always push the
+ // limit/collations down to the leaf stage by default: ORDER BY can only
reference group keys, which are fully
+ // materialized at the leaf, so leaf-level trim is exact (and a plain
LIMIT without ORDER BY returns a valid
+ // subset). This mirrors PinotLogicalAggregateRule (the physical-optimizer
path).
+ // TODO: Consider also enabling this by default for aggregation queries
whose ORDER BY references only group keys
+ // (not aggregate results). The same argument holds there - a
group's rank by its key is identical at every
+ // leaf, so keeping the per-leaf top-K never drops a group that
belongs in the global top-N, and the kept
+ // groups still get their aggregates fully merged at the final
stage. It is NOT safe when ORDER BY is over an
+ // aggregate (partial values rank differently per leaf) or for an
unordered limit with aggregates (an
+ // arbitrarily dropped group would be under-counted).
+ if (aggRel.getAggCallList().isEmpty()) {
+ return true;
+ }
+
Context genericContext = call.getPlanner().getContext();
if (genericContext != null) {
QueryEnvironment.Config context =
genericContext.unwrap(QueryEnvironment.Config.class);
diff --git a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
index 099782e5768..5977fb6ff01 100644
--- a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
@@ -298,6 +298,100 @@
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
+ },
+ {
+ "description": "Distinct with limit pushes limit to leaf and final
aggregate by default (no hint)",
+ "sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1, col2 FROM a WHERE col3
>= 0 LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nLogicalSort(offset=[0], fetch=[10])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]],
isSortOnSender=[false], isSortOnReceiver=[false])",
+ "\n LogicalSort(fetch=[10])",
+ "\n PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL],
collations=[[]], limit=[10])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF],
collations=[[]], limit=[10])",
+ "\n LogicalFilter(condition=[>=($2, 0)])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Group by without aggregate functions with limit pushes
limit to leaf and final by default",
+ "sql": "EXPLAIN PLAN FOR SELECT col1 FROM a GROUP BY col1 LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nLogicalSort(offset=[0], fetch=[10])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]],
isSortOnSender=[false], isSortOnReceiver=[false])",
+ "\n LogicalSort(fetch=[10])",
+ "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL],
collations=[[]], limit=[10])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF],
collations=[[]], limit=[10])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Distinct with order by on group key and limit pushes
collations and limit to leaf and final by default",
+ "sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1 FROM a ORDER BY col1
LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[10])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]],
isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
+ "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL],
collations=[[0]], limit=[10])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF],
collations=[[0]], limit=[10])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Group by without aggregate functions with HAVING on
the group key still pushes limit (filter applied at leaf before trim)",
+ "sql": "EXPLAIN PLAN FOR SELECT col3 FROM a GROUP BY col3 HAVING col3
> 5 LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nLogicalSort(offset=[0], fetch=[10])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]],
isSortOnSender=[false], isSortOnReceiver=[false])",
+ "\n LogicalSort(fetch=[10])",
+ "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL],
collations=[[]], limit=[10])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF],
collations=[[]], limit=[10])",
+ "\n LogicalFilter(condition=[>($2, 5)])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Distinct with limit and offset pushes offset+fetch to
leaf and final aggregate by default",
+ "sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1, col2 FROM a WHERE col3
>= 0 LIMIT 11 OFFSET 10",
+ "output": [
+ "Execution Plan",
+ "\nLogicalSort(offset=[10], fetch=[11])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]],
isSortOnSender=[false], isSortOnReceiver=[false])",
+ "\n LogicalSort(fetch=[21])",
+ "\n PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL],
collations=[[]], limit=[21])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF],
collations=[[]], limit=[21])",
+ "\n LogicalFilter(condition=[>=($2, 0)])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Distinct with limit does not push limit to aggregate
when group trim is explicitly disabled via hint",
+ "sql": "EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_enable_group_trim='false') */ DISTINCT col1, col2 FROM a WHERE
col3 >= 0 LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nLogicalSort(offset=[0], fetch=[10])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]],
isSortOnSender=[false], isSortOnReceiver=[false])",
+ "\n LogicalSort(fetch=[10])",
+ "\n PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF])",
+ "\n LogicalFilter(condition=[>=($2, 0)])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
}
]
}
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index e2d5a7de71c..d9c69907af1 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -653,6 +653,23 @@
"\n"
]
},
+ {
+ "description": "Distinct with limit pushes limit to leaf and final
aggregate by default (no hint)",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT
DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(fetch=[10])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(fetch=[10])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL],
limit=[10])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+ "\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF],
limit=[10])",
+ "\n PhysicalFilter(condition=[>=($2, 0)])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
{
"description": "SQL hint based group by optimization with group trim
enabled and offset pushes down offset + fetch",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a
WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10 OFFSET 5",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org