github-actions[bot] commented on code in PR #61183:
URL: https://github.com/apache/doris/pull/61183#discussion_r2929553835
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java:
##########
@@ -66,4 +66,9 @@ public void updateVisibleVersionAndTime(long visibleVersion,
long visibleVersion
public long getNextVersion() {
return visibleVersion + 1;
}
+
+ public void resetVisibleVersion() {
+ this.visibleVersion = TABLE_INIT_VERSION;
Review Comment:
**Observation (low risk):** `visibleVersion` and `visibleVersionTime` are
plain `long` fields (not `volatile`). They are written here under the table
write lock (correct), but read by `SimpleAggCacheMgr` cache loader threads via
`getVisibleVersionTime()` without synchronization. JLS 17.7 allows a
non-volatile `long` to be read or written as two 32-bit halves; 64-bit JVMs
such as HotSpot perform the access atomically in practice, but that is an
implementation property, not a language guarantee. Independently of atomicity,
without `volatile` or a lock there is no happens-before guarantee, so a reader
could theoretically see a stale value.
In practice this is mitigated by the two-level validation in
`SimpleAggCacheMgr.getStats()` (versionTime check + version check), so a stale
read would at worst cause a conservative cache miss, never incorrect results.
Acceptable as-is, but worth noting.
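If stronger visibility is ever wanted, one option is to declare both fields
`volatile`. The sketch below is illustrative only: `VersionedAttributesSketch`
is a hypothetical stand-in for `TableAttributes`, and the value of
`TABLE_INIT_VERSION` is assumed, not taken from the Doris source.

```java
// Hypothetical stand-in for TableAttributes; not the actual Doris class.
class VersionedAttributesSketch {
    // Assumed initial version, chosen only for illustration.
    static final long TABLE_INIT_VERSION = 1L;

    // volatile guarantees atomic 64-bit access (JLS 17.7) and establishes
    // a happens-before edge between a write and any later read.
    private volatile long visibleVersion = TABLE_INIT_VERSION;
    private volatile long visibleVersionTime = 0L;

    // In the real class the caller is expected to hold the table write lock.
    void updateVisibleVersionAndTime(long version, long time) {
        this.visibleVersion = version;
        this.visibleVersionTime = time;
    }

    void resetVisibleVersion() {
        this.visibleVersion = TABLE_INIT_VERSION;
    }

    long getVisibleVersion() {
        return visibleVersion;
    }

    long getVisibleVersionTime() {
        return visibleVersionTime;
    }
}
```

Note that two `volatile` fields are still not updated as a unit: a lock-free
reader can observe the new version together with the old time. That is the
same situation the two-level validation already tolerates.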
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java:
##########
@@ -0,0 +1,546 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.stats;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.LongSupplier;
+
+/**
+ * Async cache that stores exact MIN/MAX/COUNT values for OlapTable,
+ * used by {@code RewriteSimpleAggToConstantRule} to replace simple
+ * aggregations with constant values.
+ *
+ * <p>MIN/MAX values are obtained by executing
+ * {@code SELECT min(col), max(col) FROM table}, and COUNT values by
+ * {@code SELECT count(*) FROM table}, both as internal SQL queries
+ * inside FE. Results are cached with a version stamp derived from
+ * {@code OlapTable.getVisibleVersionTime()}.
+ * When a caller provides a versionTime newer than the cached versionTime,
+ * the stale entry is evicted and a background reload is triggered.
+ *
+ * <p>Cache validation uses a two-level check driven by {@code versionTime}:
+ * <ol>
+ * <li>If times differ: the table has definitely changed — evict immediately,
+ * without calling {@code getVisibleVersion()} (which may involve an RPC
in cloud mode).</li>
+ * <li>If times match: call {@code getVisibleVersion()} once to compare
versions,
+ * guarding against the rare case of two writes completing within the
same millisecond.</li>
+ * </ol>
+ *
+ * <p>Only numeric and date/datetime columns are cached for MIN/MAX;
+ * aggregated columns are skipped.
+ */
+public class SimpleAggCacheMgr {
+
+ // ======================== Public inner types ========================
+
+ /**
+ * Holds exact min and max values for a column as strings.
+ */
+ public static class ColumnMinMax {
+ private final String minValue;
+ private final String maxValue;
+
+ public ColumnMinMax(String minValue, String maxValue) {
+ this.minValue = minValue;
+ this.maxValue = maxValue;
+ }
+
+ public String minValue() {
+ return minValue;
+ }
+
+ public String maxValue() {
+ return maxValue;
+ }
+ }
+
+ /**
+ * Cache key identifying a column by its table ID and column name.
+ */
+ public static final class ColumnMinMaxKey {
+ private final long tableId;
+ private final String columnName;
+
+ public ColumnMinMaxKey(long tableId, String columnName) {
+ this.tableId = tableId;
+ this.columnName = columnName;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ColumnMinMaxKey)) {
+ return false;
+ }
+ ColumnMinMaxKey that = (ColumnMinMaxKey) o;
+ return tableId == that.tableId &&
columnName.equalsIgnoreCase(that.columnName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableId, columnName.toLowerCase());
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnMinMaxKey{tableId=" + tableId + ", column=" +
columnName + "}";
+ }
+ }
+
+ private static class CacheValue {
+ private final ColumnMinMax minMax;
+ private final long version;
+ private final long versionTime;
+
+ CacheValue(ColumnMinMax minMax, long version, long versionTime) {
+ this.minMax = minMax;
+ this.version = version;
+ this.versionTime = versionTime;
+ }
+
+ ColumnMinMax minMax() {
+ return minMax;
+ }
+
+ long version() {
+ return version;
+ }
+
+ long versionTime() {
+ return versionTime;
+ }
+ }
+
+ /**
+ * Cached row count with version stamp.
+ */
+ private static class RowCountValue {
+ private final long rowCount;
+ private final long version;
+ private final long versionTime;
+
+ RowCountValue(long rowCount, long version, long versionTime) {
+ this.rowCount = rowCount;
+ this.version = version;
+ this.versionTime = versionTime;
+ }
+
+ long rowCount() {
+ return rowCount;
+ }
+
+ long version() {
+ return version;
+ }
+
+ long versionTime() {
+ return versionTime;
+ }
+ }
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleAggCacheMgr.class);
+
+ private static volatile SimpleAggCacheMgr INSTANCE;
+ private static volatile SimpleAggCacheMgr TEST_INSTANCE;
+
+ private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>>
cache;
+ private final AsyncLoadingCache<Long, Optional<RowCountValue>>
rowCountCache;
+
+ /**
+ * Protected no-arg constructor for test subclassing.
+ * Subclasses override {@link #getStats}, {@link #getRowCount}, etc.
+ */
+ protected SimpleAggCacheMgr() {
+ this.cache = null;
Review Comment:
**Observation (low risk):** This protected no-arg constructor sets
`this.cache = null` and `this.rowCountCache = null`. It exists for test
subclassing (the mock in `RewriteSimpleAggToConstantRuleTest` overrides all
public methods). However, if a future subclass forgets to override `getStats()`
or `getRowCount()`, the inherited method would throw a `NullPointerException`
at `cache.get(key)` or `rowCountCache.get(tableId)`. Consider adding a brief
Javadoc warning, or throwing `UnsupportedOperationException` in the default
`getStats()`/`getRowCount()` when `cache == null`.
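A minimal sketch of the fail-fast variant, under simplifying assumptions:
`GuardedCacheMgrSketch` is a hypothetical reduction of `SimpleAggCacheMgr`, and
a plain `Function` returning a `CompletableFuture` stands in for the Caffeine
`AsyncLoadingCache`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical reduction of SimpleAggCacheMgr; a Function stands in for
// the Caffeine AsyncLoadingCache used by the real class.
class GuardedCacheMgrSketch {
    private final Function<String, CompletableFuture<Optional<String>>> cache;

    // Test-subclassing constructor: no backing cache is created.
    protected GuardedCacheMgrSketch() {
        this.cache = null;
    }

    GuardedCacheMgrSketch(Function<String, CompletableFuture<Optional<String>>> cache) {
        this.cache = cache;
    }

    public Optional<String> getStats(String key) {
        if (cache == null) {
            // Fail with a descriptive error instead of an opaque NPE.
            throw new UnsupportedOperationException(
                    "no backing cache; subclasses created via the no-arg "
                    + "constructor must override getStats()");
        }
        // Return the value only if the load has already completed.
        return cache.apply(key).getNow(Optional.empty());
    }
}
```

With this guard, a subclass that forgets an override fails with a message that
names the cause, which is easier to diagnose in a test run than a bare
`NullPointerException`.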
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java:
##########
@@ -0,0 +1,546 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.stats;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.LongSupplier;
+
+/**
+ * Async cache that stores exact MIN/MAX/COUNT values for OlapTable,
+ * used by {@code RewriteSimpleAggToConstantRule} to replace simple
+ * aggregations with constant values.
+ *
+ * <p>MIN/MAX values are obtained by executing
+ * {@code SELECT min(col), max(col) FROM table}, and COUNT values by
+ * {@code SELECT count(*) FROM table}, both as internal SQL queries
+ * inside FE. Results are cached with a version stamp derived from
+ * {@code OlapTable.getVisibleVersionTime()}.
+ * When a caller provides a versionTime newer than the cached versionTime,
+ * the stale entry is evicted and a background reload is triggered.
+ *
+ * <p>Cache validation uses a two-level check driven by {@code versionTime}:
+ * <ol>
+ * <li>If times differ: the table has definitely changed — evict immediately,
+ * without calling {@code getVisibleVersion()} (which may involve an RPC
in cloud mode).</li>
+ * <li>If times match: call {@code getVisibleVersion()} once to compare
versions,
+ * guarding against the rare case of two writes completing within the
same millisecond.</li>
+ * </ol>
+ *
+ * <p>Only numeric and date/datetime columns are cached for MIN/MAX;
+ * aggregated columns are skipped.
+ */
+public class SimpleAggCacheMgr {
+
+ // ======================== Public inner types ========================
+
+ /**
+ * Holds exact min and max values for a column as strings.
+ */
+ public static class ColumnMinMax {
+ private final String minValue;
+ private final String maxValue;
+
+ public ColumnMinMax(String minValue, String maxValue) {
+ this.minValue = minValue;
+ this.maxValue = maxValue;
+ }
+
+ public String minValue() {
+ return minValue;
+ }
+
+ public String maxValue() {
+ return maxValue;
+ }
+ }
+
+ /**
+ * Cache key identifying a column by its table ID and column name.
+ */
+ public static final class ColumnMinMaxKey {
+ private final long tableId;
+ private final String columnName;
+
+ public ColumnMinMaxKey(long tableId, String columnName) {
+ this.tableId = tableId;
+ this.columnName = columnName;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ColumnMinMaxKey)) {
+ return false;
+ }
+ ColumnMinMaxKey that = (ColumnMinMaxKey) o;
+ return tableId == that.tableId &&
columnName.equalsIgnoreCase(that.columnName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableId, columnName.toLowerCase());
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnMinMaxKey{tableId=" + tableId + ", column=" +
columnName + "}";
+ }
+ }
+
+ private static class CacheValue {
+ private final ColumnMinMax minMax;
+ private final long version;
+ private final long versionTime;
+
+ CacheValue(ColumnMinMax minMax, long version, long versionTime) {
+ this.minMax = minMax;
+ this.version = version;
+ this.versionTime = versionTime;
+ }
+
+ ColumnMinMax minMax() {
+ return minMax;
+ }
+
+ long version() {
+ return version;
+ }
+
+ long versionTime() {
+ return versionTime;
+ }
+ }
+
+ /**
+ * Cached row count with version stamp.
+ */
+ private static class RowCountValue {
+ private final long rowCount;
+ private final long version;
+ private final long versionTime;
+
+ RowCountValue(long rowCount, long version, long versionTime) {
+ this.rowCount = rowCount;
+ this.version = version;
+ this.versionTime = versionTime;
+ }
+
+ long rowCount() {
+ return rowCount;
+ }
+
+ long version() {
+ return version;
+ }
+
+ long versionTime() {
+ return versionTime;
+ }
+ }
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleAggCacheMgr.class);
+
+ private static volatile SimpleAggCacheMgr INSTANCE;
+ private static volatile SimpleAggCacheMgr TEST_INSTANCE;
+
+ private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>>
cache;
+ private final AsyncLoadingCache<Long, Optional<RowCountValue>>
rowCountCache;
+
+ /**
+ * Protected no-arg constructor for test subclassing.
+ * Subclasses override {@link #getStats}, {@link #getRowCount}, etc.
+ */
+ protected SimpleAggCacheMgr() {
+ this.cache = null;
+ this.rowCountCache = null;
+ }
+
+ private SimpleAggCacheMgr(ExecutorService executor) {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(Config.stats_cache_size)
+ .executor(executor)
+ .buildAsync(new CacheLoader());
+ this.rowCountCache = Caffeine.newBuilder()
+ .maximumSize(Config.stats_cache_size)
+ .executor(executor)
+ .buildAsync(new RowCountLoader());
+ }
+
+ private static SimpleAggCacheMgr getInstance() {
+ if (INSTANCE == null) {
+ synchronized (SimpleAggCacheMgr.class) {
+ if (INSTANCE == null) {
+ ExecutorService executor =
ThreadPoolManager.newDaemonCacheThreadPool(
+ 4, "simple-agg-cache-pool", true);
+ INSTANCE = new SimpleAggCacheMgr(executor);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ /**
+ * Returns the singleton instance backed by async-loading cache,
+ * or the test override if one has been set.
+ */
+ public static SimpleAggCacheMgr internalInstance() {
+ SimpleAggCacheMgr test = TEST_INSTANCE;
+ if (test != null) {
+ return test;
+ }
+ return getInstance();
+ }
+
+ /**
+ * Override used only in unit tests to inject a mock implementation.
+ */
+ @VisibleForTesting
+ public static void setTestInstance(SimpleAggCacheMgr instance) {
+ TEST_INSTANCE = instance;
+ }
+
+ /**
+ * Reset the test override so that subsequent calls go back to the real
cache.
+ */
+ @VisibleForTesting
+ public static void clearTestInstance() {
+ TEST_INSTANCE = null;
+ }
+
+ /**
+ * Get the cached min/max for a column.
+ *
+ * <p>Cache validation uses a two-level check driven by {@code
versionTime}:
+ * <ol>
+ * <li>If {@code cachedVersionTime != callerVersionTime}: times differ,
the table has
+ * definitely been modified — evict immediately. {@code
versionSupplier} is
+ * <em>never</em> called, saving a potentially expensive RPC in
cloud mode.</li>
+ * <li>If {@code cachedVersionTime == callerVersionTime}: times match,
but two writes
+ * in the same millisecond could produce the same time with
different versions.
+ * {@code versionSupplier} is called exactly once to confirm {@code
version} equality.</li>
+ * </ol>
+ *
+ * @param key cache key (tableId + columnName)
+ * @param callerVersionTime the caller's current {@code
table.getVisibleVersionTime()} (cheap, always local)
+ * @param versionSupplier lazy supplier for {@code
table.getVisibleVersion()}; called only when
+ * {@code versionTime} values are equal. Returns
-1 on RPC failure,
+ * in which case the method returns empty
conservatively.
+ */
+ public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long
callerVersionTime,
+ LongSupplier versionSupplier) {
+ CompletableFuture<Optional<CacheValue>> future = cache.get(key);
+ if (future.isDone()) {
+ try {
+ Optional<CacheValue> cacheValue = future.get();
+ if (cacheValue.isPresent()) {
+ CacheValue value = cacheValue.get();
+ if (value.versionTime() == callerVersionTime) {
+ // Times match — verify version to guard against two
writes in the same ms.
+ // versionSupplier may be an RPC in cloud mode; it is
only invoked here.
+ long callerVersion = versionSupplier.getAsLong();
+ if (callerVersion < 0) {
+ // RPC failed: cannot verify version — return
empty conservatively.
+ // Do not invalidate: keep the existing entry so
it can be reused once the RPC recovers.
+ return Optional.empty();
+ }
+ if (value.version() == callerVersion) {
+ return Optional.of(value.minMax());
+ }
+ // Same time but different version: two writes in the
same ms — stale.
+ }
+ // Times differ → definitely stale; skip the version RPC
entirely.
+ }
+ // Either empty (load failed) or stale — evict so next call
triggers a fresh reload.
+ cache.synchronous().invalidate(key);
+ } catch (Exception e) {
+ LOG.warn("Failed to get MinMax for column: {}, versionTime:
{}", key, callerVersionTime, e);
+ cache.synchronous().invalidate(key);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Evict the cached stats for a column, if present. Used when we know the
data has changed
+ */
+ public void removeStats(ColumnMinMaxKey key) {
+ cache.synchronous().invalidate(key);
+ }
+
+ /**
+ * Get the cached row count for a table.
+ *
+ * <p>Cache validation uses a two-level check driven by {@code
versionTime}:
+ * <ol>
+ * <li>If {@code cachedVersionTime != callerVersionTime}: times differ,
the table has
+ * definitely been modified — evict immediately. {@code
versionSupplier} is
+ * <em>never</em> called, saving a potentially expensive RPC in
cloud mode.</li>
+ * <li>If {@code cachedVersionTime == callerVersionTime}: times match,
but two writes
+ * in the same millisecond could produce the same time with
different versions.
+ * {@code versionSupplier} is called exactly once to confirm {@code
version} equality.</li>
+ * </ol>
+ *
+ * @param tableId the table id
+ * @param callerVersionTime the caller's current {@code
table.getVisibleVersionTime()} (cheap, always local)
+ * @param versionSupplier lazy supplier for {@code
table.getVisibleVersion()}; called only when
+ * {@code versionTime} values are equal. Returns
-1 on RPC failure,
+ * in which case the method returns empty
conservatively.
+ */
+ public OptionalLong getRowCount(long tableId, long callerVersionTime,
LongSupplier versionSupplier) {
+ CompletableFuture<Optional<RowCountValue>> future =
rowCountCache.get(tableId);
+ if (future.isDone()) {
+ try {
+ Optional<RowCountValue> cached = future.get();
+ if (cached.isPresent()) {
+ RowCountValue value = cached.get();
+ if (value.versionTime() == callerVersionTime) {
+ // Times match — verify version to guard against two
writes in the same ms.
+ long callerVersion = versionSupplier.getAsLong();
+ if (callerVersion < 0) {
+ // RPC failed: cannot verify version — return
empty conservatively.
+ // Do not invalidate: keep the existing entry so
it can be reused once the RPC recovers.
+ return OptionalLong.empty();
+ }
+ if (value.version() == callerVersion) {
+ return OptionalLong.of(value.rowCount());
+ }
+ // Same time but different version: two writes in the
same ms — stale.
+ }
+ // Times differ → definitely stale; skip the version RPC
entirely.
+ }
+ // Either empty (load failed) or stale — evict so next call
triggers a fresh reload.
+ rowCountCache.synchronous().invalidate(tableId);
+ } catch (Exception e) {
+ LOG.warn("Failed to get row count for table: {}, versionTime:
{}", tableId, callerVersionTime, e);
+ rowCountCache.synchronous().invalidate(tableId);
+ }
+ }
+ return OptionalLong.empty();
+ }
+
+ /**
+ * Generate the internal SQL for fetching exact min/max values.
+ */
+ @VisibleForTesting
+ public static String genMinMaxSql(List<String> qualifiers, String
columnName) {
+ // qualifiers: [catalogName, dbName, tableName]
+ String quotedCol = "`" + StatisticsUtil.escapeColumnName(columnName) +
"`";
+ String fullTable = "`" + qualifiers.get(0) + "`.`"
+ + qualifiers.get(1) + "`.`"
+ + qualifiers.get(2) + "`";
+ return "SELECT min(" + quotedCol + "), max(" + quotedCol + ") FROM " +
fullTable;
+ }
+
+ /**
+ * Generate the internal SQL for fetching exact row count.
+ */
+ @VisibleForTesting
+ public static String genCountSql(List<String> qualifiers) {
+ String fullTable = "`" + qualifiers.get(0) + "`.`"
+ + qualifiers.get(1) + "`.`"
+ + qualifiers.get(2) + "`";
+ return "SELECT count(*) FROM " + fullTable;
+ }
+
+ /**
+ * Async cache loader that issues internal SQL queries to compute exact
min/max.
+ */
+ protected static final class CacheLoader
+ implements AsyncCacheLoader<ColumnMinMaxKey, Optional<CacheValue>>
{
+
+ @Override
+ public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad(
+ @NonNull ColumnMinMaxKey key, @NonNull Executor executor) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return doLoad(key);
+ } catch (Exception e) {
+ LOG.warn("Failed to load MinMax for column: {}", key, e);
+ return Optional.empty();
+ }
+ }, executor);
+ }
+
+ private Optional<CacheValue> doLoad(ColumnMinMaxKey key) throws
Exception {
+ // Look up the table by its ID
+ TableIf tableIf =
Env.getCurrentInternalCatalog().getTableByTableId(key.getTableId());
+ if (!(tableIf instanceof OlapTable)) {
+ return Optional.empty();
+ }
+ OlapTable olapTable = (OlapTable) tableIf;
+
+ // Validate column exists and is eligible
+ Column column = olapTable.getColumn(key.getColumnName());
+ if (column == null) {
+ return Optional.empty();
+ }
+ if (!column.getType().isNumericType() &&
!column.getType().isDateType()) {
+ return Optional.empty();
+ }
+ if (column.isAggregated()) {
+ return Optional.empty();
+ }
+
+ // Capture version and versionTime before the query.
+ // Both are needed: versionTime (cheap) catches writes that cross
a millisecond boundary;
+ // version (may be RPC in cloud mode) is the ground truth for
same-millisecond writes.
+ long versionBefore = olapTable.getVisibleVersion();
+ long versionTimeBefore = olapTable.getVisibleVersionTime();
+
Review Comment:
**Correctness confirmed:** `getVisibleVersion()` may throw checked
`RpcException` in cloud mode. This is safely handled: `doLoad()` declares
`throws Exception`, and the outer `asyncLoad` lambda catches `Exception` (line
413) and returns `Optional.empty()`. Good.
Also confirmed: the before/after version check pattern (capture version
before query, compare after) correctly invalidates results if a concurrent
write occurs during the internal SQL execution.
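For readers skimming the thread, the before/after pattern can be sketched in
isolation as follows. `VersionGuardedLoadSketch` and `loadIfUnchanged` are
hypothetical names; the version source and the query are passed in as
functional interfaces rather than the real `OlapTable` and internal SQL
executor, and the part of `doLoad()` after the query is not quoted here, so
this is an assumption about its shape rather than a copy of it.

```java
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper illustrating the capture-before / compare-after check.
class VersionGuardedLoadSketch {
    static <T> Optional<T> loadIfUnchanged(LongSupplier version, Supplier<T> query) {
        // Capture the version before running the (possibly slow) query.
        long before = version.getAsLong();
        T result = query.get();
        // Re-read the version; a change means a write raced with the query.
        long after = version.getAsLong();
        if (before != after) {
            // Result may reflect a mix of old and new data: do not cache it.
            return Optional.empty();
        }
        return Optional.ofNullable(result);
    }
}
```

Returning empty on a mismatch is the conservative choice: the caller simply
misses the cache and a later load gets another chance at a consistent result.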
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java:
##########
@@ -0,0 +1,546 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.stats;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.LongSupplier;
+
+/**
+ * Async cache that stores exact MIN/MAX/COUNT values for OlapTable,
+ * used by {@code RewriteSimpleAggToConstantRule} to replace simple
+ * aggregations with constant values.
+ *
+ * <p>MIN/MAX values are obtained by executing
+ * {@code SELECT min(col), max(col) FROM table}, and COUNT values by
+ * {@code SELECT count(*) FROM table}, both as internal SQL queries
+ * inside FE. Results are cached with a version stamp derived from
+ * {@code OlapTable.getVisibleVersionTime()}.
+ * When a caller provides a versionTime newer than the cached versionTime,
+ * the stale entry is evicted and a background reload is triggered.
+ *
+ * <p>Cache validation uses a two-level check driven by {@code versionTime}:
+ * <ol>
+ * <li>If times differ: the table has definitely changed — evict immediately,
+ * without calling {@code getVisibleVersion()} (which may involve an RPC
in cloud mode).</li>
+ * <li>If times match: call {@code getVisibleVersion()} once to compare
versions,
+ * guarding against the rare case of two writes completing within the
same millisecond.</li>
+ * </ol>
+ *
+ * <p>Only numeric and date/datetime columns are cached for MIN/MAX;
+ * aggregated columns are skipped.
+ */
+public class SimpleAggCacheMgr {
+
+ // ======================== Public inner types ========================
+
+ /**
+ * Holds exact min and max values for a column as strings.
+ */
+ public static class ColumnMinMax {
+ private final String minValue;
+ private final String maxValue;
+
+ public ColumnMinMax(String minValue, String maxValue) {
+ this.minValue = minValue;
+ this.maxValue = maxValue;
+ }
+
+ public String minValue() {
+ return minValue;
+ }
+
+ public String maxValue() {
+ return maxValue;
+ }
+ }
+
+ /**
+ * Cache key identifying a column by its table ID and column name.
+ */
+ public static final class ColumnMinMaxKey {
+ private final long tableId;
+ private final String columnName;
+
+ public ColumnMinMaxKey(long tableId, String columnName) {
+ this.tableId = tableId;
+ this.columnName = columnName;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ColumnMinMaxKey)) {
+ return false;
+ }
+ ColumnMinMaxKey that = (ColumnMinMaxKey) o;
+ return tableId == that.tableId &&
columnName.equalsIgnoreCase(that.columnName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableId, columnName.toLowerCase());
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnMinMaxKey{tableId=" + tableId + ", column=" +
columnName + "}";
+ }
+ }
+
+ private static class CacheValue {
+ private final ColumnMinMax minMax;
+ private final long version;
+ private final long versionTime;
+
+ CacheValue(ColumnMinMax minMax, long version, long versionTime) {
+ this.minMax = minMax;
+ this.version = version;
+ this.versionTime = versionTime;
+ }
+
+ ColumnMinMax minMax() {
+ return minMax;
+ }
+
+ long version() {
+ return version;
+ }
+
+ long versionTime() {
+ return versionTime;
+ }
+ }
+
+ /**
+ * Cached row count with version stamp.
+ */
+ private static class RowCountValue {
+ private final long rowCount;
+ private final long version;
+ private final long versionTime;
+
+ RowCountValue(long rowCount, long version, long versionTime) {
+ this.rowCount = rowCount;
+ this.version = version;
+ this.versionTime = versionTime;
+ }
+
+ long rowCount() {
+ return rowCount;
+ }
+
+ long version() {
+ return version;
+ }
+
+ long versionTime() {
+ return versionTime;
+ }
+ }
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleAggCacheMgr.class);
+
+ private static volatile SimpleAggCacheMgr INSTANCE;
+ private static volatile SimpleAggCacheMgr TEST_INSTANCE;
+
+ private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>>
cache;
+ private final AsyncLoadingCache<Long, Optional<RowCountValue>>
rowCountCache;
+
+ /**
+ * Protected no-arg constructor for test subclassing.
+ * Subclasses override {@link #getStats}, {@link #getRowCount}, etc.
+ */
+ protected SimpleAggCacheMgr() {
+ this.cache = null;
+ this.rowCountCache = null;
+ }
+
+ private SimpleAggCacheMgr(ExecutorService executor) {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(Config.stats_cache_size)
+ .executor(executor)
+ .buildAsync(new CacheLoader());
+ this.rowCountCache = Caffeine.newBuilder()
+ .maximumSize(Config.stats_cache_size)
+ .executor(executor)
+ .buildAsync(new RowCountLoader());
+ }
+
+ private static SimpleAggCacheMgr getInstance() {
+ if (INSTANCE == null) {
+ synchronized (SimpleAggCacheMgr.class) {
+ if (INSTANCE == null) {
+ ExecutorService executor =
ThreadPoolManager.newDaemonCacheThreadPool(
+ 4, "simple-agg-cache-pool", true);
+ INSTANCE = new SimpleAggCacheMgr(executor);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ /**
+ * Returns the singleton instance backed by async-loading cache,
+ * or the test override if one has been set.
+ */
+ public static SimpleAggCacheMgr internalInstance() {
+ SimpleAggCacheMgr test = TEST_INSTANCE;
+ if (test != null) {
+ return test;
+ }
+ return getInstance();
+ }
+
+ /**
+ * Override used only in unit tests to inject a mock implementation.
+ */
+ @VisibleForTesting
+ public static void setTestInstance(SimpleAggCacheMgr instance) {
+ TEST_INSTANCE = instance;
+ }
+
+ /**
+ * Reset the test override so that subsequent calls go back to the real
cache.
+ */
+ @VisibleForTesting
+ public static void clearTestInstance() {
+ TEST_INSTANCE = null;
+ }
+
+ /**
+ * Get the cached min/max for a column.
+ *
+ * <p>Cache validation uses a two-level check driven by {@code versionTime}:
+ * <ol>
+ * <li>If {@code cachedVersionTime != callerVersionTime}: times differ, the table has
+ * definitely been modified — evict immediately. {@code versionSupplier} is
+ * <em>never</em> called, saving a potentially expensive RPC in cloud mode.</li>
+ * <li>If {@code cachedVersionTime == callerVersionTime}: times match, but two writes
+ * in the same millisecond could produce the same time with different versions.
+ * {@code versionSupplier} is called exactly once to confirm {@code version} equality.</li>
+ * </ol>
+ *
+ * @param key cache key (tableId + columnName)
+ * @param callerVersionTime the caller's current {@code table.getVisibleVersionTime()} (cheap, always local)
+ * @param versionSupplier lazy supplier for {@code table.getVisibleVersion()}; called only when
+ * {@code versionTime} values are equal. Returns -1 on RPC failure,
+ * in which case the method returns empty conservatively.
+ */
+ public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long callerVersionTime,
+ LongSupplier versionSupplier) {
+ CompletableFuture<Optional<CacheValue>> future = cache.get(key);
+ if (future.isDone()) {
+ try {
+ Optional<CacheValue> cacheValue = future.get();
+ if (cacheValue.isPresent()) {
+ CacheValue value = cacheValue.get();
+ if (value.versionTime() == callerVersionTime) {
+ // Times match — verify version to guard against two writes in the same ms.
+ // versionSupplier may be an RPC in cloud mode; it is only invoked here.
+ long callerVersion = versionSupplier.getAsLong();
+ if (callerVersion < 0) {
+ // RPC failed: cannot verify version — return empty conservatively.
+ // Do not invalidate: keep the existing entry so it can be reused once the RPC recovers.
+ return Optional.empty();
+ }
+ if (value.version() == callerVersion) {
+ return Optional.of(value.minMax());
+ }
+ // Same time but different version: two writes in the same ms — stale.
+ }
+ // Times differ → definitely stale; skip the version RPC entirely.
+ }
+ // Either empty (load failed) or stale — evict so next call triggers a fresh reload.
+ cache.synchronous().invalidate(key);
+ } catch (Exception e) {
+ LOG.warn("Failed to get MinMax for column: {}, versionTime: {}", key, callerVersionTime, e);
+ cache.synchronous().invalidate(key);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Evict the cached stats for a column, if present. Used when we know the data has changed.
+ */
+ public void removeStats(ColumnMinMaxKey key) {
+ cache.synchronous().invalidate(key);
+ }
+
+ /**
+ * Get the cached row count for a table.
+ *
+ * <p>Cache validation uses a two-level check driven by {@code versionTime}:
+ * <ol>
+ * <li>If {@code cachedVersionTime != callerVersionTime}: times differ, the table has
+ * definitely been modified — evict immediately. {@code versionSupplier} is
+ * <em>never</em> called, saving a potentially expensive RPC in cloud mode.</li>
+ * <li>If {@code cachedVersionTime == callerVersionTime}: times match, but two writes
+ * in the same millisecond could produce the same time with different versions.
+ * {@code versionSupplier} is called exactly once to confirm {@code version} equality.</li>
+ * </ol>
+ *
+ * @param tableId the table id
+ * @param callerVersionTime the caller's current {@code table.getVisibleVersionTime()} (cheap, always local)
+ * @param versionSupplier lazy supplier for {@code table.getVisibleVersion()}; called only when
+ * {@code versionTime} values are equal. Returns -1 on RPC failure,
+ * in which case the method returns empty conservatively.
+ */
+ public OptionalLong getRowCount(long tableId, long callerVersionTime, LongSupplier versionSupplier) {
+ CompletableFuture<Optional<RowCountValue>> future = rowCountCache.get(tableId);
+ if (future.isDone()) {
+ try {
+ Optional<RowCountValue> cached = future.get();
+ if (cached.isPresent()) {
+ RowCountValue value = cached.get();
+ if (value.versionTime() == callerVersionTime) {
+ // Times match — verify version to guard against two writes in the same ms.
+ long callerVersion = versionSupplier.getAsLong();
+ if (callerVersion < 0) {
+ // RPC failed: cannot verify version — return empty conservatively.
+ // Do not invalidate: keep the existing entry so it can be reused once the RPC recovers.
+ return OptionalLong.empty();
+ }
+ if (value.version() == callerVersion) {
+ return OptionalLong.of(value.rowCount());
+ }
+ // Same time but different version: two writes in the same ms — stale.
+ }
+ // Times differ → definitely stale; skip the version RPC entirely.
+ }
+ // Either empty (load failed) or stale — evict so next call triggers a fresh reload.
+ rowCountCache.synchronous().invalidate(tableId);
+ } catch (Exception e) {
+ LOG.warn("Failed to get row count for table: {}, versionTime: {}", tableId, callerVersionTime, e);
+ rowCountCache.synchronous().invalidate(tableId);
+ }
+ }
+ return OptionalLong.empty();
+ }
+
+ /**
+ * Generate the internal SQL for fetching exact min/max values.
+ */
+ @VisibleForTesting
+ public static String genMinMaxSql(List<String> qualifiers, String columnName) {
+ // qualifiers: [catalogName, dbName, tableName]
+ String quotedCol = "`" + StatisticsUtil.escapeColumnName(columnName) + "`";
+ String fullTable = "`" + qualifiers.get(0) + "`.`"
+ + qualifiers.get(1) + "`.`"
+ + qualifiers.get(2) + "`";
+ return "SELECT min(" + quotedCol + "), max(" + quotedCol + ") FROM " + fullTable;
+ }
+
+ /**
+ * Generate the internal SQL for fetching exact row count.
+ */
+ @VisibleForTesting
+ public static String genCountSql(List<String> qualifiers) {
+ String fullTable = "`" + qualifiers.get(0) + "`.`"
+ + qualifiers.get(1) + "`.`"
+ + qualifiers.get(2) + "`";
+ return "SELECT count(*) FROM " + fullTable;
+ }
+
+ /**
+ * Async cache loader that issues internal SQL queries to compute exact min/max.
+ */
+ protected static final class CacheLoader
+ implements AsyncCacheLoader<ColumnMinMaxKey, Optional<CacheValue>> {
+
+ @Override
+ public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad(
+ @NonNull ColumnMinMaxKey key, @NonNull Executor executor) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return doLoad(key);
+ } catch (Exception e) {
+ LOG.warn("Failed to load MinMax for column: {}", key, e);
+ return Optional.empty();
+ }
+ }, executor);
+ }
+
+ private Optional<CacheValue> doLoad(ColumnMinMaxKey key) throws Exception {
+ // Look up the table by its ID
+ TableIf tableIf = Env.getCurrentInternalCatalog().getTableByTableId(key.getTableId());
+ if (!(tableIf instanceof OlapTable)) {
+ return Optional.empty();
+ }
+ OlapTable olapTable = (OlapTable) tableIf;
+
+ // Validate column exists and is eligible
+ Column column = olapTable.getColumn(key.getColumnName());
+ if (column == null) {
+ return Optional.empty();
+ }
+ if (!column.getType().isNumericType() && !column.getType().isDateType()) {
+ return Optional.empty();
+ }
+ if (column.isAggregated()) {
+ return Optional.empty();
+ }
+
+ // Capture version and versionTime before the query.
+ // Both are needed: versionTime (cheap) catches writes that cross a millisecond boundary;
+ // version (may be RPC in cloud mode) is the ground truth for same-millisecond writes.
+ long versionBefore = olapTable.getVisibleVersion();
+ long versionTimeBefore = olapTable.getVisibleVersionTime();
+
+ // Build and execute internal SQL
+ List<String> qualifiers = olapTable.getFullQualifiers();
+ String sql = genMinMaxSql(qualifiers, column.getName());
+
+ List<ResultRow> rows;
+ try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
+ r.connectContext.getSessionVariable().setPipelineTaskNum("1");
+ // Disable our own rule to prevent infinite recursion:
+ // this internal SQL goes through Nereids and would otherwise trigger
+ // RewriteSimpleAggToConstantRule again.
+ r.connectContext.getSessionVariable().setDisableNereidsRules(
+ "REWRITE_SIMPLE_AGG_TO_CONSTANT");
+ StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
Review Comment:
**Correctness confirmed:** `setDisableNereidsRules()` replaces the
disabled-rules string rather than appending to it. That is safe here because
`StatisticsUtil.buildConnectContext(false)` creates a fresh `ConnectContext`
whose disabled-rules string is empty, so setting it to
`"REWRITE_SIMPLE_AGG_TO_CONSTANT"` disables only this one rule and does not
overwrite any previously disabled rules.
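
To make the replace-versus-append point concrete, here is a minimal standalone sketch. `SessionVars` is a hypothetical stand-in, not Doris's `SessionVariable`, and `SOME_OTHER_RULE` is a made-up rule name. It only illustrates why overwriting is harmless on a fresh context and would drop earlier entries on a reused one.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class DisabledRulesSketch {

    /** Hypothetical stand-in for a session-variable holder; not the Doris class. */
    static final class SessionVars {
        private String disabledRules = "";

        // Replace semantics: whatever was stored before is discarded.
        void setDisabledRules(String rules) {
            this.disabledRules = rules;
        }

        // Parse the comma-separated setting into a set of rule names.
        Set<String> disabledRuleSet() {
            Set<String> result = new LinkedHashSet<>();
            for (String rule : disabledRules.split(",")) {
                String trimmed = rule.trim();
                if (!trimmed.isEmpty()) {
                    result.add(trimmed);
                }
            }
            return result;
        }
    }

    // Fresh context: nothing was disabled before, so replacing loses nothing.
    static Set<String> freshContext() {
        SessionVars vars = new SessionVars();
        vars.setDisabledRules("REWRITE_SIMPLE_AGG_TO_CONSTANT");
        return vars.disabledRuleSet();
    }

    // Reused context: the earlier (made-up) rule name is dropped by the overwrite.
    static Set<String> reusedContext() {
        SessionVars vars = new SessionVars();
        vars.setDisabledRules("SOME_OTHER_RULE");
        vars.setDisabledRules("REWRITE_SIMPLE_AGG_TO_CONSTANT");
        return vars.disabledRuleSet();
    }

    public static void main(String[] args) {
        System.out.println(freshContext());
        System.out.println(reusedContext());
    }
}
```

If the loader were ever changed to reuse a caller's `ConnectContext`, the second case is the one to watch for.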
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -3703,6 +3703,10 @@ private List<Partition> truncateTableInternal(OlapTable olapTable, List<Partitio
olapTable.dropPartitionForTruncate(olapTable.getDatabase().getId(),
isforceDrop, pair.getValue());
}
+ // Reset table-level visibleVersion to TABLE_INIT_VERSION so it stays consistent
+ // with the newly created partitions (which also start at PARTITION_INIT_VERSION).
Review Comment:
**Correctness confirmed:** This call is inside `truncateTableInternal()`,
which runs under the DB write lock (acquired in `truncateTable()` at line
~3650). `replayTruncateTable()` also invokes `truncateTableInternal()`, so the
reset is applied on both master and follower. The EditLog entry is written
*after* this point (in `truncateTable()`, line ~3680), following the standard
modify-then-log pattern.
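
The modify-then-log and replay flow described above can be sketched in isolation. Everything below is a toy model: `Table`, the string-based edit log, and `INIT_VERSION = 1` are assumptions for illustration and are not taken from the Doris code. The point is only that routing both the master path and the replay path through one internal method keeps the version reset consistent on both sides.

```java
import java.util.ArrayList;
import java.util.List;

public class TruncateReplaySketch {

    // Assumed initial version, for illustration only.
    static final long INIT_VERSION = 1L;

    /** Toy table state; not the Doris OlapTable. */
    static final class Table {
        long visibleVersion = INIT_VERSION;
        final List<String> rows = new ArrayList<>();
    }

    // Shared by the master path and the replay path.
    static void truncateInternal(Table table) {
        table.rows.clear();
        table.visibleVersion = INIT_VERSION;
    }

    // Master: modify in-memory state first, then append the journal entry.
    static void truncateOnMaster(Table table, List<String> editLog) {
        truncateInternal(table);
        editLog.add("TRUNCATE");
    }

    // Follower: replay each journal entry through the same internal method.
    static void replay(Table table, List<String> editLog) {
        for (String op : editLog) {
            if ("TRUNCATE".equals(op)) {
                truncateInternal(table);
            }
        }
    }

    // Returns {masterVersion, followerVersion} after one truncate and its replay.
    static long[] run() {
        Table master = new Table();
        master.rows.add("r1");
        master.visibleVersion = 5L;

        Table follower = new Table();
        follower.rows.add("r1");
        follower.visibleVersion = 5L;

        List<String> editLog = new ArrayList<>();
        truncateOnMaster(master, editLog);
        replay(follower, editLog);
        return new long[] {master.visibleVersion, follower.visibleVersion};
    }

    public static void main(String[] args) {
        long[] versions = run();
        System.out.println(versions[0] + " " + versions[1]);
    }
}
```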
##########
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRuleTest.java:
##########
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
**Suggestion:** Consider adding a negative test case for queries with a
WHERE clause, e.g. `SELECT count(*) FROM dup_tbl WHERE k1 > 10`. The rule is
already safe for such queries by construction (the `LogicalFilter` node between
`Aggregate` and `Scan` prevents the pattern from matching), but an explicit
test would document this invariant and catch regressions if the
pattern-matching logic is later modified.
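
As a rough illustration of the invariant such a test would pin down, here is a toy matcher. The `Plan`, `Scan`, `Filter` and `Aggregate` classes are hypothetical simplifications and not the Nereids plan nodes or its pattern API; the real rule may accept other shapes as well. A real test in `RewriteSimpleAggToConstantRuleTest` would use the project's own test utilities.

```java
public class AggPatternSketch {

    /** Hypothetical, simplified plan nodes; not the Nereids classes. */
    interface Plan {}

    static final class Scan implements Plan {
        final String table;

        Scan(String table) {
            this.table = table;
        }
    }

    static final class Filter implements Plan {
        final String predicate;
        final Plan child;

        Filter(String predicate, Plan child) {
            this.predicate = predicate;
            this.child = child;
        }
    }

    static final class Aggregate implements Plan {
        final String function;
        final Plan child;

        Aggregate(String function, Plan child) {
            this.function = function;
            this.child = child;
        }
    }

    // Matches only an Aggregate whose direct child is a Scan.
    static boolean matchesAggOverScan(Plan plan) {
        return plan instanceof Aggregate && ((Aggregate) plan).child instanceof Scan;
    }

    // SELECT count(*) FROM dup_tbl
    static boolean countWithoutWhere() {
        return matchesAggOverScan(new Aggregate("count(*)", new Scan("dup_tbl")));
    }

    // SELECT count(*) FROM dup_tbl WHERE k1 > 10
    static boolean countWithWhere() {
        Plan filtered = new Filter("k1 > 10", new Scan("dup_tbl"));
        return matchesAggOverScan(new Aggregate("count(*)", filtered));
    }

    public static void main(String[] args) {
        System.out.println(countWithoutWhere());
        System.out.println(countWithWhere());
    }
}
```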
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.