github-actions[bot] commented on code in PR #61183: URL: https://github.com/apache/doris/pull/61183#discussion_r2910550834
########## fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java: ########## @@ -60,115 +60,7 @@ import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite; import org.apache.doris.nereids.rules.rewrite.CountLiteralRewrite; import org.apache.doris.nereids.rules.rewrite.CreatePartitionTopNFromWindow; -import org.apache.doris.nereids.rules.rewrite.DecomposeRepeatWithPreAggregation; -import org.apache.doris.nereids.rules.rewrite.DecoupleEncodeDecode; Review Comment: **[Critical / Build Break]** This diff removes ~109 explicit imports (e.g., `MergeFilters`, `EliminateSort`, `InferPredicates`, `ReorderJoin`, `MergeAggregate`, `EliminateAggCaseWhen`, etc.) that are all still referenced by simple name in the body of `Rewriter.java`. No wildcard import replaces them. This will cause a compilation failure. This appears to be an accidental IDE artifact — perhaps an auto-import cleanup that went wrong. Please restore all deleted imports and only add the new `RewriteSimpleAggToConstantRule` import. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java: ########## @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.functions.agg.Count; +import org.apache.doris.nereids.trees.expressions.functions.agg.Max; +import org.apache.doris.nereids.trees.expressions.functions.agg.Min; +import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; + +/** + * For simple aggregation queries like + * 'select count(*), 
count(not-null column), min(col), max(col) from olap_table', + * rewrite them to return constants directly from FE metadata, completely bypassing BE. + * + * <p>COUNT uses table.getRowCount(). + * MIN/MAX uses an async FE-side cache ({@link SimpleAggCacheMgr}) that stores exact values + * obtained via internal SQL queries, NOT sampled ColumnStatistic. + * + * <p>Conditions: + * 1. DUP_KEYS table only (AGG_KEYS: rowCount inflated; UNIQUE_KEYS: min/max may be inaccurate + * in MoW model before compaction merges delete-marked rows) + * 2. No GROUP BY + * 3. Only COUNT / MIN / MAX aggregate functions + * 4. COUNT(col): col must be NOT NULL (so count(col) == rowCount) + * 5. MIN/MAX(col): col must be numeric or date type, and not aggregated + */ +public class RewriteSimpleAggToConstantRule implements RewriteRuleFactory { + + @Override + public List<Rule> buildRules() { + return ImmutableList.of( + // pattern: agg -> scan + logicalAggregate(logicalOlapScan()) + .thenApply(ctx -> { + LogicalAggregate<LogicalOlapScan> agg = ctx.root; + LogicalOlapScan olapScan = agg.child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT), + // pattern: agg -> project -> scan + logicalAggregate(logicalProject(logicalOlapScan())) + .thenApply(ctx -> { + LogicalAggregate<?> agg = ctx.root; + LogicalOlapScan olapScan = (LogicalOlapScan) ctx.root.child().child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT) + ); + } + + private Plan tryRewrite(LogicalAggregate<?> agg, LogicalOlapScan olapScan, + StatementContext statementContext) { + OlapTable table = olapScan.getTable(); + long selectedIndex = olapScan.getSelectedIndexId(); + + // Condition 1: DUP_KEYS only. + // - DUP_KEYS: FE rowCount equals actual count(*); min/max are accurate. + // - AGG_KEYS: rowCount may be inflated before full compaction. 
+ // - UNIQUE_KEYS: in MoW model, min/max may include values from delete-marked rows + // not yet compacted, so the result could be inaccurate. + if (table.getKeysType() != KeysType.DUP_KEYS) { + return null; + } + + // Condition 2: No GROUP BY + if (!agg.getGroupByExpressions().isEmpty()) { + return null; + } + + // Condition 3: Only COUNT / MIN / MAX aggregate functions. + Set<AggregateFunction> funcs = agg.getAggregateFunctions(); + if (funcs.isEmpty()) { + return null; + } + for (AggregateFunction func : funcs) { + if (!(func instanceof Count) && !(func instanceof Min) && !(func instanceof Max)) { + return null; + } + } + + // Try to compute a constant for each output expression. + // If ANY one cannot be replaced, we give up the entire rewrite. + List<NamedExpression> newOutputExprs = new ArrayList<>(); + for (NamedExpression outputExpr : agg.getOutputExpressions()) { + if (!(outputExpr instanceof Alias)) { + // Unexpected: for no-group-by aggregates, outputs should all be aliases over agg funcs + return null; + } + Alias alias = (Alias) outputExpr; + Expression child = alias.child(); + if (!(child instanceof AggregateFunction)) { + return null; + } + AggregateFunction func = (AggregateFunction) child; + Optional<Literal> constant = tryGetConstant(func, table, selectedIndex); + if (!constant.isPresent()) { + // Cannot replace this agg function — give up + return null; + } + newOutputExprs.add(new Alias(alias.getExprId(), constant.get(), alias.getName())); + } + + if (newOutputExprs.isEmpty()) { + return null; + } + + // Build: LogicalProject(constants) -> LogicalOneRowRelation(dummy) + // The OneRowRelation provides a single-row source; all real values come from the project. 
+ LogicalOneRowRelation oneRowRelation = new LogicalOneRowRelation( + statementContext.getNextRelationId(), + ImmutableList.of(new Alias(new NullLiteral(), "__dummy__"))); + return new LogicalProject<>(newOutputExprs, oneRowRelation); + } + + /** + * Try to compute a compile-time constant value for the given aggregate function, + * using FE-side cached row-counts (for COUNT) or exact min/max cache (for MIN/MAX). + * All values are obtained via internal SQL queries (SELECT count/min/max), + * NOT from BE tablet stats reporting, to avoid delayed-reporting and version issues. + */ + private Optional<Literal> tryGetConstant(AggregateFunction func, OlapTable table, long selectedIndex) { + if (func.isDistinct()) { + return Optional.empty(); + } + + long version = table.getVisibleVersionTime(); + Review Comment: **[Nit]** The variable is named `version` but `OlapTable.getVisibleVersionTime()` returns a **millisecond timestamp**, not a version number. Consider renaming to `versionTime` or `visibleVersionTimeMs` for clarity, both here and in the cache loaders. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java: ########## @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.functions.agg.Count; +import org.apache.doris.nereids.trees.expressions.functions.agg.Max; +import org.apache.doris.nereids.trees.expressions.functions.agg.Min; +import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import 
java.util.OptionalLong; +import java.util.Set; + +/** + * For simple aggregation queries like + * 'select count(*), count(not-null column), min(col), max(col) from olap_table', + * rewrite them to return constants directly from FE metadata, completely bypassing BE. + * + * <p>COUNT uses table.getRowCount(). + * MIN/MAX uses an async FE-side cache ({@link SimpleAggCacheMgr}) that stores exact values + * obtained via internal SQL queries, NOT sampled ColumnStatistic. + * + * <p>Conditions: + * 1. DUP_KEYS table only (AGG_KEYS: rowCount inflated; UNIQUE_KEYS: min/max may be inaccurate + * in MoW model before compaction merges delete-marked rows) + * 2. No GROUP BY + * 3. Only COUNT / MIN / MAX aggregate functions + * 4. COUNT(col): col must be NOT NULL (so count(col) == rowCount) + * 5. MIN/MAX(col): col must be numeric or date type, and not aggregated + */ +public class RewriteSimpleAggToConstantRule implements RewriteRuleFactory { + + @Override + public List<Rule> buildRules() { + return ImmutableList.of( + // pattern: agg -> scan + logicalAggregate(logicalOlapScan()) + .thenApply(ctx -> { + LogicalAggregate<LogicalOlapScan> agg = ctx.root; + LogicalOlapScan olapScan = agg.child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT), + // pattern: agg -> project -> scan + logicalAggregate(logicalProject(logicalOlapScan())) + .thenApply(ctx -> { + LogicalAggregate<?> agg = ctx.root; + LogicalOlapScan olapScan = (LogicalOlapScan) ctx.root.child().child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT) + ); + } + + private Plan tryRewrite(LogicalAggregate<?> agg, LogicalOlapScan olapScan, Review Comment: **[Critical / Data Correctness]** `tryRewrite()` does not check whether the `LogicalOlapScan` has: - `manuallySpecifiedPartitions` (e.g., `SELECT count(*) FROM t PARTITION(p1)`) - `manuallySpecifiedTabletIds` (e.g., `SELECT count(*) FROM t 
TABLET(12345)`) - `tableSample` (e.g., `SELECT count(*) FROM t TABLESAMPLE(10 ROWS)`) The cached values are for the **full table**, so returning them for a partition-/tablet-/sample-restricted scan produces wrong results. WHERE clause filters are safe because they produce a `LogicalFilter` node that prevents pattern matching, but the three restrictions above are properties of `LogicalOlapScan` itself. Suggested fix: ```java // After getting olapScan: if (!olapScan.getManuallySpecifiedPartitions().isEmpty() || !olapScan.getManuallySpecifiedTabletIds().isEmpty() || olapScan.getTableSample() != null) { return null; } ``` Please also add negative test cases for these scenarios. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java: ########## @@ -0,0 +1,473 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.stats; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.ResultRow; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +/** + * Async cache that stores exact MIN/MAX/COUNT values for OlapTable, + * used by {@code RewriteSimpleAggToConstantRule} to replace simple + * aggregations with constant values. + * + * <p>MIN/MAX values are obtained by executing + * {@code SELECT min(col), max(col) FROM table}, and COUNT values by + * {@code SELECT count(*) FROM table}, both as internal SQL queries + * inside FE. Results are cached with a version stamp derived from + * {@code OlapTable.getVisibleVersionTime()}. + * When a caller provides a version newer than the cached version, + * the stale entry is evicted and a background reload is triggered. + * + * <p>Only numeric and date/datetime columns are cached for MIN/MAX; + * aggregated columns are skipped. 
+ */ +public class SimpleAggCacheMgr { + + // ======================== Public inner types ======================== + + /** + * Holds exact min and max values for a column as strings. + */ + public static class ColumnMinMax { + private final String minValue; + private final String maxValue; + + public ColumnMinMax(String minValue, String maxValue) { + this.minValue = minValue; + this.maxValue = maxValue; + } + + public String minValue() { + return minValue; + } + + public String maxValue() { + return maxValue; + } + } + + /** + * Cache key identifying a column by its table ID and column name. + */ + public static final class ColumnMinMaxKey { + private final long tableId; + private final String columnName; + + public ColumnMinMaxKey(long tableId, String columnName) { + this.tableId = tableId; + this.columnName = columnName; + } + + public long getTableId() { + return tableId; + } + + public String getColumnName() { + return columnName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ColumnMinMaxKey)) { + return false; + } + ColumnMinMaxKey that = (ColumnMinMaxKey) o; + return tableId == that.tableId && columnName.equalsIgnoreCase(that.columnName); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, columnName.toLowerCase()); + } + + @Override + public String toString() { + return "ColumnMinMaxKey{tableId=" + tableId + ", column=" + columnName + "}"; + } + } + + private static class CacheValue { + private final ColumnMinMax minMax; + private final long version; + + CacheValue(ColumnMinMax minMax, long version) { + this.minMax = minMax; + this.version = version; + } + + ColumnMinMax minMax() { + return minMax; + } + + long version() { + return version; + } + } + + /** + * Cached row count with version stamp. 
+ */ + private static class RowCountValue { + private final long rowCount; + private final long version; + + RowCountValue(long rowCount, long version) { + this.rowCount = rowCount; + this.version = version; + } + + long rowCount() { + return rowCount; + } + + long version() { + return version; + } + } + + private static final Logger LOG = LogManager.getLogger(SimpleAggCacheMgr.class); + + private static volatile SimpleAggCacheMgr INSTANCE; + private static volatile SimpleAggCacheMgr TEST_INSTANCE; + + private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>> cache; + private final AsyncLoadingCache<Long, Optional<RowCountValue>> rowCountCache; + + /** + * Protected no-arg constructor for test subclassing. + * Subclasses override {@link #getStats}, {@link #getRowCount}, etc. + */ + protected SimpleAggCacheMgr() { + this.cache = null; + this.rowCountCache = null; + } + + private SimpleAggCacheMgr(ExecutorService executor) { + this.cache = Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .executor(executor) + .buildAsync(new CacheLoader()); + this.rowCountCache = Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .executor(executor) + .buildAsync(new RowCountLoader()); + } + + private static SimpleAggCacheMgr getInstance() { + if (INSTANCE == null) { + synchronized (SimpleAggCacheMgr.class) { + if (INSTANCE == null) { + ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool( + 4, "simple-agg-cache-pool", true); + INSTANCE = new SimpleAggCacheMgr(executor); + } + } + } + return INSTANCE; + } + + /** + * Returns the singleton instance backed by async-loading cache, + * or the test override if one has been set. + */ + public static SimpleAggCacheMgr internalInstance() { + SimpleAggCacheMgr test = TEST_INSTANCE; + if (test != null) { + return test; + } + return getInstance(); + } + + /** + * Override used only in unit tests to inject a mock implementation. 
+ */ + @VisibleForTesting + public static void setTestInstance(SimpleAggCacheMgr instance) { + TEST_INSTANCE = instance; + } + + /** + * Reset the test override so that subsequent calls go back to the real cache. + */ + @VisibleForTesting + public static void clearTestInstance() { + TEST_INSTANCE = null; + } + + /** + * Get the cached min/max for a column. + */ + public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long version) { + CompletableFuture<Optional<CacheValue>> future = cache.get(key); + if (future.isDone()) { + try { + Optional<CacheValue> cacheValue = future.get(); + if (cacheValue.isPresent()) { + CacheValue value = cacheValue.get(); + if (value.version() >= version) { + return Optional.of(value.minMax()); + } + } + // Either empty (load failed / version changed during load) + // or stale — evict so next call triggers a fresh reload. + cache.synchronous().invalidate(key); + } catch (Exception e) { + LOG.warn("Failed to get MinMax for column: {}, version: {}", key, version, e); + cache.synchronous().invalidate(key); + } + } + return Optional.empty(); + } + + /** + * Evict the cached stats for a column, if present. Used when we know the data has changed + */ + public void removeStats(ColumnMinMaxKey key) { + cache.synchronous().invalidate(key); + } + + /** + * Get the cached row count for a table. + */ + public OptionalLong getRowCount(long tableId, long version) { + CompletableFuture<Optional<RowCountValue>> future = rowCountCache.get(tableId); + if (future.isDone()) { + try { + Optional<RowCountValue> cached = future.get(); + if (cached.isPresent()) { + RowCountValue value = cached.get(); + if (value.version() >= version) { + return OptionalLong.of(value.rowCount()); + } + } + // Either empty (load failed / version changed during load) + // or stale — evict so next call triggers a fresh reload. 
+ rowCountCache.synchronous().invalidate(tableId); + } catch (Exception e) { + LOG.warn("Failed to get row count for table: {}, version: {}", tableId, version, e); + rowCountCache.synchronous().invalidate(tableId); + } + } + return OptionalLong.empty(); + } + + /** + * Generate the internal SQL for fetching exact min/max values. + */ + @VisibleForTesting + public static String genMinMaxSql(List<String> qualifiers, String columnName) { + // qualifiers: [catalogName, dbName, tableName] + String quotedCol = "`" + StatisticsUtil.escapeColumnName(columnName) + "`"; + String fullTable = "`" + qualifiers.get(0) + "`.`" + + qualifiers.get(1) + "`.`" + + qualifiers.get(2) + "`"; + return "SELECT min(" + quotedCol + "), max(" + quotedCol + ") FROM " + fullTable; + } + + /** + * Generate the internal SQL for fetching exact row count. + */ + @VisibleForTesting + public static String genCountSql(List<String> qualifiers) { + String fullTable = "`" + qualifiers.get(0) + "`.`" + + qualifiers.get(1) + "`.`" + + qualifiers.get(2) + "`"; + return "SELECT count(*) FROM " + fullTable; + } + + /** + * Async cache loader that issues internal SQL queries to compute exact min/max. + */ + protected static final class CacheLoader + implements AsyncCacheLoader<ColumnMinMaxKey, Optional<CacheValue>> { + + @Override + public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad( + @NonNull ColumnMinMaxKey key, @NonNull Executor executor) { Review Comment: **[Suggestion / Robustness]** The cache loaders execute `SELECT count(*)` / `SELECT min(col), max(col)` as internal SQL, which goes through the full Nereids rewrite pipeline — including `RewriteSimpleAggToConstantRule` itself. Currently this is *accidentally safe* because Caffeine deduplicates concurrent loads for the same key, and `isDone()` returns false for the in-flight future, so the rule bails out for the recursive internal query. But this is fragile and undocumented. Consider one of the following: 1.
Adding a comment documenting why recursion is safe, or (better) 2. Disabling the rule for internal queries, e.g., by setting `disable_nereids_rules=REWRITE_SIMPLE_AGG_TO_CONSTANT` in the `ConnectContext` created by `StatisticsUtil.buildConnectContext()`, or 3. Checking `connectContext.getSessionVariable().isInternalQuery()` (or similar) in the rule itself. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java: ########## @@ -0,0 +1,473 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.stats; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.ResultRow; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +/** + * Async cache that stores exact MIN/MAX/COUNT values for OlapTable, + * used by {@code RewriteSimpleAggToConstantRule} to replace simple + * aggregations with constant values. + * + * <p>MIN/MAX values are obtained by executing + * {@code SELECT min(col), max(col) FROM table}, and COUNT values by + * {@code SELECT count(*) FROM table}, both as internal SQL queries + * inside FE. Results are cached with a version stamp derived from + * {@code OlapTable.getVisibleVersionTime()}. + * When a caller provides a version newer than the cached version, + * the stale entry is evicted and a background reload is triggered. + * + * <p>Only numeric and date/datetime columns are cached for MIN/MAX; + * aggregated columns are skipped. 
+ */ +public class SimpleAggCacheMgr { + + // ======================== Public inner types ======================== + + /** + * Holds exact min and max values for a column as strings. + */ + public static class ColumnMinMax { + private final String minValue; + private final String maxValue; + + public ColumnMinMax(String minValue, String maxValue) { + this.minValue = minValue; + this.maxValue = maxValue; + } + + public String minValue() { + return minValue; + } + + public String maxValue() { + return maxValue; + } + } + + /** + * Cache key identifying a column by its table ID and column name. + */ + public static final class ColumnMinMaxKey { + private final long tableId; + private final String columnName; + + public ColumnMinMaxKey(long tableId, String columnName) { + this.tableId = tableId; + this.columnName = columnName; + } + + public long getTableId() { + return tableId; + } + + public String getColumnName() { + return columnName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ColumnMinMaxKey)) { + return false; + } + ColumnMinMaxKey that = (ColumnMinMaxKey) o; + return tableId == that.tableId && columnName.equalsIgnoreCase(that.columnName); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, columnName.toLowerCase()); + } + + @Override + public String toString() { + return "ColumnMinMaxKey{tableId=" + tableId + ", column=" + columnName + "}"; + } + } + + private static class CacheValue { + private final ColumnMinMax minMax; + private final long version; + + CacheValue(ColumnMinMax minMax, long version) { + this.minMax = minMax; + this.version = version; + } + + ColumnMinMax minMax() { + return minMax; + } + + long version() { + return version; + } + } + + /** + * Cached row count with version stamp. 
+ */ + private static class RowCountValue { + private final long rowCount; + private final long version; + + RowCountValue(long rowCount, long version) { + this.rowCount = rowCount; + this.version = version; + } + + long rowCount() { + return rowCount; + } + + long version() { + return version; + } + } + + private static final Logger LOG = LogManager.getLogger(SimpleAggCacheMgr.class); + + private static volatile SimpleAggCacheMgr INSTANCE; + private static volatile SimpleAggCacheMgr TEST_INSTANCE; + + private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>> cache; + private final AsyncLoadingCache<Long, Optional<RowCountValue>> rowCountCache; + + /** + * Protected no-arg constructor for test subclassing. + * Subclasses override {@link #getStats}, {@link #getRowCount}, etc. + */ + protected SimpleAggCacheMgr() { + this.cache = null; + this.rowCountCache = null; + } + + private SimpleAggCacheMgr(ExecutorService executor) { + this.cache = Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .executor(executor) + .buildAsync(new CacheLoader()); + this.rowCountCache = Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .executor(executor) + .buildAsync(new RowCountLoader()); + } + + private static SimpleAggCacheMgr getInstance() { + if (INSTANCE == null) { + synchronized (SimpleAggCacheMgr.class) { + if (INSTANCE == null) { + ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool( + 4, "simple-agg-cache-pool", true); + INSTANCE = new SimpleAggCacheMgr(executor); + } + } + } + return INSTANCE; + } + + /** + * Returns the singleton instance backed by async-loading cache, + * or the test override if one has been set. + */ + public static SimpleAggCacheMgr internalInstance() { + SimpleAggCacheMgr test = TEST_INSTANCE; + if (test != null) { + return test; + } + return getInstance(); + } + + /** + * Override used only in unit tests to inject a mock implementation. 
+ */ + @VisibleForTesting + public static void setTestInstance(SimpleAggCacheMgr instance) { + TEST_INSTANCE = instance; + } + + /** + * Reset the test override so that subsequent calls go back to the real cache. + */ + @VisibleForTesting + public static void clearTestInstance() { + TEST_INSTANCE = null; + } + + /** + * Get the cached min/max for a column. + */ + public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long version) { + CompletableFuture<Optional<CacheValue>> future = cache.get(key); + if (future.isDone()) { + try { + Optional<CacheValue> cacheValue = future.get(); + if (cacheValue.isPresent()) { + CacheValue value = cacheValue.get(); + if (value.version() >= version) { + return Optional.of(value.minMax()); + } + } + // Either empty (load failed / version changed during load) + // or stale — evict so next call triggers a fresh reload. + cache.synchronous().invalidate(key); + } catch (Exception e) { + LOG.warn("Failed to get MinMax for column: {}, version: {}", key, version, e); + cache.synchronous().invalidate(key); + } + } + return Optional.empty(); + } + + /** + * Evict the cached stats for a column, if present. Used when we know the data has changed + */ + public void removeStats(ColumnMinMaxKey key) { + cache.synchronous().invalidate(key); + } + + /** + * Get the cached row count for a table. + */ + public OptionalLong getRowCount(long tableId, long version) { + CompletableFuture<Optional<RowCountValue>> future = rowCountCache.get(tableId); + if (future.isDone()) { + try { + Optional<RowCountValue> cached = future.get(); + if (cached.isPresent()) { + RowCountValue value = cached.get(); + if (value.version() >= version) { + return OptionalLong.of(value.rowCount()); + } + } + // Either empty (load failed / version changed during load) + // or stale — evict so next call triggers a fresh reload. 
+ rowCountCache.synchronous().invalidate(tableId); + } catch (Exception e) { + LOG.warn("Failed to get row count for table: {}, version: {}", tableId, version, e); + rowCountCache.synchronous().invalidate(tableId); + } + } + return OptionalLong.empty(); + } + + /** + * Generate the internal SQL for fetching exact min/max values. + */ + @VisibleForTesting + public static String genMinMaxSql(List<String> qualifiers, String columnName) { + // qualifiers: [catalogName, dbName, tableName] + String quotedCol = "`" + StatisticsUtil.escapeColumnName(columnName) + "`"; + String fullTable = "`" + qualifiers.get(0) + "`.`" + + qualifiers.get(1) + "`.`" + + qualifiers.get(2) + "`"; + return "SELECT min(" + quotedCol + "), max(" + quotedCol + ") FROM " + fullTable; + } + + /** + * Generate the internal SQL for fetching exact row count. + */ + @VisibleForTesting + public static String genCountSql(List<String> qualifiers) { + String fullTable = "`" + qualifiers.get(0) + "`.`" + + qualifiers.get(1) + "`.`" + + qualifiers.get(2) + "`"; + return "SELECT count(*) FROM " + fullTable; + } + + /** + * Async cache loader that issues internal SQL queries to compute exact min/max. 
+ */ + protected static final class CacheLoader + implements AsyncCacheLoader<ColumnMinMaxKey, Optional<CacheValue>> { + + @Override + public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad( + @NonNull ColumnMinMaxKey key, @NonNull Executor executor) { + return CompletableFuture.supplyAsync(() -> { + try { + return doLoad(key); + } catch (Exception e) { + LOG.warn("Failed to load MinMax for column: {}", key, e); + return Optional.empty(); + } + }, executor); + } + + private Optional<CacheValue> doLoad(ColumnMinMaxKey key) throws Exception { + // Look up the table by its ID + TableIf tableIf = findTableById(key.getTableId()); + if (!(tableIf instanceof OlapTable)) { + return Optional.empty(); + } + OlapTable olapTable = (OlapTable) tableIf; + + // Validate column exists and is eligible + Column column = olapTable.getColumn(key.getColumnName()); + if (column == null) { + return Optional.empty(); + } + if (!column.getType().isNumericType() && !column.getType().isDateType()) { + return Optional.empty(); + } + if (column.isAggregated()) { + return Optional.empty(); + } + + // Use table-level visibleVersionTime, consistent with how the caller + // (RewriteSimpleAggToConstantRule) obtains the version for comparison. 
+ long version = olapTable.getVisibleVersionTime(); + + // Build and execute internal SQL + List<String> qualifiers = olapTable.getFullQualifiers(); + String sql = genMinMaxSql(qualifiers, column.getName()); + + List<ResultRow> rows; + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) { + r.connectContext.getSessionVariable().setPipelineTaskNum("1"); + StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); + rows = stmtExecutor.executeInternalQuery(); + } + if (rows == null || rows.isEmpty()) { + return Optional.empty(); + } + ResultRow row = rows.get(0); + String minVal = row.get(0); + String maxVal = row.get(1); + if (minVal == null || maxVal == null) { + return Optional.empty(); + } + // Re-check version after query execution to detect concurrent data changes. + // If the version changed during the query, the result is unreliable. + long versionAfter = olapTable.getVisibleVersionTime(); + if (versionAfter != version) { + return Optional.empty(); + } + return Optional.of(new CacheValue(new ColumnMinMax(minVal, maxVal), version)); + } + + /** + * Look up a table by its table ID across all internal databases. + */ + private static TableIf findTableById(long tableId) { + try { Review Comment: **[Performance]** `findTableById()` iterates over **all** database IDs (`Env.getCurrentInternalCatalog().getDbIds()`) and queries each one for the table. This is O(databases) per cache miss. Consider using a direct lookup if available, e.g.: ```java TableIf table = Env.getCurrentInternalCatalog().getTableByTableId(tableId); ``` or caching the database ID in the cache key to avoid the linear scan. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java: ########## @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.functions.agg.Count; +import org.apache.doris.nereids.trees.expressions.functions.agg.Max; +import org.apache.doris.nereids.trees.expressions.functions.agg.Min; +import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import 
org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; + +/** + * For simple aggregation queries like + * 'select count(*), count(not-null column), min(col), max(col) from olap_table', + * rewrite them to return constants directly from FE metadata, completely bypassing BE. + * + * <p>COUNT uses table.getRowCount(). + * MIN/MAX uses an async FE-side cache ({@link SimpleAggCacheMgr}) that stores exact values + * obtained via internal SQL queries, NOT sampled ColumnStatistic. + * + * <p>Conditions: + * 1. DUP_KEYS table only (AGG_KEYS: rowCount inflated; UNIQUE_KEYS: min/max may be inaccurate + * in MoW model before compaction merges delete-marked rows) + * 2. No GROUP BY + * 3. Only COUNT / MIN / MAX aggregate functions + * 4. COUNT(col): col must be NOT NULL (so count(col) == rowCount) + * 5. 
MIN/MAX(col): col must be numeric or date type, and not aggregated + */ +public class RewriteSimpleAggToConstantRule implements RewriteRuleFactory { + + @Override + public List<Rule> buildRules() { + return ImmutableList.of( + // pattern: agg -> scan + logicalAggregate(logicalOlapScan()) + .thenApply(ctx -> { + LogicalAggregate<LogicalOlapScan> agg = ctx.root; + LogicalOlapScan olapScan = agg.child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT), + // pattern: agg -> project -> scan + logicalAggregate(logicalProject(logicalOlapScan())) + .thenApply(ctx -> { + LogicalAggregate<?> agg = ctx.root; + LogicalOlapScan olapScan = (LogicalOlapScan) ctx.root.child().child(); + return tryRewrite(agg, olapScan, ctx.statementContext); + }) + .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT) + ); + } + + private Plan tryRewrite(LogicalAggregate<?> agg, LogicalOlapScan olapScan, + StatementContext statementContext) { + OlapTable table = olapScan.getTable(); + long selectedIndex = olapScan.getSelectedIndexId(); + Review Comment: **[Minor]** `selectedIndex` is captured here and passed to `tryGetConstant()`, but inside that method the parameter is **never read or used**. The internal SQL always runs against the base table regardless of selected materialized view index. Either: 1. Remove the unused parameter, or 2. If the intent is to restrict rewriting to base-table-only scans, add a check: ```java if (selectedIndex != olapScan.getTable().getBaseIndexId()) { return null; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
