This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch ignite-26068
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 049d5c2146692344c74824dc5b2067d16af60e80
Author: zstan <[email protected]>
AuthorDate: Thu Sep 11 09:47:54 2025 +0300

    IGNITE-26068 Sql. Update cached query plans when related statistics changed
---
 .../sql/engine/statistic/ItStatisticTest.java      |   9 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |   5 +-
 .../sql/engine/exec/fsm/ParsingPhaseHandler.java   |   4 -
 .../internal/sql/engine/prepare/CacheKey.java      |  19 +-
 .../sql/engine/prepare/PrepareService.java         |   2 +
 .../sql/engine/prepare/PrepareServiceImpl.java     | 395 +++++++++++++++------
 .../sql/engine/prepare/ValidationResult.java       |  32 +-
 .../sql/engine/statistic/SqlStatisticManager.java  |   4 +
 .../engine/statistic/SqlStatisticManagerImpl.java  |  19 +-
 .../internal/sql/engine/util/cache/Cache.java      |  10 +
 .../sql/engine/util/cache/CacheFactory.java        |   9 +
 .../engine/util/cache/CaffeineCacheFactory.java    |  18 +
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  13 +-
 .../engine/framework/PredefinedSchemaManager.java  |   7 +
 .../sql/engine/framework/TestBuilders.java         |  31 +-
 .../engine/framework/VersionedSchemaManager.java   |  50 +++
 .../sql/engine/planner/PlannerTimeoutTest.java     |   5 +-
 .../sql/engine/prepare/PrepareServiceImplTest.java | 189 +++++++++-
 .../engine/schema/SqlSchemaManagerImplTest.java    |  13 +-
 .../sql/engine/util/EmptyCacheFactory.java         |  11 +
 .../sql/metrics/PlanningCacheMetricsTest.java      |  11 +-
 21 files changed, 715 insertions(+), 141 deletions(-)

diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
index d45feee244d..0162942fda2 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
@@ -33,6 +33,8 @@ import org.junit.jupiter.api.Test;
 public class ItStatisticTest extends BaseSqlIntegrationTest {
     private SqlStatisticManagerImpl sqlStatisticManager;
 
+    private static final AtomicInteger counter = new AtomicInteger(0);
+
     @BeforeAll
     void beforeAll() {
         sqlStatisticManager = (SqlStatisticManagerImpl) queryProcessor().sqlStatisticManager();
@@ -44,6 +46,11 @@ public class ItStatisticTest extends BaseSqlIntegrationTest {
         sql("DROP TABLE IF EXISTS t");
     }
 
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
     @Test
     public void testStatisticsRowCount() throws Exception {
         // For test we should always update statistics.
@@ -71,8 +78,6 @@ public class ItStatisticTest extends BaseSqlIntegrationTest {
         }
     }
 
-    private static AtomicInteger counter = new AtomicInteger(0);
-
     private void insertAndUpdateRunQuery(int numberOfRecords) throws ExecutionException, TimeoutException, InterruptedException {
         int start = counter.get();
         int end = counter.addAndGet(numberOfRecords) - 1;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 0bdcb44fad5..bfeac05c268 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -296,9 +296,12 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider {
                 clusterCfg,
                 nodeCfg,
                 sqlSchemaManager,
-                ddlSqlToCommandConverter
+                ddlSqlToCommandConverter,
+                clockService
         ));
 
+        sqlStatisticManager.planUpdater(prepareSvc::updatePlans);
+
         var msgSrvc = registerService(new MessageServiceImpl(
                 localNode,
                 clusterSrvc.messagingService(),
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java
index 2192a8cd85c..a9cfc334f45 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java
@@ -75,8 +75,4 @@ class ParsingPhaseHandler implements ExecutionPhaseHandler {
 
         return Result.proceedAfter(awaitFuture);
     }
-
-    private static boolean shouldBeCached(SqlQueryType queryType) {
-        return queryType == SqlQueryType.QUERY || queryType == SqlQueryType.DML;
-    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java
index 1b7f97c6a4b..528c7bfc912 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.prepare;
 
 import java.util.Arrays;
 import java.util.Objects;
+import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.sql.ColumnType;
 
 /**
@@ -41,22 +42,34 @@ public class CacheKey {
 
     private int hashCode = 0;
 
+    private final ParsedResult parsedResult;
+
     /**
      * Constructor.
      *
      * @param catalogVersion Catalog version.
      * @param schemaName Schema name.
-     * @param query      Query string.
+     * @param parsedResult AST with additional info.
      * @param contextKey Optional context key to differ queries with and without/different flags, having an impact on result plan (like
      *                   LOCAL flag)
      * @param paramTypes Types of all dynamic parameters, no any type can be {@code null}.
      */
-    public CacheKey(int catalogVersion, String schemaName, String query, Object contextKey, ColumnType[] paramTypes) {
+    CacheKey(int catalogVersion, String schemaName, ParsedResult parsedResult, Object contextKey, ColumnType[] paramTypes) {
         this.catalogVersion = catalogVersion;
         this.schemaName = schemaName;
-        this.query = query;
+        this.query = parsedResult.normalizedQuery();
         this.contextKey = contextKey;
         this.paramTypes = paramTypes;
+
+        this.parsedResult = parsedResult;
+    }
+
+    ParsedResult parsedResult() {
+        return parsedResult;
+    }
+
+    int catalogVersion() {
+        return catalogVersion;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
index b4dbab49fcf..83c3b8ffd28 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
@@ -48,4 +48,6 @@ public interface PrepareService extends LifecycleAware {
     default CompletableFuture<Void> invalidateCache(Set<String> tableNames) {
         return CompletableFutures.nullCompletedFuture();
     }
+
+    void updatePlans(int tableId);
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index b7ddf7608a0..6f48e5c6c46 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -24,17 +24,24 @@ import static org.apache.ignite.internal.sql.engine.trait.TraitUtils.distributio
 import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.sql.engine.util.Commons.fastQueryOptimizationEnabled;
 import static org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
+import static org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -52,6 +59,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -74,7 +82,10 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSelectCount;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplain;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplainMode;
@@ -121,12 +132,14 @@ public class PrepareServiceImpl implements PrepareService {
 
     private static final String PLANNING_EXECUTOR_SOURCE_NAME = THREAD_POOLS_METRICS_SOURCE_NAME + "sql-planning-executor";
 
+    public static final int PLAN_UPDATER_INITIAL_DELAY = 2_000;
+
     private final UUID prepareServiceId = UUID.randomUUID();
     private final AtomicLong planIdGen = new AtomicLong();
 
     private final DdlSqlToCommandConverter ddlConverter;
 
-    private final Cache<CacheKey, CompletableFuture<QueryPlan>> cache;
+    final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
 
     private final String nodeName;
 
@@ -142,6 +155,15 @@ public class PrepareServiceImpl implements PrepareService {
 
     private volatile ThreadPoolExecutor planningPool;
 
+    /** Clock. */
+    private final ClockService clockService;
+
+    private final Cache<CacheKey, Object> weakCache;
+
+    private static final Object IGNORED_VAL = new Object();
+
+    private ExecutorService planUpdater;
+
     /**
      * Factory method.
      *
@@ -153,6 +175,8 @@ public class PrepareServiceImpl implements PrepareService {
      * @param nodeCfg Node SQL configuration.
      * @param schemaManager Schema manager to use on validation phase to bind identifiers in AST with particular schema objects.
      * @param ddlSqlToCommandConverter Converter from SQL DDL operators to catalog commands.
+     * @param clockService Clock service.
+     *
      */
     public static PrepareServiceImpl create(
             String nodeName,
@@ -162,7 +186,8 @@ public class PrepareServiceImpl implements PrepareService {
             SqlDistributedConfiguration clusterCfg,
             SqlLocalConfiguration nodeCfg,
             SqlSchemaManager schemaManager,
-            DdlSqlToCommandConverter ddlSqlToCommandConverter
+            DdlSqlToCommandConverter ddlSqlToCommandConverter,
+            ClockService clockService
     ) {
         return new PrepareServiceImpl(
                 nodeName,
@@ -173,7 +198,8 @@ public class PrepareServiceImpl implements PrepareService {
                 clusterCfg.planner().planCacheExpiresAfterSeconds().value(),
                 nodeCfg.planner().threadCount().value(),
                 metricManager,
-                schemaManager
+                schemaManager,
+                clockService
         );
     }
 
@@ -187,6 +213,7 @@ public class PrepareServiceImpl implements PrepareService {
      * @param plannerTimeout Timeout in milliseconds to planning.
      * @param metricManager Metric manager.
      * @param schemaManager Schema manager to use on validation phase to bind identifiers in AST with particular schema objects.
+     * @param clockService Clock service.
      */
     public PrepareServiceImpl(
             String nodeName,
@@ -197,7 +224,8 @@ public class PrepareServiceImpl implements PrepareService {
             int plannerThreadCount,
             int planExpirySeconds,
             MetricManager metricManager,
-            SqlSchemaManager schemaManager
+            SqlSchemaManager schemaManager,
+            ClockService clockService
     ) {
         this.nodeName = nodeName;
         this.ddlConverter = ddlConverter;
@@ -205,9 +233,12 @@ public class PrepareServiceImpl implements PrepareService {
         this.metricManager = metricManager;
         this.plannerThreadCount = plannerThreadCount;
         this.schemaManager = schemaManager;
+        this.clockService = clockService;
 
         sqlPlanCacheMetricSource = new SqlPlanCacheMetricSource();
         cache = cacheFactory.create(cacheSize, sqlPlanCacheMetricSource, Duration.ofSeconds(planExpirySeconds));
+
+        weakCache = cacheFactory.createWithWeakKeys();
     }
 
     /** {@inheritDoc} */
@@ -231,12 +262,67 @@ public class PrepareServiceImpl implements PrepareService {
         metricManager.enable(PLANNING_EXECUTOR_SOURCE_NAME);
 
         IgnitePlanner.warmup();
+
+        ScheduledExecutorService planUpdater = Executors.newSingleThreadScheduledExecutor(
+                IgniteThreadFactory.create(nodeName, "sql-query-plan-refresh", true, LOG)
+        );
+
+        planUpdater.scheduleAtFixedRate(() -> {
+            for (CacheKey key : weakCache.asMap().keySet()) {
+                CompletableFuture<PlanInfo> fut = cache.get(key);
+
+                // can be evicted
+                if (fut != null) {
+                    PlanInfo info = fut.join();
+                    assert info.context != null && info.statement != null;
+
+                    assert isCompletedSuccessfully(fut);
+
+                    int currentCatalogVersion = schemaManager.catalogVersion(clockService.now().longValue());
+
+                    if (currentCatalogVersion == key.catalogVersion()) {
+                        SqlQueryType queryType = key.parsedResult().queryType();
+
+                        if (queryType == SqlQueryType.QUERY || queryType == SqlQueryType.DML) {
+                            CompletableFuture<PlanInfo> newPlanFut = CompletableFuture.supplyAsync(
+                                    () -> buildPlan(queryType, key, info.statement, info.context), planningPool)
+                                    .exceptionally(e -> {
+                                        LOG.warn("Failed to re-plan query: " + key.parsedResult().originalQuery(), e);
+
+                                        return null;
+                                    });
+
+                            try {
+                                PlanInfo newPlan = newPlanFut.get();
+
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Plan: " + info.queryPlan + "\n\nre-planned into: " + newPlan.queryPlan);
+                                }
+
+                                if (newPlan != null) {
+                                    cache.asMap().computeIfPresent(key, (k, v) -> CompletableFuture.completedFuture(newPlan));
+                                }
+                            } catch (Exception ex) {
+                                LOG.warn("Failed to re-plan query: " + key.parsedResult().originalQuery(), ex);
+                            }
+
+                            weakCache.invalidate(key);
+                        } else {
+                            throw new AssertionError("Unexpected queryType=" + queryType);
+                        }
+                    }
+                }
+            }
+        }, PLAN_UPDATER_INITIAL_DELAY, 1_000, TimeUnit.MILLISECONDS);
+
+        this.planUpdater = planUpdater;
     }
 
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
         planningPool.shutdownNow();
+        planUpdater.shutdownNow();
         metricManager.unregisterSource(sqlPlanCacheMetricSource);
         metricManager.unregisterSource(PLANNING_EXECUTOR_SOURCE_NAME);
     }
@@ -257,16 +343,16 @@ public class PrepareServiceImpl implements PrepareService {
 
         CacheKey key = createCacheKey(parsedResult, catalogVersion, schemaName, operationContext.parameters());
 
-        CompletableFuture<QueryPlan> planFuture = cache.get(key);
+        CompletableFuture<PlanInfo> planFuture = cache.get(key);
 
         if (planFuture != null) {
             return planFuture.thenApply((plan) -> {
                 // We assume that non-multi-step plans is always better then a multi-step plan.
                 // or fast query optimization is disabled return a regular plan.
-                if (!(plan instanceof MultiStepPlan)) {
-                    return plan;
+                if (!(plan.queryPlan instanceof MultiStepPlan)) {
+                    return plan.queryPlan;
                 } else {
-                    MultiStepPlan regularPlan = (MultiStepPlan) plan;
+                    MultiStepPlan regularPlan = (MultiStepPlan) plan.queryPlan;
                     QueryPlan fastPlan = regularPlan.fastPlan();
 
                     if (fastPlan != null && !explicitTx) {
@@ -318,7 +404,7 @@ public class PrepareServiceImpl implements PrepareService {
                 cache.clear();
             } else {
                 Set<QualifiedName> qualifiedNames = tableNames.stream().map(QualifiedName::parse).collect(Collectors.toSet());
-                cache.removeIfValue(p -> p.isDone() && planMatches(p.join(), qualifiedNames::contains));
+                cache.removeIfValue(p -> p.isDone() && planMatches(p.join().queryPlan, qualifiedNames::contains));
             }
 
             return null;
@@ -342,13 +428,13 @@ public class PrepareServiceImpl implements PrepareService {
     ) {
         switch (parsedResult.queryType()) {
             case QUERY:
-                return prepareQuery(parsedResult, planningContext);
+                return prepareQuery(parsedResult, planningContext).thenApply(f -> f.queryPlan);
             case DDL:
                 return prepareDdl(parsedResult, planningContext);
             case KILL:
                 return prepareKill(parsedResult);
             case DML:
-                return prepareDml(parsedResult, planningContext);
+                return prepareDml(parsedResult, planningContext).thenApply(f -> f.queryPlan);
             case EXPLAIN:
                 return prepareExplain(parsedResult, planningContext);
             default:
@@ -402,7 +488,7 @@ public class PrepareServiceImpl implements PrepareService {
                 explicandum
         );
 
-        CompletableFuture<QueryPlan> result;
+        CompletableFuture<PlanInfo> result;
         switch (queryType) {
             case QUERY:
                 result = prepareQuery(newParsedResult, ctx);
@@ -415,7 +501,8 @@ public class PrepareServiceImpl implements PrepareService {
         }
 
         return result.thenApply(plan -> {
-            assert plan instanceof ExplainablePlan : plan == null ? "<null>" : plan.getClass().getCanonicalName();
+            QueryPlan plan0 = plan.queryPlan;
+            assert plan0 instanceof ExplainablePlan : plan0 == null ? "<null>" : plan0.getClass().getCanonicalName();
 
             SqlLiteral literal = (SqlLiteral) explainMode;
 
@@ -423,7 +510,7 @@ public class PrepareServiceImpl implements PrepareService {
 
             assert mode != null;
 
-            return new ExplainPlan(nextPlanId(), (ExplainablePlan) plan, mode);
+            return new ExplainPlan(nextPlanId(), (ExplainablePlan) plan0, mode);
         });
     }
 
@@ -431,7 +518,7 @@ public class PrepareServiceImpl implements PrepareService {
         return !(sqlNode instanceof SqlNodeList);
     }
 
-    private CompletableFuture<QueryPlan> prepareQuery(
+    private CompletableFuture<PlanInfo> prepareQuery(
             ParsedResult parsedResult,
             PlanningContext ctx
     ) {
@@ -459,63 +546,101 @@ public class PrepareServiceImpl implements PrepareService {
                 // Try to produce a fast plan, if successful, then return that plan w/o caching it.
                 QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
                 if (fastPlan != null) {
-                    return CompletableFuture.completedFuture(fastPlan);
+                    return CompletableFuture.completedFuture(new PlanInfo(fastPlan));
                 }
             }
 
             // Use parameter metadata to compute a cache key.
             CacheKey key = createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx, stmt.parameterMetadata);
 
-            CompletableFuture<QueryPlan> planFut = cache.get(key, k -> CompletableFuture.supplyAsync(() -> {
-                IgnitePlanner planner = ctx.planner();
+            return cache.get(key, k -> CompletableFuture.supplyAsync(() -> buildQueryPlan(key, stmt, ctx), planningPool));
+        });
+    }
 
-                ValidationResult validated = stmt.value;
-                ParameterMetadata parameterMetadata = stmt.parameterMetadata;
+    private PlanInfo buildQueryPlan(CacheKey key, ValidStatement<ValidationResult> stmt, PlanningContext ctx) {
+        IgnitePlanner planner = ctx.planner();
 
-                SqlNode validatedNode = validated.sqlNode();
+        ValidationResult validated = stmt.value;
+        ParameterMetadata parameterMetadata = stmt.parameterMetadata;
 
-                RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key));
-                IgniteRel optimizedRel = relWithMetadata.rel;
-                QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
+        SqlNode validatedNode = validated.sqlNode();
 
-                ResultSetMetadata resultSetMetadata = resultSetMetadata(validated.dataType(), validated.origins(), validated.aliases());
+        RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key));
+        IgniteRel optimizedRel = relWithMetadata.rel;
+        QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
+
+        ResultSetMetadata resultSetMetadata = resultSetMetadata(validated.dataType(), validated.origins(), validated.aliases());
+
+        int catalogVersion = ctx.catalogVersion();
 
-                int catalogVersion = ctx.catalogVersion();
+        if (optimizedRel instanceof IgniteKeyValueGet) {
+            IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
 
-                if (optimizedRel instanceof IgniteKeyValueGet) {
-                    IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
+            var plan = new KeyValueGetPlan(
+                    nextPlanId(),
+                    catalogVersion,
+                    kvGet,
+                    resultSetMetadata,
+                    parameterMetadata,
+                    relWithMetadata.paMetadata,
+                    relWithMetadata.ppMetadata
+            );
+
+            return new PlanInfo(plan);
+        }
+
+        var plan = new MultiStepPlan(
+                nextPlanId(),
+                SqlQueryType.QUERY,
+                optimizedRel,
+                resultSetMetadata,
+                parameterMetadata,
+                catalogVersion,
+                relWithMetadata.numSources,
+                fastPlan,
+                relWithMetadata.paMetadata,
+                relWithMetadata.ppMetadata
+        );
 
-                    return new KeyValueGetPlan(
-                            nextPlanId(),
-                            catalogVersion,
-                            kvGet,
-                            resultSetMetadata,
-                            parameterMetadata,
-                            relWithMetadata.paMetadata,
-                            relWithMetadata.ppMetadata
-                    );
+        logPlan(key.parsedResult().originalQuery(), plan);
+
+        int currentCatalogVersion = schemaManager.catalogVersion(clockService.now().longValue());
+
+        if (currentCatalogVersion == catalogVersion) {
+            Set<Integer> sources = resolveSources(plan.getRel());
+
+            return new PlanInfo(plan, stmt, ctx, sources);
+        }
+
+        return new PlanInfo(plan);
+    }
+
+    @Override
+    public void updatePlans(int tableId) {
+        ConcurrentMap<CacheKey, CompletableFuture<PlanInfo>> map = cache.asMap();
+
+        for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent : map.entrySet()) {
+            CompletableFuture<PlanInfo> fut = ent.getValue();
+
+            if (isCompletedSuccessfully(fut)) {
+                // no wait, already completed
+                PlanInfo info = fut.join();
+
+                if (info.sources.contains(tableId)) {
+                    weakCache.put(ent.getKey(), IGNORED_VAL);
                 }
+            }
+        }
+    }
 
-                var plan = new MultiStepPlan(
-                        nextPlanId(),
-                        SqlQueryType.QUERY,
-                        optimizedRel,
-                        resultSetMetadata,
-                        parameterMetadata,
-                        catalogVersion,
-                        relWithMetadata.numSources,
-                        fastPlan,
-                        relWithMetadata.paMetadata,
-                        relWithMetadata.ppMetadata
-                );
-
-                logPlan(parsedResult.originalQuery(), plan);
-
-                return plan;
-            }, planningPool));
-
-            return planFut;
-        });
+    private PlanInfo buildPlan(SqlQueryType queryType, CacheKey key, ValidStatement<ValidationResult> stmt, PlanningContext ctx) {
+        if (queryType == SqlQueryType.QUERY) {
+            return buildQueryPlan(key, stmt, ctx);
+        } else if (queryType == SqlQueryType.DML) {
+            return buildDmlPlan(key, stmt, ctx);
+        } else {
+            throw new AssertionError("should not get here");
+        }
     }
 
     private PlanId nextPlanId() {
@@ -552,7 +677,7 @@ public class PrepareServiceImpl implements PrepareService {
     }
 
     /** Prepare plan in current thread, applicable for simple insert queries, cache plan not involved. */
-    CompletableFuture<QueryPlan> prepareDmlOpt(SqlNode sqlNode, PlanningContext ctx, String originalQuery) {
+    CompletableFuture<PlanInfo> prepareDmlOpt(SqlNode sqlNode, PlanningContext ctx, String originalQuery) {
         assert single(sqlNode);
 
         // Validate
@@ -593,10 +718,10 @@ public class PrepareServiceImpl implements PrepareService {
 
         logPlan(originalQuery, plan);
 
-        return CompletableFuture.completedFuture(plan);
+        return CompletableFuture.completedFuture(new PlanInfo(plan));
     }
 
-    private CompletableFuture<QueryPlan> prepareDml(ParsedResult parsedResult, PlanningContext ctx) {
+    private CompletableFuture<PlanInfo> prepareDml(ParsedResult parsedResult, PlanningContext ctx) {
         SqlNode sqlNode = parsedResult.parsedTree();
 
         assert single(sqlNode);
@@ -607,71 +732,79 @@ public class PrepareServiceImpl implements PrepareService {
             return prepareDmlOpt(sqlNode, ctx, parsedResult.originalQuery());
         }
 
-        CompletableFuture<ValidStatement<SqlNode>> validFut = CompletableFuture.supplyAsync(() -> {
+        CompletableFuture<ValidStatement<ValidationResult>> validFut = CompletableFuture.supplyAsync(() -> {
             IgnitePlanner planner = ctx.planner();
 
             // Validate
             SqlNode validatedNode = planner.validate(sqlNode);
+            ValidationResult validatedResult = new ValidationResult(validatedNode);
 
             // Get parameter metadata.
             RelDataType parameterRowType = planner.getParameterRowType();
             ParameterMetadata parameterMetadata = createParameterMetadata(parameterRowType);
 
-            return new ValidStatement<>(parsedResult, validatedNode, parameterMetadata);
+            return new ValidStatement<>(parsedResult, validatedResult, parameterMetadata);
         }, planningPool);
 
         // Optimize
-
         return validFut.thenCompose(stmt -> {
             // Use parameter metadata to compute a cache key.
             CacheKey key = createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx, stmt.parameterMetadata);
 
-            CompletableFuture<QueryPlan> planFut = cache.get(key, k -> CompletableFuture.supplyAsync(() -> {
-                IgnitePlanner planner = ctx.planner();
+            return cache.get(key, k -> CompletableFuture.supplyAsync(() -> buildDmlPlan(key, stmt, ctx), planningPool));
+        });
+    }
 
-                SqlNode validatedNode = stmt.value;
-                ParameterMetadata parameterMetadata = stmt.parameterMetadata;
+    private PlanInfo buildDmlPlan(CacheKey key, ValidStatement<ValidationResult> stmt, PlanningContext ctx) {
+        IgnitePlanner planner = ctx.planner();
 
-                RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key));
-                IgniteRel optimizedRel = relWithMetadata.rel;
+        SqlNode validatedNode = stmt.value.sqlNode();
+        ParameterMetadata parameterMetadata = stmt.parameterMetadata;
 
-                int catalogVersion = ctx.catalogVersion();
+        RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key));
+        IgniteRel optimizedRel = relWithMetadata.rel;
 
-                ExplainablePlan plan;
-                if (optimizedRel instanceof IgniteKeyValueModify) {
-                    IgniteKeyValueModify kvModify = (IgniteKeyValueModify) optimizedRel;
+        int catalogVersion = ctx.catalogVersion();
 
-                    plan = new KeyValueModifyPlan(
-                            nextPlanId(),
-                            catalogVersion,
-                            kvModify,
-                            DML_METADATA,
-                            parameterMetadata,
-                            relWithMetadata.paMetadata,
-                            relWithMetadata.ppMetadata
-                    );
-                } else {
-                    plan = new MultiStepPlan(
-                            nextPlanId(),
-                            SqlQueryType.DML,
-                            optimizedRel,
-                            DML_METADATA,
-                            parameterMetadata,
-                            catalogVersion,
-                            relWithMetadata.numSources,
-                            null,
-                            relWithMetadata.paMetadata,
-                            relWithMetadata.ppMetadata
-                    );
-                }
+        ExplainablePlan plan;
+        if (optimizedRel instanceof IgniteKeyValueModify) {
+            IgniteKeyValueModify kvModify = (IgniteKeyValueModify) optimizedRel;
 
-                logPlan(parsedResult.originalQuery(), plan);
+            plan = new KeyValueModifyPlan(
+                    nextPlanId(),
+                    catalogVersion,
+                    kvModify,
+                    DML_METADATA,
+                    parameterMetadata,
+                    relWithMetadata.paMetadata,
+                    relWithMetadata.ppMetadata
+            );
+        } else {
+            plan = new MultiStepPlan(
+                    nextPlanId(),
+                    SqlQueryType.DML,
+                    optimizedRel,
+                    DML_METADATA,
+                    parameterMetadata,
+                    catalogVersion,
+                    relWithMetadata.numSources,
+                    null,
+                    relWithMetadata.paMetadata,
+                    relWithMetadata.ppMetadata
+            );
+        }
 
-                return plan;
-            }, planningPool));
+        logPlan(key.parsedResult().originalQuery(), plan);
 
-            return planFut;
-        });
+        int currentCatalogVersion = schemaManager.catalogVersion(clockService.now().longValue());
+
+        if (currentCatalogVersion == catalogVersion) {
+            Set<Integer> sources = resolveSources(plan.getRel());
+
+            return new PlanInfo(plan, stmt, ctx, sources);
+        }
+
+        return new PlanInfo(plan);
     }
 
     private @Nullable QueryPlan tryOptimizeFast(
@@ -741,7 +874,7 @@ public class PrepareServiceImpl implements PrepareService {
             paramTypes[idx++] = columnType;
         }
 
-        return new CacheKey(catalogVersion, schemaName, parsedResult.normalizedQuery(), true /* distributed */, paramTypes);
+        return new CacheKey(catalogVersion, schemaName, parsedResult, true /* distributed */, paramTypes);
     }
 
     private static CacheKey createCacheKeyFromParameterMetadata(ParsedResult parsedResult, PlanningContext ctx,
@@ -764,7 +897,35 @@ public class PrepareServiceImpl implements PrepareService {
             paramTypes = result;
         }
 
-        return new CacheKey(catalogVersion, ctx.schemaName(), parsedResult.normalizedQuery(), distributed, paramTypes);
+        return new CacheKey(catalogVersion, ctx.schemaName(), parsedResult, distributed, paramTypes);
+    }
+
+    private static Set<Integer> resolveSources(IgniteRel rel) {
+        Set<Integer> tables = new HashSet<>();
+
+        IgniteRelShuttle shuttle = new IgniteRelShuttle() {
+            @Override
+            public IgniteRel visit(IgniteTableModify rel) {
+                IgniteTable igniteTable = rel.getTable().unwrapOrThrow(IgniteTable.class);
+
+                tables.add(igniteTable.id());
+
+                return super.visit(rel);
+            }
+
+            @Override
+            public IgniteRel visit(IgniteTableScan rel) {
+                IgniteTable igniteTable = rel.getTable().unwrapOrThrow(IgniteTable.class);
+
+                tables.add(igniteTable.id());
+
+                return rel;
+            }
+        };
+
+        shuttle.visit(rel);
+
+        return tables;
     }
 
     private static ResultSetMetadata resultSetMetadata(
@@ -984,4 +1145,32 @@ public class PrepareServiceImpl implements PrepareService {
             return matches;
         }
     }
+
+    private static class PlanInfo {
+        private final QueryPlan queryPlan;
+        @Nullable
+        private final ValidStatement<ValidationResult> statement;
+        @Nullable
+        private final PlanningContext context;
+        private final Set<Integer> sources;
+
+        PlanInfo(
+                QueryPlan plan,
+                @Nullable ValidStatement<ValidationResult> statement,
+                @Nullable PlanningContext context,
+                Set<Integer> sources
+        ) {
+            this.queryPlan = plan;
+            this.statement = statement;
+            this.context = context;
+            this.sources = sources;
+        }
+
+        PlanInfo(QueryPlan plan) {
+            this.queryPlan = plan;
+            this.statement = null;
+            this.context = null;
+            this.sources = Collections.emptySet();
+        }
+    }
 }
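
For readers following the patch, the refresh cycle introduced in PrepareServiceImpl works in three steps: the statistics manager fires the registered planUpdater callback with a table id, updatePlans(tableId) marks every cached plan that scans or modifies that table in the weak-keyed side cache, and the "sql-query-plan-refresh" task periodically rebuilds the marked plans and swaps them into the main cache via asMap().computeIfPresent. A minimal self-contained sketch of that cycle, assuming a simplified String-keyed cache; CachedPlan, markStalePlans and the interval values below are illustrative, not types or constants from the patch:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Illustrative model of the plan-refresh loop; not the actual Ignite classes. */
    public class PlanRefreshSketch {
        /** A cached "plan" together with the table ids it reads or modifies. */
        record CachedPlan(String plan, Set<Integer> sourceTables) {}

        private final Map<String, CachedPlan> planCache = new ConcurrentHashMap<>();
        private final Set<String> staleKeys = ConcurrentHashMap.newKeySet();

        /** Analogue of the planUpdater callback: mark plans that depend on the table. */
        void markStalePlans(int tableId) {
            planCache.forEach((key, plan) -> {
                if (plan.sourceTables().contains(tableId)) {
                    staleKeys.add(key);
                }
            });
        }

        /** Analogue of the "sql-query-plan-refresh" background task. */
        void startRefreshTask() {
            ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor();
            refresher.scheduleAtFixedRate(() -> {
                for (String key : staleKeys) {
                    // Re-plan with fresh statistics and swap the entry in place if it is still cached.
                    planCache.computeIfPresent(key, (k, old) ->
                            new CachedPlan(old.plan() + " [re-planned]", old.sourceTables()));
                    staleKeys.remove(key);
                }
            }, 2_000, 1_000, TimeUnit.MILLISECONDS);
        }
    }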
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java
index 5dff334ffa2..c9dca39f32a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.prepare;
 import java.util.List;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * ValidationResult holder.
@@ -28,11 +29,11 @@ import org.apache.calcite.sql.SqlNode;
 public class ValidationResult {
     private final SqlNode sqlNode;
 
-    private final RelDataType dataType;
+    @Nullable private final RelDataType dataType;
 
-    private final List<List<String>> origins;
+    @Nullable private final List<List<String>> origins;
 
-    private final List<String> aliases;
+    @Nullable private final List<String> aliases;
 
     /**
      * Constructor.
@@ -42,13 +43,30 @@ public class ValidationResult {
      * @param origins  Type fields provenance.
      * @param aliases  Derived column names.
      */
-    ValidationResult(SqlNode sqlNode, RelDataType dataType, List<List<String>> origins, List<String> aliases) {
+    ValidationResult(
+            SqlNode sqlNode,
+            @Nullable RelDataType dataType,
+            @Nullable List<List<String>> origins,
+            @Nullable List<String> aliases
+    ) {
         this.sqlNode = sqlNode;
         this.dataType = dataType;
         this.origins = origins;
         this.aliases = aliases;
     }
 
+    /**
+     * Constructor.
+     *
+     * @param sqlNode  Validated SQL node.
+     */
+    ValidationResult(SqlNode sqlNode) {
+        this.sqlNode = sqlNode;
+        this.dataType = null;
+        this.origins = null;
+        this.aliases = null;
+    }
+
     /**
      * Get validated SQL node.
      */
@@ -59,19 +77,19 @@ public class ValidationResult {
     /**
      * Get validated type.
      */
-    public RelDataType dataType() {
+    @Nullable public RelDataType dataType() {
         return dataType;
     }
 
     /**
      * Get type fields provenance.
      */
-    public List<List<String>> origins() {
+    @Nullable public List<List<String>> origins() {
         return origins;
     }
 
     /** Return alternatively derived column names. */
-    public List<String> aliases() {
+    @Nullable public List<String> aliases() {
         return aliases;
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java
index 6b98b1a6f02..b0fb66f801e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.statistic;
 
+import java.util.function.IntConsumer;
 import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
 
 /**
@@ -28,6 +29,9 @@ public interface SqlStatisticManager extends LifecycleAware {
      */
     long tableSize(int tableId);
 
+    /** Plan updater callback. */
+    void planUpdater(IntConsumer updater);
+
     @Override
     default void start(){}
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index aa1f51e5361..977d84e463f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntConsumer;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -65,6 +66,8 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager {
     private final CatalogService catalogService;
     private final LowWatermark lowWatermark;
 
+    private volatile IntConsumer planUpdater;
+
     /* Contains all known table id's with statistics. */
     private final ConcurrentMap<Integer, ActualSize> tableSizeMap = new ConcurrentHashMap<>();
 
@@ -77,6 +80,10 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager {
         this.lowWatermark = lowWatermark;
     }
 
+    @Override
+    public void planUpdater(IntConsumer updater) {
+        this.planUpdater = updater;
+    }
 
     /**
      * Returns approximate number of rows in table by their id.
@@ -130,6 +137,12 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager {
                     }).exceptionally(e -> {
                         LOG.info("Can't calculate size for table [id={}].", e, tableId);
                         return null;
+                    }).whenComplete((ignored, ex) -> {
+                        if (ex == null) {
+                            if (planUpdater != null) {
+                                planUpdater.accept(tableId);
+                            }
+                        }
                     });
 
             latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult : prev.thenCompose(none -> updateResult));
@@ -183,16 +196,16 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager {
         long timestamp;
         long size;
 
-        public ActualSize(long size, long timestamp) {
+        ActualSize(long size, long timestamp) {
             this.timestamp = timestamp;
             this.size = size;
         }
 
-        public long getTimestamp() {
+        long getTimestamp() {
             return timestamp;
         }
 
-        public long getSize() {
+        long getSize() {
             return size;
         }
     }
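
The hook added above is deliberately thin: it is just an IntConsumer that SqlQueryProcessor points at prepareSvc::updatePlans, invoked once a table-size refresh completes without error. A trivial sketch of that callback shape (class and method names other than IntConsumer are illustrative):

    import java.util.function.IntConsumer;

    public class PlanUpdaterWiringSketch {
        private volatile IntConsumer planUpdater;

        /** Registration point, mirrors SqlStatisticManager#planUpdater(IntConsumer). */
        void planUpdater(IntConsumer updater) {
            this.planUpdater = updater;
        }

        /** Called after a table-size refresh finishes successfully. */
        void onTableSizeRefreshed(int tableId) {
            IntConsumer updater = planUpdater;
            if (updater != null) {
                updater.accept(tableId); // tell the prepare service its cached plans may be stale
            }
        }
    }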
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java
index c56ac52a10e..c14f45c948d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.util.cache;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -93,4 +95,12 @@ public interface Cache<K, V> {
      * @return The number of entries in the cache.
      */
     int size();
+
+    /**
+     * Returns a view of the entries stored in this cache as a thread-safe map. Modifications made to
+     * the map directly affect the cache.
+     *
+     * @return A thread-safe view of this cache supporting all of the optional {@link Map} operations.
+     */
+    ConcurrentMap<K, V> asMap();
 }
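
The asMap() view added here is what allows the refresh task to replace a cached plan in place without going through the loading path. A tiny illustration of the computeIfPresent pattern on such a view, using a plain ConcurrentHashMap stand-in with hypothetical key/value strings:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class AsMapUsageSketch {
        public static void main(String[] args) {
            // Stand-in for Cache#asMap(): a live, thread-safe view of the entries.
            ConcurrentMap<String, String> view = new ConcurrentHashMap<>();
            view.put("SELECT * FROM t", "old plan");

            // Replace the value only while the key is still present, so an entry
            // evicted in the meantime is not resurrected.
            view.computeIfPresent("SELECT * FROM t", (key, oldPlan) -> "new plan");

            System.out.println(view.get("SELECT * FROM t")); // prints: new plan
        }
    }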
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java
index d1e28046b0a..a8d3d0f9505 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java
@@ -55,4 +55,13 @@ public interface CacheFactory {
      * @param <V> Type of the value object.
      */
     <K, V> Cache<K, V> create(int size, StatsCounter statCounter, Duration expireAfterAccess);
+
+    /**
+     * Creates a cache where each key (not value) stored in the cache should be wrapped in a
+     * {@link java.lang.ref.WeakReference}.
+     *
+     * @param <K> Type of the key object.
+     * @param <V> Type of the value object.
+     */
+    <K, V> Cache<K, V> createWithWeakKeys();
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java
index 55e9e8b0767..ccdd5c160fe 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java
@@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.github.benmanes.caffeine.cache.stats.CacheStats;
 import java.time.Duration;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -84,6 +85,18 @@ public class CaffeineCacheFactory implements CacheFactory {
         return create(size, null);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <K, V> Cache<K, V> createWithWeakKeys() {
+        Caffeine<Object, Object> builder = Caffeine.newBuilder();
+
+        if (executor != null) {
+            builder.executor(executor);
+        }
+
+        return new CaffeineCacheToCacheAdapter<>(builder.weakKeys().build());
+    }
+
     private static class CaffeineStatsCounterAdapter implements com.github.benmanes.caffeine.cache.stats.StatsCounter {
         private final StatsCounter statsCounter;
 
@@ -139,6 +152,11 @@ public class CaffeineCacheFactory implements CacheFactory {
             return cache.get(key, mappingFunction);
         }
 
+        @Override
+        public ConcurrentMap<K, V> asMap() {
+            return cache.asMap();
+        }
+
         @Override
         public void put(K key, V value) {
             cache.put(key, value);
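
createWithWeakKeys() leans on Caffeine's weakKeys() mode: keys are held by weak references and compared by identity, so an entry silently disappears once nothing else strongly references its key. That is why the weak side cache can track the CacheKey instances owned by the main plan cache without pinning them in memory. A small standalone sketch of the behavior (standard Caffeine API; the post-GC outcome is best-effort):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    public class WeakKeysSketch {
        public static void main(String[] args) {
            Cache<Object, String> cache = Caffeine.newBuilder().weakKeys().build();

            Object key = new Object();
            cache.put(key, "marked");

            // Weak keys use identity comparison, so the entry is reachable
            // only through the very same key instance that was stored.
            System.out.println(cache.getIfPresent(key)); // marked

            key = null; // drop the last strong reference
            System.gc();
            cache.cleanUp();

            // The entry may now have been collected.
            System.out.println(cache.estimatedSize());
        }
    }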
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index ce1731eb294..88340fbe56c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -252,6 +253,10 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
                 .map(node -> create(node, mappingCacheFactory, executorsFactory.apply(node)))
                 .collect(Collectors.toList());
                 .collect(Collectors.toList());
 
+        ClockServiceImpl clockService = mock(ClockServiceImpl.class);
+
+        when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500));
+
         prepareService = new PrepareServiceImpl(
                 "test",
                 0,
@@ -261,7 +266,8 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
                 PLANNING_THREAD_COUNT,
                 PLAN_EXPIRATION_SECONDS,
                 metricManager,
-                new PredefinedSchemaManager(schema)
+                new PredefinedSchemaManager(schema),
+                clockService
         );
         parserService = new ParserServiceImpl();
 
@@ -1474,6 +1480,11 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public <K, V> Cache<K, V> createWithWeakKeys() {
+            throw new UnsupportedOperationException();
+        }
+
         private static class BlockOnComputeCache<K, V> extends EmptyCacheFactory.EmptyCache<K, V> {
             private final CountDownLatch waitLatch;
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index 81aa4c30edb..3d4b18406cb 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 public class PredefinedSchemaManager implements SqlSchemaManager {
     private final IgniteSchemas root;
     private final Int2ObjectMap<IgniteTable> tableById;
+    private final SchemaPlus schemaPlus;
 
     /** Constructs schema manager from a single schema. */
     public PredefinedSchemaManager(IgniteSchema schema) {
@@ -67,9 +68,15 @@ public class PredefinedSchemaManager implements SqlSchemaManager {
             );
         }
 
+        this.schemaPlus = schemaPlus;
+
         root = new IgniteSchemas(schemaPlus, 0);
     }
 
+    SchemaPlus rootSchema() {
+        return schemaPlus;
+    }
+
     /** {@inheritDoc} */
     @Override
     public IgniteSchemas schemas(int catalogVersion) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index ab6b18ae8ff..528c0839ea5 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -28,6 +28,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.lang.reflect.Proxy;
 import java.time.Clock;
@@ -54,6 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.IntConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -83,6 +85,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -745,6 +748,11 @@ public class TestBuilders {
 
             ConcurrentMap<String, Long> tablesSize = new ConcurrentHashMap<>();
             var schemaManager = createSqlSchemaManager(catalogManager, tablesSize);
+
+            ClockServiceImpl clockService = mock(ClockServiceImpl.class);
+
+            when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500));
+
             var prepareService = new PrepareServiceImpl(
                     clusterName,
                     0,
@@ -754,7 +762,8 @@ public class TestBuilders {
                     PLANNING_THREAD_COUNT,
                     PLAN_EXPIRATION_SECONDS,
                     new NoOpMetricManager(),
-                    schemaManager
+                    schemaManager,
+                    clockService
             );
 
             Map<String, List<String>> systemViewsByNode = new HashMap<>();
@@ -931,15 +940,23 @@ public class TestBuilders {
     }
 
     private static SqlSchemaManagerImpl createSqlSchemaManager(CatalogManager catalogManager, ConcurrentMap<String, Long> tablesSize) {
-        SqlStatisticManager sqlStatisticManager = tableId -> {
-            CatalogTableDescriptor descriptor = catalogManager.activeCatalog(Long.MAX_VALUE).table(tableId);
-            long fallbackSize = 10_000;
+        SqlStatisticManager sqlStatisticManager = new SqlStatisticManager() {
+            @Override
+            public long tableSize(int tableId) {
+                CatalogTableDescriptor descriptor = catalogManager.activeCatalog(Long.MAX_VALUE).table(tableId);
+                long fallbackSize = 10_000;
 
-            if (descriptor == null) {
-                return fallbackSize;
+                if (descriptor == null) {
+                    return fallbackSize;
+                }
+
+                return tablesSize.getOrDefault(descriptor.name(), 10_000L);
             }
 
-            return tablesSize.getOrDefault(descriptor.name(), 10_000L);
+            @Override
+            public void planUpdater(IntConsumer updater) {
+                throw new UnsupportedOperationException();
+            }
         };
 
         return new SqlSchemaManagerImpl(
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java
new file mode 100644
index 00000000000..a7c4ad2104d
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
+
+/**
+ * Dummy wrapper for a predefined collection of schemas with the ability to change the catalog version.
+ *
+ * @see PredefinedSchemaManager
+ */
+public class VersionedSchemaManager extends PredefinedSchemaManager {
+    private final AtomicInteger ver;
+
+    /** Constructor. */
+    public VersionedSchemaManager(IgniteSchema schema, AtomicInteger ver) {
+        super(schema);
+
+        this.ver = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int catalogVersion(long timestamp) {
+        return ver == null ? super.catalogVersion(timestamp) : ver.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteSchemas schemas(long timestamp) {
+        return new IgniteSchemas(rootSchema(), ver.get());
+    }
+}
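
As a quick orientation, a usage sketch for the wrapper above follows; it relies only on the constructor and the two overridden methods in this file, while the schema instance and the timestamp value are assumed for illustration.

    // Sketch: the shared AtomicInteger lets a test move the reported catalog
    // version forward without touching the underlying predefined schema.
    AtomicInteger version = new AtomicInteger();
    VersionedSchemaManager schemaManager = new VersionedSchemaManager(schema, version);

    assert schemaManager.catalogVersion(someTimestamp) == 0;

    version.incrementAndGet(); // simulate a catalog change

    assert schemaManager.catalogVersion(someTimestamp) == 1;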
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
index 94f20b76755..52d538c1b44 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 
 import java.time.Duration;
 import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.plan.volcano.VolcanoTimeoutException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.metrics.MetricManagerImpl;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
@@ -76,7 +78,8 @@ public class PlannerTimeoutTest extends AbstractPlannerTest {
                 1,
                 Integer.MAX_VALUE,
                 new MetricManagerImpl(),
-                new PredefinedSchemaManager(schema)
+                new PredefinedSchemaManager(schema),
+                mock(ClockServiceImpl.class)
         );
         prepareService.start();
         try {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index e21d918b9d6..19b8bfcbea3 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -17,15 +17,17 @@
 
 package org.apache.ignite.internal.sql.engine.prepare;
 
+import static org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_INITIAL_DELAY;
 import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -34,6 +36,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.time.Duration;
 import java.time.ZoneId;
@@ -44,15 +47,19 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metrics.MetricManagerImpl;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.VersionedSchemaManager;
 import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
@@ -362,6 +369,137 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         ));
     }
 
+    /** Validates that cached plans referencing the given table id are rebuilt after an update request. */
+    @Test
+    public void statisticUpdatesChangePlans() {
+        IgniteTable table = TestBuilders.table()
+                .name("T")
+                .addColumn("C", NativeTypes.INT32)
+                .distribution(IgniteDistributions.single())
+                .build();
+
+        IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table));
+
+        PrepareServiceImpl service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, Integer.MAX_VALUE);
+
+        assertThat(service.cache.size(), is(0));
+
+        String selectQuery = "SELECT * FROM test.t WHERE c = 1";
+        QueryPlan selectPlan = await(service.prepareAsync(parse(selectQuery), operationContext().build()));
+
+        assertThat(service.cache.size(), is(1));
+
+        String insertQuery = "INSERT INTO test.t VALUES(OCTET_LENGTH('TEST')), (2)";
+        QueryPlan insertPlan = await(service.prepareAsync(parse(insertQuery), operationContext().build()));
+
+        assertThat(service.cache.size(), is(2));
+
+        service.updatePlans(table.id());
+
+        Awaitility.await()
+                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY))
+                .until(
+                        () -> !selectPlan.equals(await(service.prepareAsync(parse(selectQuery), operationContext().build())))
+                );
+
+        Awaitility.await()
+                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY))
+                .until(
+                        () -> !insertPlan.equals(await(service.prepareAsync(parse(insertQuery), operationContext().build())))
+                );
+
+        assertThat(service.cache.size(), is(2));
+    }
+
+    @Test
+    public void planUpdatesForNonCachedTable() {
+        IgniteTable table1 = TestBuilders.table()
+                .name("T1")
+                .addColumn("C", NativeTypes.INT32)
+                .distribution(IgniteDistributions.single())
+                .build();
+
+        IgniteTable table2 = TestBuilders.table()
+                .name("T2")
+                .addColumn("C", NativeTypes.INT32)
+                .distribution(IgniteDistributions.single())
+                .build();
+
+        IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table1, table2));
+
+        // Plan cache with capacity for a single entry.
+        PrepareServiceImpl service = (PrepareServiceImpl) createPlannerService(schema, 1);
+
+        String selectQuery = "SELECT * FROM test.t1 WHERE c = 1";
+        await(service.prepareAsync(parse(selectQuery), operationContext().build()));
+
+        assertThat(service.cache.size(), is(1));
+        CacheKey key1 = service.cache.asMap().keySet().iterator().next();
+
+        // A query against a different table; it evicts the previous entry from the single-slot cache.
+        String selectQuery2 = "SELECT * FROM test.t2 WHERE c = 1";
+        QueryPlan plan2 = await(service.prepareAsync(parse(selectQuery2), operationContext().build()));
+        assertThat(service.cache.size(), is(1));
+        CacheKey key2 = service.cache.asMap().keySet().iterator().next();
+
+        assertNotEquals(key1, key2);
+
+        // Update request for the table whose plan is no longer cached.
+        service.updatePlans(table1.id());
+
+        // Update request for the table whose plan is still cached.
+        service.updatePlans(table2.id());
+
+        Awaitility.await()
+                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY))
+                .until(
+                        () -> !plan2.equals(await(service.prepareAsync(parse(selectQuery2), operationContext().build())))
+                );
+    }
+
+    /** Validates that a plan update affects only entries cached for the current catalog version. */
+    @Test
+    public void planUpdatesForCurrentCatalogVersion() {
+        IgniteTable table1 = TestBuilders.table()
+                .name("T1")
+                .addColumn("C", NativeTypes.INT32)
+                .distribution(IgniteDistributions.single())
+                .build();
+
+        IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table1));
+
+        AtomicInteger ver = new AtomicInteger();
+        PrepareServiceImpl service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 10000,
+                Integer.MAX_VALUE, 1000, ver);
+
+        String selectQuery = "SELECT * FROM test.t1 WHERE c = 1";
+        QueryPlan plan1 = await(service.prepareAsync(parse(selectQuery), operationContext().build()));
+
+        // Bump the catalog version to 1, so the same query is cached under a new key.
+        ver.incrementAndGet();
+
+        QueryPlan plan2 = await(service.prepareAsync(parse(selectQuery), operationContext().build()));
+
+        Awaitility.await()
+                .atMost(Duration.ofMillis(10000))
+                .until(
+                        () -> service.cache.size() == 2
+                );
+
+        assertThat(service.cache.size(), is(2));
+        service.updatePlans(table1.id());
+
+        Awaitility.await()
+                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY))
+                .until(
+                        () -> !plan2.equals(await(service.prepareAsync(parse(selectQuery), operationContext().build())))
+                );
+
+        // The plan cached for the previous catalog version must be returned unchanged.
+        ver.set(0);
+        assertEquals(plan1, await(service.prepareAsync(parse(selectQuery), operationContext().build())));
+    }
+
     @Test
     public void planCacheExpiry() {
         IgniteTable table = TestBuilders.table()
@@ -374,7 +512,7 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
 
         Awaitility.await().timeout(30, TimeUnit.SECONDS).untilAsserted(() -> {
             int expireSeconds = 2;
-            PrepareService service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, Integer.MAX_VALUE, 2);
+            PrepareService service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, Integer.MAX_VALUE, expireSeconds, 1000);
 
             String query = "SELECT * FROM test.t WHERE c = 1";
             QueryPlan p0 = await(service.prepareAsync(parse(query), 
operationContext().build()));
@@ -549,23 +687,55 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         return createPlannerService(createSchema());
     }
 
+    private static PrepareService createPlannerService(IgniteSchema schema, int cacheSize) {
+        return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 10000, Integer.MAX_VALUE, cacheSize);
+    }
+
     private static PrepareService createPlannerService(IgniteSchema schema) {
-        return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 1000);
+        return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 10000);
     }
 
     private static PrepareServiceImpl createPlannerService(IgniteSchema schemas, CacheFactory cacheFactory, int timeoutMillis) {
-        return createPlannerService(schemas, cacheFactory, timeoutMillis, Integer.MAX_VALUE);
+        return createPlannerService(schemas, cacheFactory, timeoutMillis, Integer.MAX_VALUE, 1000);
+    }
+
+    private static PrepareServiceImpl createPlannerService(
+            IgniteSchema schemas,
+            CacheFactory cacheFactory,
+            int timeoutMillis,
+            int planExpireSeconds,
+            int cacheSize
+    ) {
+        ClockServiceImpl clockService = mock(ClockServiceImpl.class);
+
+        when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500));
+
+        PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, cacheFactory,
+                mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, planExpireSeconds, mock(MetricManagerImpl.class),
+                new PredefinedSchemaManager(schemas), clockService);
+
+        createdServices.add(service);
+
+        service.start();
+
+        return service;
     }
 
     private static PrepareServiceImpl createPlannerService(
             IgniteSchema schemas,
             CacheFactory cacheFactory,
             int timeoutMillis,
-            int planExpireSeconds
+            int planExpireSeconds,
+            int cacheSize,
+            AtomicInteger ver
     ) {
-        PrepareServiceImpl service = new PrepareServiceImpl("test", 1000, cacheFactory,
+        ClockServiceImpl clockService = mock(ClockServiceImpl.class);
+
+        when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500));
+
+        PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, cacheFactory,
                 mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, planExpireSeconds, mock(MetricManagerImpl.class),
-                new PredefinedSchemaManager(schemas));
+                new VersionedSchemaManager(schemas, ver), clockService);
 
         createdServices.add(service);
 
@@ -595,5 +765,10 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         public <K, V> Cache<K, V> create(int size, StatsCounter statCounter, 
Duration expireAfterAccess) {
             return (Cache<K, V>) cache;
         }
+
+        @Override
+        public <K, V> Cache<K, V> createWithWeakKeys() {
+            return CaffeineCacheFactory.INSTANCE.createWithWeakKeys();
+        }
     }
 }
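
All three new tests rely on the same pattern: updatePlans(tableId) appears to take effect asynchronously (hence the PLAN_UPDATER_INITIAL_DELAY-based timeouts), so each test polls with Awaitility until re-preparing the query stops returning the previously cached plan. A condensed form of that pattern, with the query string and table assumed to exist, looks like this:

    // Condensed form of the assertion pattern used by the tests above (illustrative only).
    QueryPlan cached = await(service.prepareAsync(parse(query), operationContext().build()));

    service.updatePlans(table.id());

    Awaitility.await()
            .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY))
            .until(() -> !cached.equals(await(service.prepareAsync(parse(query), operationContext().build()))));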
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
index 1546ff69268..10719fc4a8b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.IntConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.rel.RelCollations;
@@ -114,7 +115,17 @@ public class SqlSchemaManagerImplTest extends 
BaseIgniteAbstractTest {
 
     @BeforeEach
     void init() {
-        sqlStatisticManager = tableId -> 10_000L;
+        sqlStatisticManager = new SqlStatisticManager() {
+            @Override
+            public long tableSize(int tableId) {
+                return 10_000;
+            }
+
+            @Override
+            public void planUpdater(IntConsumer updater) {
+                throw new UnsupportedOperationException();
+            }
+        };
 
         catalogManager = CatalogTestUtils.createCatalogManagerWithTestUpdateLog("test", new HybridClockImpl());
         sqlSchemaManager = new SqlSchemaManagerImpl(
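
The same two-method stub now appears both here and in TestBuilders. If it spreads further, a small shared helper could remove the duplication; a sketch, assuming only the interface shape visible in this patch:

    // Hypothetical shared test stub: fixed table size, plan-updater hook unused.
    static SqlStatisticManager fixedSizeStatisticManager(long size) {
        return new SqlStatisticManager() {
            @Override
            public long tableSize(int tableId) {
                return size;
            }

            @Override
            public void planUpdater(IntConsumer updater) {
                throw new UnsupportedOperationException("not used by this test");
            }
        };
    }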
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
index b2e00a9f74f..5b02bfe9701 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.util;
 
 import java.time.Duration;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -56,6 +57,11 @@ public class EmptyCacheFactory implements CacheFactory {
         return create(size);
     }
 
+    @Override
+    public <K, V> Cache<K, V> createWithWeakKeys() {
+        return create(0);
+    }
+
     /** A cache that keeps no object. */
     public static class EmptyCache<K, V> implements Cache<K, V> {
         @Override
@@ -97,5 +103,10 @@ public class EmptyCacheFactory implements CacheFactory {
         public int size() {
             return 0;
         }
+
+        @Override
+        public ConcurrentMap<K, V> asMap() {
+            throw new UnsupportedOperationException();
+        }
     }
 }
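
EmptyCacheFactory satisfies the two new interface members in the simplest possible way: createWithWeakKeys() still keeps nothing, and asMap() is unsupported. For comparison, a weak-keys cache produced by a Caffeine-backed factory would typically be built as in the sketch below; this only illustrates the Caffeine API, it is not the CaffeineCacheFactory change from this commit.

    // Illustrative Caffeine usage: keys are held weakly, so an entry can be
    // collected once its key is no longer strongly reachable elsewhere.
    com.github.benmanes.caffeine.cache.Cache<Object, Object> weakKeyed =
            com.github.benmanes.caffeine.cache.Caffeine.newBuilder()
                    .weakKeys()
                    .build();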
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
index 5cb1bd0d4b2..95d2f9a14e5 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.sql.metrics;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.metrics.MetricManagerImpl;
 import org.apache.ignite.internal.metrics.MetricSet;
@@ -61,8 +65,13 @@ public class PlanningCacheMetricsTest extends 
AbstractPlannerTest {
 
         IgniteSchema schema = createSchema(table);
 
+        ClockServiceImpl clockService = mock(ClockServiceImpl.class);
+
+        when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500));
+
         PrepareService prepareService = new PrepareServiceImpl(
-                "test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE, 
metricManager, new PredefinedSchemaManager(schema)
+                "test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE, 
metricManager, new PredefinedSchemaManager(schema),
+                clockService
         );
 
         prepareService.start();
