This is an automated email from the ASF dual-hosted git repository. englefly pushed a commit to branch ckb2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8a7475082eccde6fee474e2d4e7b5fb2482208a3 Author: englefly <[email protected]> AuthorDate: Tue Mar 10 16:58:18 2026 +0800 [feature](nereids) add RewriteSimpleAggToConstantRule to rewrite simple agg to constant Add a new Nereids rewrite rule that replaces simple aggregate queries (COUNT(*)/COUNT(not-null-col)/MIN/MAX without GROUP BY) on DUP_KEYS OlapTable with pre-computed constant values from an async Caffeine cache. Key components: - SimpleAggCacheMgr: singleton async cache backed by internal SQL queries for exact MIN/MAX column stats and row counts. Uses version double-check (getVisibleVersion before and after SQL) to ensure cache consistency. Invalidates on empty, stale, or exception results. - RewriteSimpleAggToConstantRule: Nereids rewrite rule that fires on LogicalAggregate over LogicalOlapScan, checks DUP_KEYS model, no GROUP BY, supported agg functions, and replaces with LogicalOneRowRelation containing constant literals. - Unit test with mock cache covering 6 positive and 8 negative cases. - Regression test with cache warmup polling, positive/negative explain checks, correctness assertions, and disable-rule verification. 
--- .../doris/nereids/jobs/executor/Rewriter.java | 3 + .../org/apache/doris/nereids/rules/RuleType.java | 1 + .../rewrite/RewriteSimpleAggToConstantRule.java | 260 ++++++++++++ .../doris/nereids/stats/SimpleAggCacheMgr.java | 456 +++++++++++++++++++++ .../RewriteSimpleAggToConstantRuleTest.java | 292 +++++++++++++ .../rewrite_simple_agg_to_constant.out | 37 ++ .../agg_use_key_direct/agg_use_key_direct.groovy | 2 +- .../suites/nereids_p0/hint/test_hint.groovy | 2 +- .../rewrite_simple_agg_to_constant.groovy | 317 ++++++++++++++ 9 files changed, 1368 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index 00be607fc94..dfe3250aec9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -157,6 +157,7 @@ import org.apache.doris.nereids.rules.rewrite.ReduceAggregateChildOutputRows; import org.apache.doris.nereids.rules.rewrite.ReorderJoin; import org.apache.doris.nereids.rules.rewrite.RewriteCteChildren; import org.apache.doris.nereids.rules.rewrite.RewriteSearchToSlots; +import org.apache.doris.nereids.rules.rewrite.RewriteSimpleAggToConstantRule; import org.apache.doris.nereids.rules.rewrite.SaltJoin; import org.apache.doris.nereids.rules.rewrite.SetPreAggStatus; import org.apache.doris.nereids.rules.rewrite.SimplifyEncodeDecode; @@ -285,6 +286,7 @@ public class Rewriter extends AbstractBatchJobExecutor { topDown( new NormalizeAggregate(), new CountLiteralRewrite(), + new RewriteSimpleAggToConstantRule(), new NormalizeSort() ), @@ -522,6 +524,7 @@ public class Rewriter extends AbstractBatchJobExecutor { topDown( new NormalizeAggregate(), new CountLiteralRewrite(), + new RewriteSimpleAggToConstantRule(), new NormalizeSort() ), diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 131ce0602c2..e9160055888 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -76,6 +76,7 @@ public enum RuleType { COUNT_LITERAL_REWRITE(RuleTypeClass.REWRITE), SUM_LITERAL_REWRITE(RuleTypeClass.REWRITE), + REWRITE_SIMPLE_AGG_TO_CONSTANT(RuleTypeClass.REWRITE), REPLACE_SORT_EXPRESSION_BY_CHILD_OUTPUT(RuleTypeClass.REWRITE), FILL_UP_HAVING_AGGREGATE(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java new file mode 100644 index 00000000000..0faab74f584 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.functions.agg.Count; +import org.apache.doris.nereids.trees.expressions.functions.agg.Max; +import org.apache.doris.nereids.trees.expressions.functions.agg.Min; +import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; + +/** + * For simple 
aggregation queries like + * 'select count(*), count(not-null column), min(col), max(col) from olap_table', + * rewrite them to return constants directly from FE metadata, completely bypassing BE. + * + * <p>COUNT uses the exact row count cached by {@link SimpleAggCacheMgr}, not table.getRowCount(). + * MIN/MAX uses the same async FE-side cache ({@link SimpleAggCacheMgr}), which stores exact values + * obtained via internal SQL queries, NOT sampled ColumnStatistic. + * + * <p>Conditions: + * 1. DUP_KEYS table only (AGG_KEYS: rowCount inflated; UNIQUE_KEYS: min/max may be inaccurate + * in MoW model before compaction merges delete-marked rows) + * 2. No GROUP BY + * 3. Only COUNT / MIN / MAX aggregate functions + * 4. COUNT(col): col must be NOT NULL (so count(col) == rowCount) + * 5. MIN/MAX(col): col must be numeric or date type, and not aggregated + */ +public class RewriteSimpleAggToConstantRule implements RewriteRuleFactory { + + @Override + public List<Rule> buildRules() { + return ImmutableList.of( + // pattern: agg -> scan + logicalAggregate(logicalOlapScan()) + .thenApply(ctx -> { + LogicalAggregate<LogicalOlapScan> agg = ctx.root; + LogicalOlapScan olapScan = agg.child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT), + // pattern: agg -> project -> scan + logicalAggregate(logicalProject(logicalOlapScan())) + .thenApply(ctx -> { + LogicalAggregate<?> agg = ctx.root; + LogicalOlapScan olapScan = (LogicalOlapScan) ctx.root.child().child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT) + ); + } + + private Plan tryRewrite(LogicalAggregate<?> agg, LogicalOlapScan olapScan, + StatementContext statementContext) { + if (olapScan.isIndexSelected() + || !olapScan.getManuallySpecifiedPartitions().isEmpty() + || !olapScan.getManuallySpecifiedTabletIds().isEmpty() + || olapScan.getTableSample().isPresent()) { + return null; + } + OlapTable table = 
olapScan.getTable(); + + // Condition 1: DUP_KEYS only. 
+ // - DUP_KEYS: FE rowCount equals actual count(*); min/max are accurate. + // - AGG_KEYS: rowCount may be inflated before full compaction. + // - UNIQUE_KEYS: in MoW model, min/max may include values from delete-marked rows + // not yet compacted, so the result could be inaccurate. + if (table.getKeysType() != KeysType.DUP_KEYS) { + return null; + } + + // Condition 2: No GROUP BY + if (!agg.getGroupByExpressions().isEmpty()) { + return null; + } + + // Condition 3: Only COUNT / MIN / MAX aggregate functions. + Set<AggregateFunction> funcs = agg.getAggregateFunctions(); + if (funcs.isEmpty()) { + return null; + } + for (AggregateFunction func : funcs) { + if (!(func instanceof Count) && !(func instanceof Min) && !(func instanceof Max)) { + return null; + } + } + + // Try to compute a constant for each output expression. + // If ANY one cannot be replaced, we give up the entire rewrite. + List<NamedExpression> newOutputExprs = new ArrayList<>(); + for (NamedExpression outputExpr : agg.getOutputExpressions()) { + if (!(outputExpr instanceof Alias)) { + // Unexpected: for no-group-by aggregates, outputs should all be aliases over agg funcs + return null; + } + Alias alias = (Alias) outputExpr; + Expression child = alias.child(); + if (!(child instanceof AggregateFunction)) { + return null; + } + AggregateFunction func = (AggregateFunction) child; + Optional<Literal> constant = tryGetConstant(func, table); + if (!constant.isPresent()) { + // Cannot replace this agg function — give up + return null; + } + newOutputExprs.add(new Alias(alias.getExprId(), constant.get(), alias.getName())); + } + + if (newOutputExprs.isEmpty()) { + return null; + } + + // Build: LogicalProject(constants) -> LogicalOneRowRelation(dummy) + // The OneRowRelation provides a single-row source; all real values come from the project. 
+ LogicalOneRowRelation oneRowRelation = new LogicalOneRowRelation( + statementContext.getNextRelationId(), + ImmutableList.of(new Alias(new NullLiteral(), "__dummy__"))); + return new LogicalProject<>(newOutputExprs, oneRowRelation); + } + + /** + * Try to compute a compile-time constant value for the given aggregate function, + * using FE-side cached row-counts (for COUNT) or exact min/max cache (for MIN/MAX). + * All values are obtained via internal SQL queries (SELECT count/min/max), + * NOT from BE tablet stats reporting, to avoid delayed-reporting and version issues. + */ + private Optional<Literal> tryGetConstant(AggregateFunction func, OlapTable table) { + if (func.isDistinct()) { + return Optional.empty(); + } + + long version; + try { + version = table.getVisibleVersion(); + } catch (RpcException e) { + return Optional.empty(); + } + // Workaround: TRUNCATE TABLE does not change the visible version; skip when no partition has data + if (table.selectNonEmptyPartitionIds(table.getPartitionIds()).isEmpty()) { + return Optional.empty(); + } + // --- COUNT --- + if (func instanceof Count) { + // Look up exact row count from the async cache. + // The count is obtained by executing "SELECT count(*) FROM table" internally, + // so it is accurate and versioned, unlike BE tablet stats reporting which + // has delayed-reporting and version-mismatch issues. 
+ OptionalLong cachedCount = SimpleAggCacheMgr.internalInstance() + .getRowCount(table.getId(), version); + if (!cachedCount.isPresent()) { + return Optional.empty(); + } + long rowCount = cachedCount.getAsLong(); + if (func.getArguments().isEmpty()) { + // count(*) or count() + return Optional.of(new BigIntLiteral(rowCount)); + } + if (func.getArguments().size() == 1) { + Expression arg = func.getArguments().get(0); + if (arg instanceof SlotReference) { + Optional<Column> colOpt = ((SlotReference) arg).getOriginalColumn(); + // count(not-null col) == rowCount + if (colOpt.isPresent() && !colOpt.get().isAllowNull()) { + return Optional.of(new BigIntLiteral(rowCount)); + } + } + } + return Optional.empty(); + } + + // --- MIN / MAX --- + if (func instanceof Min || func instanceof Max) { + if (func.getArguments().size() != 1) { + return Optional.empty(); + } + Expression arg = func.getArguments().get(0); + if (!(arg instanceof SlotReference)) { + return Optional.empty(); + } + SlotReference slot = (SlotReference) arg; + Optional<Column> colOpt = slot.getOriginalColumn(); + if (!colOpt.isPresent()) { + return Optional.empty(); + } + Column column = colOpt.get(); + // Only numeric and date/datetime columns are supported + if (!column.getType().isNumericType() && !column.getType().isDateType()) { + return Optional.empty(); + } + // Aggregated columns cannot give correct min/max + if (column.isAggregated()) { + return Optional.empty(); + } + + // Look up exact min/max from the async cache + ColumnMinMaxKey cacheKey = new ColumnMinMaxKey(table.getId(), column.getName()); + Optional<ColumnMinMax> minMax = SimpleAggCacheMgr.internalInstance().getStats(cacheKey, version); + if (!minMax.isPresent()) { + return Optional.empty(); + } + + // Convert the string value to a Nereids Literal + try { + String value = (func instanceof Min) ? 
minMax.get().minValue() : minMax.get().maxValue(); + LiteralExpr legacyLiteral = StatisticsUtil.readableValue(column.getType(), value); + return Optional.of(Literal.fromLegacyLiteral(legacyLiteral, column.getType())); + } catch (Exception e) { + return Optional.empty(); + } + } + + return Optional.empty(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java new file mode 100644 index 00000000000..4660e977ef8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java @@ -0,0 +1,456 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.stats; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.ResultRow; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +/** + * Async cache that stores exact MIN/MAX/COUNT values for OlapTable, + * used by {@code RewriteSimpleAggToConstantRule} to replace simple + * aggregations with constant values. + * + * <p>MIN/MAX values are obtained by executing + * {@code SELECT min(col), max(col) FROM table}, and COUNT values by + * {@code SELECT count(*) FROM table}, both as internal SQL queries + * inside FE. Results are cached with a version stamp derived from + * {@code OlapTable.getVisibleVersion()}. + * When a caller provides a version newer than the cached version, + * the stale entry is evicted so the next lookup triggers a background reload. + * + * <p>Only numeric and date/datetime columns are cached for MIN/MAX; + * aggregated columns are skipped. 
+ */ +public class SimpleAggCacheMgr { + + // ======================== Public inner types ======================== + + /** + * Holds exact min and max values for a column as strings. + */ + public static class ColumnMinMax { + private final String minValue; + private final String maxValue; + + public ColumnMinMax(String minValue, String maxValue) { + this.minValue = minValue; + this.maxValue = maxValue; + } + + public String minValue() { + return minValue; + } + + public String maxValue() { + return maxValue; + } + } + + /** + * Cache key identifying a column by its table ID and (case-insensitive) column name. + */ + public static final class ColumnMinMaxKey { + private final long tableId; + private final String columnName; + + public ColumnMinMaxKey(long tableId, String columnName) { + this.tableId = tableId; + this.columnName = columnName; + } + + public long getTableId() { + return tableId; + } + + public String getColumnName() { + return columnName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ColumnMinMaxKey)) { + return false; + } + ColumnMinMaxKey that = (ColumnMinMaxKey) o; + return tableId == that.tableId && columnName.equalsIgnoreCase(that.columnName); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, columnName.toLowerCase(java.util.Locale.ROOT)); + } + + @Override + public String toString() { + return "ColumnMinMaxKey{tableId=" + tableId + ", column=" + columnName + "}"; + } + } + + private static class CacheValue { + private final ColumnMinMax minMax; + private final long version; + + CacheValue(ColumnMinMax minMax, long version) { + this.minMax = minMax; + this.version = version; + } + + ColumnMinMax minMax() { + return minMax; + } + + long version() { + return version; + } + } + + /** + * Cached row count with version stamp. 
+ */ + private static class RowCountValue { + private final long rowCount; + private final long version; + + RowCountValue(long rowCount, long version) { + this.rowCount = rowCount; + this.version = version; + } + + long rowCount() { + return rowCount; + } + + long version() { + return version; + } + } + + private static final Logger LOG = LogManager.getLogger(SimpleAggCacheMgr.class); + + private static volatile SimpleAggCacheMgr INSTANCE; + private static volatile SimpleAggCacheMgr TEST_INSTANCE; + + private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>> cache; + private final AsyncLoadingCache<Long, Optional<RowCountValue>> rowCountCache; + + /** + * Protected no-arg constructor for test subclassing. + * Subclasses override {@link #getStats}, {@link #getRowCount}, etc. + */ + protected SimpleAggCacheMgr() { + this.cache = null; + this.rowCountCache = null; + } + + private SimpleAggCacheMgr(ExecutorService executor) { + this.cache = Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .executor(executor) + .buildAsync(new CacheLoader()); + this.rowCountCache = Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .executor(executor) + .buildAsync(new RowCountLoader()); + } + + private static SimpleAggCacheMgr getInstance() { + if (INSTANCE == null) { + synchronized (SimpleAggCacheMgr.class) { + if (INSTANCE == null) { + ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool( + 4, "simple-agg-cache-pool", true); + INSTANCE = new SimpleAggCacheMgr(executor); + } + } + } + return INSTANCE; + } + + /** + * Returns the singleton instance backed by async-loading cache, + * or the test override if one has been set. + */ + public static SimpleAggCacheMgr internalInstance() { + SimpleAggCacheMgr test = TEST_INSTANCE; + if (test != null) { + return test; + } + return getInstance(); + } + + /** + * Override used only in unit tests to inject a mock implementation. 
+ */ + @VisibleForTesting + public static void setTestInstance(SimpleAggCacheMgr instance) { + TEST_INSTANCE = instance; + } + + /** + * Reset the test override so that subsequent calls go back to the real cache. + */ + @VisibleForTesting + public static void clearTestInstance() { + TEST_INSTANCE = null; + } + + /** + * Get the cached min/max for a column; empty unless a loaded entry of at least {@code version} exists. + */ + public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long version) { + CompletableFuture<Optional<CacheValue>> future = cache.get(key); + if (future.isDone()) { + try { + Optional<CacheValue> cacheValue = future.get(); + if (cacheValue.isPresent()) { + CacheValue value = cacheValue.get(); + if (value.version() >= version) { + return Optional.of(value.minMax()); + } + } + // Either empty (load failed / version changed during load) + // or stale — evict so next call triggers a fresh reload. + cache.synchronous().invalidate(key); + } catch (Exception e) { + LOG.warn("Failed to get MinMax for column: {}, version: {}", key, version, e); + cache.synchronous().invalidate(key); + } + } + return Optional.empty(); + } + + /** + * Evict the cached stats for a column, if present. Used when we know the data has changed. + */ + public void removeStats(ColumnMinMaxKey key) { + cache.synchronous().invalidate(key); + } + + /** + * Get the cached row count for a table; empty unless a loaded entry of at least {@code version} exists. + */ + public OptionalLong getRowCount(long tableId, long version) { + CompletableFuture<Optional<RowCountValue>> future = rowCountCache.get(tableId); + if (future.isDone()) { + try { + Optional<RowCountValue> cached = future.get(); + if (cached.isPresent()) { + RowCountValue value = cached.get(); + if (value.version() >= version) { + return OptionalLong.of(value.rowCount()); + } + } + // Either empty (load failed / version changed during load) + // or stale — evict so next call triggers a fresh reload. 
+ rowCountCache.synchronous().invalidate(tableId); + } catch (Exception e) { + LOG.warn("Failed to get row count for table: {}, version: {}", tableId, version, e); + rowCountCache.synchronous().invalidate(tableId); + } + } + return OptionalLong.empty(); + } + + /** + * Generate the internal SQL for fetching exact min/max values. + */ + @VisibleForTesting + public static String genMinMaxSql(List<String> qualifiers, String columnName) { + // qualifiers: [catalogName, dbName, tableName] + String quotedCol = "`" + StatisticsUtil.escapeColumnName(columnName) + "`"; + String fullTable = "`" + qualifiers.get(0) + "`.`" + + qualifiers.get(1) + "`.`" + + qualifiers.get(2) + "`"; + return "SELECT min(" + quotedCol + "), max(" + quotedCol + ") FROM " + fullTable; + } + + /** + * Generate the internal SQL for fetching exact row count. + */ + @VisibleForTesting + public static String genCountSql(List<String> qualifiers) { + String fullTable = "`" + qualifiers.get(0) + "`.`" + + qualifiers.get(1) + "`.`" + + qualifiers.get(2) + "`"; + return "SELECT count(*) FROM " + fullTable; + } + + /** + * Async cache loader that issues internal SQL queries to compute exact min/max. 
+ */ + protected static final class CacheLoader + implements AsyncCacheLoader<ColumnMinMaxKey, Optional<CacheValue>> { + + @Override + public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad( + @NonNull ColumnMinMaxKey key, @NonNull Executor executor) { + return CompletableFuture.supplyAsync(() -> { + try { + return doLoad(key); + } catch (Exception e) { + LOG.warn("Failed to load MinMax for column: {}", key, e); + return Optional.empty(); + } + }, executor); + } + + private Optional<CacheValue> doLoad(ColumnMinMaxKey key) throws Exception { + // Look up the table by its ID + TableIf tableIf = Env.getCurrentInternalCatalog().getTableByTableId(key.getTableId()); + if (!(tableIf instanceof OlapTable)) { + return Optional.empty(); + } + OlapTable olapTable = (OlapTable) tableIf; + + // Validate column exists and is eligible + Column column = olapTable.getColumn(key.getColumnName()); + if (column == null) { + return Optional.empty(); + } + if (!column.getType().isNumericType() && !column.getType().isDateType()) { + return Optional.empty(); + } + if (column.isAggregated()) { + return Optional.empty(); + } + + // Use table-level visibleVersion (strictly monotonic) for cache staleness check, + // consistent with how the caller (RewriteSimpleAggToConstantRule) obtains the version. + long version = olapTable.getVisibleVersion(); + + // Build and execute internal SQL + List<String> qualifiers = olapTable.getFullQualifiers(); + String sql = genMinMaxSql(qualifiers, column.getName()); + + List<ResultRow> rows; + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) { + r.connectContext.getSessionVariable().setPipelineTaskNum("1"); + // Disable our own rule to prevent infinite recursion: + // this internal SQL goes through Nereids and would otherwise trigger + // RewriteSimpleAggToConstantRule again. 
+ r.connectContext.getSessionVariable().setDisableNereidsRules( + "REWRITE_SIMPLE_AGG_TO_CONSTANT"); + StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); + rows = stmtExecutor.executeInternalQuery(); + } + if (rows == null || rows.isEmpty()) { + return Optional.empty(); + } + ResultRow row = rows.get(0); + String minVal = row.get(0); + String maxVal = row.get(1); + if (minVal == null || maxVal == null) { + return Optional.empty(); + } + // Re-check version after query execution to detect concurrent data changes. + // If the version changed during the query, the result is unreliable. + long versionAfter = olapTable.getVisibleVersion(); + if (versionAfter != version) { + return Optional.empty(); + } + return Optional.of(new CacheValue(new ColumnMinMax(minVal, maxVal), version)); + } + } + + /** + * Async cache loader that issues {@code SELECT count(*) FROM table} + * to compute exact row counts. + */ + protected static final class RowCountLoader + implements AsyncCacheLoader<Long, Optional<RowCountValue>> { + + @Override + public @NonNull CompletableFuture<Optional<RowCountValue>> asyncLoad( + @NonNull Long tableId, @NonNull Executor executor) { + return CompletableFuture.supplyAsync(() -> { + try { + return doLoad(tableId); + } catch (Exception e) { + LOG.warn("Failed to load row count for table: {}", tableId, e); + return Optional.empty(); + } + }, executor); + } + + private Optional<RowCountValue> doLoad(Long tableId) throws Exception { + TableIf tableIf = Env.getCurrentInternalCatalog().getTableByTableId(tableId); + if (!(tableIf instanceof OlapTable)) { + return Optional.empty(); + } + OlapTable olapTable = (OlapTable) tableIf; + + long version = olapTable.getVisibleVersion(); + + List<String> qualifiers = olapTable.getFullQualifiers(); + String sql = genCountSql(qualifiers); + + List<ResultRow> rows; + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) { + r.connectContext.getSessionVariable().setPipelineTaskNum("1"); 
+ // Disable our own rule to prevent infinite recursion: + // this internal SQL goes through Nereids and would otherwise trigger + // RewriteSimpleAggToConstantRule again. + r.connectContext.getSessionVariable().setDisableNereidsRules( + "REWRITE_SIMPLE_AGG_TO_CONSTANT"); + StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); + rows = stmtExecutor.executeInternalQuery(); + } + if (rows == null || rows.isEmpty()) { + return Optional.empty(); + } + String countStr = rows.get(0).get(0); + if (countStr == null) { + return Optional.empty(); + } + long count = Long.parseLong(countStr); + // Re-check version after query execution to detect concurrent data changes. + long versionAfter = olapTable.getVisibleVersion(); + if (versionAfter != version) { + return Optional.empty(); + } + return Optional.of(new RowCountValue(count, version)); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRuleTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRuleTest.java new file mode 100644 index 00000000000..8adf0b6b21f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRuleTest.java @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.OptionalLong; + +/** + * Tests for {@link RewriteSimpleAggToConstantRule}. + * + * <p>This rule rewrites simple aggregation queries (count/min/max on DUP_KEYS tables + * without GROUP BY) into constant values from FE metadata, producing + * LogicalProject -> LogicalOneRowRelation plans. 
+ */ +class RewriteSimpleAggToConstantRuleTest extends TestWithFeService implements MemoPatternMatchSupported { + + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + + // DUP_KEYS table with NOT NULL columns + createTable("CREATE TABLE test.dup_tbl (\n" + + " k1 INT NOT NULL,\n" + + " v1 INT NOT NULL,\n" + + " v2 BIGINT NOT NULL,\n" + + " v3 DATE NOT NULL,\n" + + " v4 VARCHAR(128)\n" + + ") DUPLICATE KEY(k1)\n" + + "DISTRIBUTED BY HASH(k1) BUCKETS 1\n" + + "PROPERTIES('replication_num' = '1');"); + + // UNIQUE_KEYS table (should NOT be rewritten) + createTable("CREATE TABLE test.uniq_tbl (\n" + + " k1 INT NOT NULL,\n" + + " v1 INT NOT NULL\n" + + ") UNIQUE KEY(k1)\n" + + "DISTRIBUTED BY HASH(k1) BUCKETS 1\n" + + "PROPERTIES('replication_num' = '1');"); + + // AGG_KEYS table (should NOT be rewritten) + createTable("CREATE TABLE test.agg_tbl (\n" + + " k1 INT NOT NULL,\n" + + " v1 INT SUM NOT NULL\n" + + ") AGGREGATE KEY(k1)\n" + + "DISTRIBUTED BY HASH(k1) BUCKETS 1\n" + + "PROPERTIES('replication_num' = '1');"); + + connectContext.setDatabase("test"); + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + + // Install a mock SimpleAggCacheMgr that returns known min/max and row count for dup_tbl + SimpleAggCacheMgr.setTestInstance(new MockMinMaxStatsMgr()); + } + + @AfterAll + public static void tearDown() { + SimpleAggCacheMgr.clearTestInstance(); + } + + private OlapTable getOlapTable(String tableName) throws Exception { + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("test"); + return (OlapTable) db.getTableOrMetaException(tableName); + } + + // ======================== Positive tests: should rewrite ======================== + + @Test + void testCountStarRewrite() { + // count(*) on DUP_KEYS with reported row counts → rewrite to constant + PlanChecker.from(connectContext) + .analyze("SELECT count(*) FROM dup_tbl") + .rewrite() + 
.matches(logicalResultSink(logicalOneRowRelation())) + .printlnTree(); + } + + @Test + void testCountNotNullColumnRewrite() { + // count(not-null column) on DUP_KEYS → rewrite to constant (equals rowCount) + PlanChecker.from(connectContext) + .analyze("SELECT count(k1) FROM dup_tbl") + .rewrite() + .matches(logicalResultSink(logicalOneRowRelation())) + .printlnTree(); + } + + @Test + void testMinRewrite() { + // min(int col) on DUP_KEYS with cache hit → rewrite to constant + PlanChecker.from(connectContext) + .analyze("SELECT min(v1) FROM dup_tbl") + .rewrite() + .matches(logicalResultSink(logicalOneRowRelation())) + .printlnTree(); + } + + @Test + void testMaxRewrite() { + // max(bigint col) on DUP_KEYS with cache hit → rewrite to constant + PlanChecker.from(connectContext) + .analyze("SELECT max(v2) FROM dup_tbl") + .rewrite() + .matches(logicalResultSink(logicalOneRowRelation())) + .printlnTree(); + } + + @Test + void testMinMaxDateRewrite() { + // min/max(date col) on DUP_KEYS with cache hit → rewrite to constant + PlanChecker.from(connectContext) + .analyze("SELECT min(v3), max(v3) FROM dup_tbl") + .rewrite() + .matches(logicalResultSink(logicalOneRowRelation())) + .printlnTree(); + } + + @Test + void testMixedCountMinMax() { + // count(*), min(v1), max(v2) on DUP_KEYS → rewrite to constant + PlanChecker.from(connectContext) + .analyze("SELECT count(*), min(v1), max(v2) FROM dup_tbl") + .rewrite() + .matches(logicalResultSink(logicalOneRowRelation())) + .printlnTree(); + } + + // ======================== Negative tests: should NOT rewrite ======================== + + @Test + void testUniqueKeysNotRewrite() { + // UNIQUE_KEYS table → rule should NOT trigger + PlanChecker.from(connectContext) + .analyze("SELECT count(*) FROM uniq_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testAggKeysNotRewrite() { + // AGG_KEYS table → rule should NOT trigger + PlanChecker.from(connectContext) + .analyze("SELECT count(*) FROM 
agg_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testGroupByNotRewrite() { + // GROUP BY present → rule should NOT trigger + PlanChecker.from(connectContext) + .analyze("SELECT count(*) FROM dup_tbl GROUP BY k1") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testUnsupportedAggFuncNotRewrite() { + // SUM is not supported → rule should NOT trigger + PlanChecker.from(connectContext) + .analyze("SELECT sum(v1) FROM dup_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testAvgNotRewrite() { + // AVG is not supported → rule should NOT trigger + PlanChecker.from(connectContext) + .analyze("SELECT avg(v1) FROM dup_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testDistinctCountNotRewrite() { + // count(distinct col) → rule should NOT trigger + PlanChecker.from(connectContext) + .analyze("SELECT count(distinct k1) FROM dup_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testCountNullableColumnNotRewrite() { + // count(nullable column v4) → cannot guarantee count(v4) == rowCount, rule should NOT trigger + PlanChecker.from(connectContext) + .analyze("SELECT count(v4) FROM dup_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testMinMaxStringColumnNotRewrite() { + // min(varchar col) → not supported for string types + PlanChecker.from(connectContext) + .analyze("SELECT min(v4) FROM dup_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + @Test + void testMixedSupportedAndUnsupportedNotRewrite() { + // If ANY agg function cannot be replaced, the entire rewrite is skipped. 
+ // count(*) can be replaced, but sum(v1) cannot → entire query NOT rewritten + PlanChecker.from(connectContext) + .analyze("SELECT count(*), sum(v1) FROM dup_tbl") + .rewrite() + .nonMatch(logicalResultSink(logicalOneRowRelation())); + } + + // ======================== Mock SimpleAggCacheMgr ======================== + + /** + * A simple mock that returns known min/max values and row count for dup_tbl. + * It accepts any version (returns the values regardless of version). + */ + private class MockMinMaxStatsMgr extends SimpleAggCacheMgr { + + @Override + public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long version) { + try { + OlapTable table = getOlapTable("dup_tbl"); + if (key.getTableId() != table.getId()) { + return Optional.empty(); + } + } catch (Exception e) { + return Optional.empty(); + } + + String colName = key.getColumnName().toLowerCase(); + switch (colName) { + case "k1": + return Optional.of(new ColumnMinMax("1", "100")); + case "v1": + return Optional.of(new ColumnMinMax("10", "999")); + case "v2": + return Optional.of(new ColumnMinMax("100", "99999")); + case "v3": + return Optional.of(new ColumnMinMax("2024-01-01", "2025-12-31")); + default: + // v4 (varchar) and unknown columns → no cache + return Optional.empty(); + } + } + + @Override + public OptionalLong getRowCount(long tableId, long version) { + try { + OlapTable table = getOlapTable("dup_tbl"); + if (tableId == table.getId()) { + return OptionalLong.of(100L); + } + } catch (Exception e) { + // fall through + } + return OptionalLong.empty(); + } + + @Override + public void removeStats(ColumnMinMaxKey key) { + // no-op for mock + } + } +} diff --git a/regression-test/data/nereids_rules_p0/rewrite_simple_agg_to_constant/rewrite_simple_agg_to_constant.out b/regression-test/data/nereids_rules_p0/rewrite_simple_agg_to_constant/rewrite_simple_agg_to_constant.out new file mode 100644 index 00000000000..a982c987c22 --- /dev/null +++ 
b/regression-test/data/nereids_rules_p0/rewrite_simple_agg_to_constant/rewrite_simple_agg_to_constant.out @@ -0,0 +1,37 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !count_star -- +5 + +-- !count_notnull -- +5 + +-- !min_int -- +10 + +-- !max_int -- +50 + +-- !min_bigint -- +100 + +-- !max_bigint -- +500 + +-- !min_date -- +2024-01-01 + +-- !max_date -- +2025-06-30 + +-- !mixed -- +5 10 500 + +-- !count_nullable -- +4 + +-- !uniq_count -- +3 + +-- !agg_count -- +3 + diff --git a/regression-test/suites/mv_p0/agg_use_key_direct/agg_use_key_direct.groovy b/regression-test/suites/mv_p0/agg_use_key_direct/agg_use_key_direct.groovy index 4a866c20e3c..938d015b7ff 100644 --- a/regression-test/suites/mv_p0/agg_use_key_direct/agg_use_key_direct.groovy +++ b/regression-test/suites/mv_p0/agg_use_key_direct/agg_use_key_direct.groovy @@ -23,7 +23,7 @@ suite ("agg_use_key_direct") { // this mv rewrite would not be rewritten in RBO phase, so set TRY_IN_RBO explicitly to make case stable sql "set pre_materialized_view_rewrite_strategy = TRY_IN_RBO" - + sql "set disable_nereids_rules='REWRITE_SIMPLE_AGG_TO_CONSTANT'"; sql "drop table if exists ${tblName} force;" sql """ create table ${tblName} ( diff --git a/regression-test/suites/nereids_p0/hint/test_hint.groovy b/regression-test/suites/nereids_p0/hint/test_hint.groovy index 2e89cabafe2..e7638440a11 100644 --- a/regression-test/suites/nereids_p0/hint/test_hint.groovy +++ b/regression-test/suites/nereids_p0/hint/test_hint.groovy @@ -27,7 +27,7 @@ suite("test_hint") { sql 'set exec_mem_limit=21G' sql 'set be_number_for_test=1' sql 'set parallel_pipeline_task_num=1' - sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION" + sql "set disable_nereids_rules='PRUNE_EMPTY_PARTITION,REWRITE_SIMPLE_AGG_TO_CONSTANT'" sql 'set enable_nereids_planner=true' sql 'set enable_nereids_distribute_planner=false' sql "set ignore_shape_nodes='PhysicalProject'" diff --git 
a/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/rewrite_simple_agg_to_constant.groovy b/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/rewrite_simple_agg_to_constant.groovy new file mode 100644 index 00000000000..2aed9f841ad --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/rewrite_simple_agg_to_constant.groovy @@ -0,0 +1,317 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("rewrite_simple_agg_to_constant") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + + sql "DROP DATABASE IF EXISTS test_rewrite_simple_agg_constant" + sql "CREATE DATABASE test_rewrite_simple_agg_constant" + sql "USE test_rewrite_simple_agg_constant" + + // ========== Create test tables ========== + + // DUP_KEYS table with NOT NULL columns + sql """ + CREATE TABLE dup_tbl ( + k1 INT NOT NULL, + v1 INT NOT NULL, + v2 BIGINT NOT NULL, + v3 DATE NOT NULL, + v4 VARCHAR(128) + ) DUPLICATE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES('replication_num' = '1'); + """ + + // UNIQUE_KEYS table + sql """ + CREATE TABLE uniq_tbl ( + k1 INT NOT NULL, + v1 INT NOT NULL + ) UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES('replication_num' = '1'); + """ + + // AGG_KEYS table + sql """ + CREATE TABLE agg_tbl ( + k1 INT NOT NULL, + v1 INT SUM NOT NULL + ) AGGREGATE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES('replication_num' = '1'); + """ + + // ========== Insert test data ========== + sql """ + INSERT INTO dup_tbl VALUES + (1, 10, 100, '2024-01-01', 'aaa'), + (2, 20, 200, '2024-06-15', 'bbb'), + (3, 30, 300, '2024-12-31', null), + (4, 40, 400, '2025-03-01', 'ddd'), + (5, 50, 500, '2025-06-30', 'eee'); + """ + + sql """ + INSERT INTO uniq_tbl VALUES (1, 10), (2, 20), (3, 30); + """ + + sql """ + INSERT INTO agg_tbl VALUES (1, 10), (2, 20), (3, 30); + """ + + // Wait a bit for tablet stats to be reported to FE + sleep(3000) + + // =================================================================== + // Warm up the SimpleAggCacheMgr async cache. + // + // The first call to getStats()/getRowCount() triggers an async load; + // the result is not available until the internal SQL finishes. + // We poll until explain shows "constant exprs", which proves the cache + // entry is loaded and the rule can fire. 
+ // =================================================================== + // Trigger cache loads for all columns we'll test + sql "SELECT count(*) FROM dup_tbl" + sql "SELECT min(v1), max(v1) FROM dup_tbl" + sql "SELECT min(v2), max(v2) FROM dup_tbl" + sql "SELECT min(v3), max(v3) FROM dup_tbl" + + // Poll until the rule fires (cache is warm) + def warmUpSql = "SELECT count(*), min(v1), max(v2), min(v3) FROM dup_tbl" + def cacheReady = false + for (int i = 0; i < 30; i++) { + def explainResult = sql "EXPLAIN ${warmUpSql}" + if (explainResult.toString().contains("constant exprs")) { + cacheReady = true + break + } + sleep(1000) + } + if (!cacheReady) { + if (isCloudMode()) { + logger.info("SimpleAggCacheMgr cache did not warm up within 30s in cloud mode, skip remaining tests") + return + } + assertTrue(false, "SimpleAggCacheMgr cache did not warm up within 30 seconds") + } + + // =================================================================== + // Positive tests: verify the rule IS applied. + // The cache is confirmed warm from the poll above. 
+ // =================================================================== + + // count(*) + explain { + sql("SELECT count(*) FROM dup_tbl") + contains "constant exprs" + } + order_qt_count_star """SELECT count(*) FROM dup_tbl;""" + + // count(not-null column) + explain { + sql("SELECT count(k1) FROM dup_tbl") + contains "constant exprs" + } + order_qt_count_notnull """SELECT count(k1) FROM dup_tbl;""" + + // min(int) + explain { + sql("SELECT min(v1) FROM dup_tbl") + contains "constant exprs" + } + order_qt_min_int """SELECT min(v1) FROM dup_tbl;""" + + // max(int) + explain { + sql("SELECT max(v1) FROM dup_tbl") + contains "constant exprs" + } + order_qt_max_int """SELECT max(v1) FROM dup_tbl;""" + + // min(bigint) + explain { + sql("SELECT min(v2) FROM dup_tbl") + contains "constant exprs" + } + order_qt_min_bigint """SELECT min(v2) FROM dup_tbl;""" + + // max(bigint) + explain { + sql("SELECT max(v2) FROM dup_tbl") + contains "constant exprs" + } + order_qt_max_bigint """SELECT max(v2) FROM dup_tbl;""" + + // min(date) + explain { + sql("SELECT min(v3) FROM dup_tbl") + contains "constant exprs" + } + order_qt_min_date """SELECT min(v3) FROM dup_tbl;""" + + // max(date) + explain { + sql("SELECT max(v3) FROM dup_tbl") + contains "constant exprs" + } + order_qt_max_date """SELECT max(v3) FROM dup_tbl;""" + + // Mixed: count(*), min, max together + explain { + sql("SELECT count(*), min(v1), max(v2) FROM dup_tbl") + contains "constant exprs" + } + order_qt_mixed """SELECT count(*), min(v1), max(v2) FROM dup_tbl;""" + + // =================================================================== + // Negative tests: these queries should NEVER be rewritten. + // The cache is confirmed warm (the poll + positive tests above proved + // count/min/max for dup_tbl all hit cache). So if these plans do NOT + // contain "constant exprs", the rule actively rejected them. 
+ // =================================================================== + + // Non-DUP_KEYS: UNIQUE_KEYS table should not be rewritten + explain { + sql("SELECT count(*) FROM uniq_tbl") + notContains "constant exprs" + } + + // Non-DUP_KEYS: AGG_KEYS table should not be rewritten + explain { + sql("SELECT count(*) FROM agg_tbl") + notContains "constant exprs" + } + + // GROUP BY present → should not be rewritten + explain { + sql("SELECT count(*) FROM dup_tbl GROUP BY k1") + notContains "constant exprs" + } + + // Unsupported aggregate function: SUM (cache for v1 is already warm from min/max above) + explain { + sql("SELECT sum(v1) FROM dup_tbl") + notContains "constant exprs" + } + + // Unsupported aggregate function: AVG + explain { + sql("SELECT avg(v1) FROM dup_tbl") + notContains "constant exprs" + } + + // DISTINCT count + explain { + sql("SELECT count(distinct k1) FROM dup_tbl") + notContains "constant exprs" + } + + // count(nullable column) → cannot guarantee count(v4) equals row count + explain { + sql("SELECT count(v4) FROM dup_tbl") + notContains "constant exprs" + } + + // min/max on string column → not supported (row count cache is warm) + explain { + sql("SELECT min(v4) FROM dup_tbl") + notContains "constant exprs" + } + + // Mixed supported (count) and unsupported (sum) → entire query NOT rewritten + explain { + sql("SELECT count(*), sum(v1) FROM dup_tbl") + notContains "constant exprs" + } + + // Manually specified partition → should not be rewritten + explain { + sql("SELECT count(*) FROM dup_tbl PARTITION(dup_tbl)") + notContains "constant exprs" + } + explain { + sql("SELECT min(v1), max(v2) FROM dup_tbl PARTITION(dup_tbl)") + notContains "constant exprs" + } + + // Manually specified tablet → should not be rewritten + def tabletResult = sql "SHOW TABLETS FROM dup_tbl" + def tabletId = tabletResult[0][0] + explain { + sql("SELECT count(*) FROM dup_tbl TABLET(${tabletId})") + notContains "constant exprs" + } + explain { + sql("SELECT min(v1), 
max(v2) FROM dup_tbl TABLET(${tabletId})") + notContains "constant exprs" + } + + // TABLESAMPLE → should not be rewritten + explain { + sql("SELECT count(*) FROM dup_tbl TABLESAMPLE(10 PERCENT)") + notContains "constant exprs" + } + explain { + sql("SELECT min(v1), max(v2) FROM dup_tbl TABLESAMPLE(3 ROWS)") + notContains "constant exprs" + } + + // Sync materialized view (indexSelected = true) → should not be rewritten + createMV("""CREATE MATERIALIZED VIEW mv_dup_sum AS SELECT v1 as m1, sum(v2) as m2 FROM dup_tbl GROUP BY v1;""") + explain { + sql("SELECT count(*) FROM dup_tbl INDEX mv_dup_sum") + notContains "constant exprs" + } + + // =================================================================== + // Verify disabling the rule works. + // When the rule is disabled, even simple count(*) should NOT produce constant exprs. + // =================================================================== + explain { + sql("SELECT /*+ SET_VAR(disable_nereids_rules=REWRITE_SIMPLE_AGG_TO_CONSTANT) */ count(*) FROM dup_tbl") + notContains "constant exprs" + } + + // =================================================================== + // Correctness-only tests for queries that should NOT be rewritten. + // The result must still be correct even though the rule does not fire. + // =================================================================== + + // count(nullable column) — not rewritten, but result must be correct + order_qt_count_nullable """SELECT count(v4) FROM dup_tbl;""" + + // Count on non-DUP_KEYS tables — result correctness + order_qt_uniq_count """SELECT count(*) FROM uniq_tbl;""" + order_qt_agg_count """SELECT count(*) FROM agg_tbl;""" + + // =================================================================== + // Cache invalidation test: inserting new data should invalidate the + // cached min/max stats so the rule no longer fires until cache refreshes. 
+ // =================================================================== + sql "INSERT INTO dup_tbl VALUES (6, 60, 600, '2025-12-01', 'fff');" + + // Right after INSERT the cached stats are stale; explain should NOT + // show "constant exprs" because the cache entry has been invalidated. + explain { + sql("SELECT min(v2), max(v2) FROM dup_tbl") + notContains "constant exprs" + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
