This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26068 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 049d5c2146692344c74824dc5b2067d16af60e80 Author: zstan <[email protected]> AuthorDate: Thu Sep 11 09:47:54 2025 +0300 IGNITE-26068 Sql. Update cached query plans when related statistics changed --- .../sql/engine/statistic/ItStatisticTest.java | 9 +- .../internal/sql/engine/SqlQueryProcessor.java | 5 +- .../sql/engine/exec/fsm/ParsingPhaseHandler.java | 4 - .../internal/sql/engine/prepare/CacheKey.java | 19 +- .../sql/engine/prepare/PrepareService.java | 2 + .../sql/engine/prepare/PrepareServiceImpl.java | 395 +++++++++++++++------ .../sql/engine/prepare/ValidationResult.java | 32 +- .../sql/engine/statistic/SqlStatisticManager.java | 4 + .../engine/statistic/SqlStatisticManagerImpl.java | 19 +- .../internal/sql/engine/util/cache/Cache.java | 10 + .../sql/engine/util/cache/CacheFactory.java | 9 + .../engine/util/cache/CaffeineCacheFactory.java | 18 + .../sql/engine/exec/ExecutionServiceImplTest.java | 13 +- .../engine/framework/PredefinedSchemaManager.java | 7 + .../sql/engine/framework/TestBuilders.java | 31 +- .../engine/framework/VersionedSchemaManager.java | 50 +++ .../sql/engine/planner/PlannerTimeoutTest.java | 5 +- .../sql/engine/prepare/PrepareServiceImplTest.java | 189 +++++++++- .../engine/schema/SqlSchemaManagerImplTest.java | 13 +- .../sql/engine/util/EmptyCacheFactory.java | 11 + .../sql/metrics/PlanningCacheMetricsTest.java | 11 +- 21 files changed, 715 insertions(+), 141 deletions(-) diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java index d45feee244d..0162942fda2 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java @@ -33,6 +33,8 @@ import org.junit.jupiter.api.Test; public class ItStatisticTest extends BaseSqlIntegrationTest { private SqlStatisticManagerImpl sqlStatisticManager; + private static final AtomicInteger counter = new AtomicInteger(0); + @BeforeAll void beforeAll() { sqlStatisticManager = (SqlStatisticManagerImpl) queryProcessor().sqlStatisticManager(); @@ -44,6 +46,11 @@ public class ItStatisticTest extends BaseSqlIntegrationTest { sql("DROP TABLE IF EXISTS t"); } + @Override + protected int initialNodes() { + return 1; + } + @Test public void testStatisticsRowCount() throws Exception { // For test we should always update statistics. @@ -71,8 +78,6 @@ public class ItStatisticTest extends BaseSqlIntegrationTest { } } - private static AtomicInteger counter = new AtomicInteger(0); - private void insertAndUpdateRunQuery(int numberOfRecords) throws ExecutionException, TimeoutException, InterruptedException { int start = counter.get(); int end = counter.addAndGet(numberOfRecords) - 1; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 0bdcb44fad5..bfeac05c268 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -296,9 +296,12 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { clusterCfg, nodeCfg, sqlSchemaManager, - ddlSqlToCommandConverter + ddlSqlToCommandConverter, + clockService )); + sqlStatisticManager.planUpdater(prepareSvc::updatePlans); + var msgSrvc = registerService(new MessageServiceImpl( localNode, clusterSrvc.messagingService(), diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java index 2192a8cd85c..a9cfc334f45 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ParsingPhaseHandler.java @@ -75,8 +75,4 @@ class ParsingPhaseHandler implements ExecutionPhaseHandler { return Result.proceedAfter(awaitFuture); } - - private static boolean shouldBeCached(SqlQueryType queryType) { - return queryType == SqlQueryType.QUERY || queryType == SqlQueryType.DML; - } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java index 1b7f97c6a4b..528c7bfc912 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.prepare; import java.util.Arrays; import java.util.Objects; +import org.apache.ignite.internal.sql.engine.sql.ParsedResult; import org.apache.ignite.sql.ColumnType; /** @@ -41,22 +42,34 @@ public class CacheKey { private int hashCode = 0; + private final ParsedResult parsedResult; + /** * Constructor. * * @param catalogVersion Catalog version. * @param schemaName Schema name. - * @param query Query string. + * @param parsedResult AST with additional info. * @param contextKey Optional context key to differ queries with and without/different flags, having an impact on result plan (like * LOCAL flag) * @param paramTypes Types of all dynamic parameters, no any type can be {@code null}. */ - public CacheKey(int catalogVersion, String schemaName, String query, Object contextKey, ColumnType[] paramTypes) { + CacheKey(int catalogVersion, String schemaName, ParsedResult parsedResult, Object contextKey, ColumnType[] paramTypes) { this.catalogVersion = catalogVersion; this.schemaName = schemaName; - this.query = query; + this.query = parsedResult.normalizedQuery(); this.contextKey = contextKey; this.paramTypes = paramTypes; + + this.parsedResult = parsedResult; + } + + ParsedResult parsedResult() { + return parsedResult; + } + + int catalogVersion() { + return catalogVersion; } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java index b4dbab49fcf..83c3b8ffd28 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java @@ -48,4 +48,6 @@ public interface PrepareService extends LifecycleAware { default CompletableFuture<Void> invalidateCache(Set<String> tableNames) { return CompletableFutures.nullCompletedFuture(); } + + void updatePlans(int tableId); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java index b7ddf7608a0..6f48e5c6c46 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java @@ -24,17 +24,24 @@ import static org.apache.ignite.internal.sql.engine.trait.TraitUtils.distributio import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.internal.sql.engine.util.Commons.fastQueryOptimizationEnabled; import static org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED; +import static org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully; import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -52,6 +59,7 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.util.Pair; +import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.lang.SqlExceptionMapperUtil; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -74,7 +82,10 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet; import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.rel.IgniteSelectCount; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplain; import org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplainMode; @@ -121,12 +132,14 @@ public class PrepareServiceImpl implements PrepareService { private static final String PLANNING_EXECUTOR_SOURCE_NAME = THREAD_POOLS_METRICS_SOURCE_NAME + "sql-planning-executor"; + public static final int PLAN_UPDATER_INITIAL_DELAY = 2_000; + private final UUID prepareServiceId = UUID.randomUUID(); private final AtomicLong planIdGen = new AtomicLong(); private final DdlSqlToCommandConverter ddlConverter; - private final Cache<CacheKey, CompletableFuture<QueryPlan>> cache; + final Cache<CacheKey, CompletableFuture<PlanInfo>> cache; private final String nodeName; @@ -142,6 +155,15 @@ public class PrepareServiceImpl implements PrepareService { private volatile ThreadPoolExecutor planningPool; + /** Clock. */ + private final ClockService clockService; + + private final Cache<CacheKey, Object> weakCache; + + private static final Object IGNORED_VAL = new Object(); + + private ExecutorService planUpdater; + /** * Factory method. * @@ -153,6 +175,8 @@ public class PrepareServiceImpl implements PrepareService { * @param nodeCfg Node SQL configuration. * @param schemaManager Schema manager to use on validation phase to bind identifiers in AST with particular schema objects. * @param ddlSqlToCommandConverter Converter from SQL DDL operators to catalog commands. + * @param clockService Clock service. + * */ public static PrepareServiceImpl create( String nodeName, @@ -162,7 +186,8 @@ public class PrepareServiceImpl implements PrepareService { SqlDistributedConfiguration clusterCfg, SqlLocalConfiguration nodeCfg, SqlSchemaManager schemaManager, - DdlSqlToCommandConverter ddlSqlToCommandConverter + DdlSqlToCommandConverter ddlSqlToCommandConverter, + ClockService clockService ) { return new PrepareServiceImpl( nodeName, @@ -173,7 +198,8 @@ public class PrepareServiceImpl implements PrepareService { clusterCfg.planner().planCacheExpiresAfterSeconds().value(), nodeCfg.planner().threadCount().value(), metricManager, - schemaManager + schemaManager, + clockService ); } @@ -187,6 +213,7 @@ public class PrepareServiceImpl implements PrepareService { * @param plannerTimeout Timeout in milliseconds to planning. * @param metricManager Metric manager. * @param schemaManager Schema manager to use on validation phase to bind identifiers in AST with particular schema objects. + * @param clockService Clock service. */ public PrepareServiceImpl( String nodeName, @@ -197,7 +224,8 @@ public class PrepareServiceImpl implements PrepareService { int plannerThreadCount, int planExpirySeconds, MetricManager metricManager, - SqlSchemaManager schemaManager + SqlSchemaManager schemaManager, + ClockService clockService ) { this.nodeName = nodeName; this.ddlConverter = ddlConverter; @@ -205,9 +233,12 @@ public class PrepareServiceImpl implements PrepareService { this.metricManager = metricManager; this.plannerThreadCount = plannerThreadCount; this.schemaManager = schemaManager; + this.clockService = clockService; sqlPlanCacheMetricSource = new SqlPlanCacheMetricSource(); cache = cacheFactory.create(cacheSize, sqlPlanCacheMetricSource, Duration.ofSeconds(planExpirySeconds)); + + weakCache = cacheFactory.createWithWeakKeys(); } /** {@inheritDoc} */ @@ -231,12 +262,67 @@ public class PrepareServiceImpl implements PrepareService { metricManager.enable(PLANNING_EXECUTOR_SOURCE_NAME); IgnitePlanner.warmup(); + + ScheduledExecutorService planUpdater = Executors.newSingleThreadScheduledExecutor( + IgniteThreadFactory.create(nodeName, "sql-query-plan-refresh", true, LOG) + ); + + planUpdater.scheduleAtFixedRate(() -> { + for (CacheKey key : weakCache.asMap().keySet()) { + CompletableFuture<PlanInfo> fut = cache.get(key); + + // can be evicted + if (fut != null) { + PlanInfo info = fut.join(); + assert info.context != null && info.statement != null; + + assert isCompletedSuccessfully(fut); + + int currentCatalogVersion = schemaManager.catalogVersion(clockService.now().longValue()); + + if (currentCatalogVersion == key.catalogVersion()) { + SqlQueryType queryType = key.parsedResult().queryType(); + + if (queryType == SqlQueryType.QUERY || queryType == SqlQueryType.DML) { + CompletableFuture<PlanInfo> newPlanFut = CompletableFuture.supplyAsync( + () -> buildPlan(queryType, key, info.statement, info.context), planningPool) + .exceptionally(e -> { + LOG.warn("Failed to re-planning query: " + key.parsedResult().originalQuery(), e); + + return null; + }); + + try { + PlanInfo newPlan = newPlanFut.get(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Plan: " + info.queryPlan + "\n\nre-planned into: " + newPlan.queryPlan); + } + + if (newPlan != null) { + cache.asMap().computeIfPresent(key, (k, v) -> CompletableFuture.completedFuture(newPlan)); + } + } catch (Exception ex) { + LOG.warn("Failed to re-planning query: " + key.parsedResult().originalQuery(), ex); + } + + weakCache.invalidate(key); + } else { + throw new AssertionError("Unexpected queryType=" + queryType); + } + } + } + } + }, PLAN_UPDATER_INITIAL_DELAY, 1_000, TimeUnit.MILLISECONDS); + + this.planUpdater = planUpdater; } /** {@inheritDoc} */ @Override public void stop() throws Exception { planningPool.shutdownNow(); + planUpdater.shutdownNow(); metricManager.unregisterSource(sqlPlanCacheMetricSource); metricManager.unregisterSource(PLANNING_EXECUTOR_SOURCE_NAME); } @@ -257,16 +343,16 @@ public class PrepareServiceImpl implements PrepareService { CacheKey key = createCacheKey(parsedResult, catalogVersion, schemaName, operationContext.parameters()); - CompletableFuture<QueryPlan> planFuture = cache.get(key); + CompletableFuture<PlanInfo> planFuture = cache.get(key); if (planFuture != null) { return planFuture.thenApply((plan) -> { // We assume that non-multi-step plans is always better then a multi-step plan. // or fast query optimization is disabled return a regular plan. - if (!(plan instanceof MultiStepPlan)) { - return plan; + if (!(plan.queryPlan instanceof MultiStepPlan)) { + return plan.queryPlan; } else { - MultiStepPlan regularPlan = (MultiStepPlan) plan; + MultiStepPlan regularPlan = (MultiStepPlan) plan.queryPlan; QueryPlan fastPlan = regularPlan.fastPlan(); if (fastPlan != null && !explicitTx) { @@ -318,7 +404,7 @@ public class PrepareServiceImpl implements PrepareService { cache.clear(); } else { Set<QualifiedName> qualifiedNames = tableNames.stream().map(QualifiedName::parse).collect(Collectors.toSet()); - cache.removeIfValue(p -> p.isDone() && planMatches(p.join(), qualifiedNames::contains)); + cache.removeIfValue(p -> p.isDone() && planMatches(p.join().queryPlan, qualifiedNames::contains)); } return null; @@ -342,13 +428,13 @@ public class PrepareServiceImpl implements PrepareService { ) { switch (parsedResult.queryType()) { case QUERY: - return prepareQuery(parsedResult, planningContext); + return prepareQuery(parsedResult, planningContext).thenApply(f -> f.queryPlan); case DDL: return prepareDdl(parsedResult, planningContext); case KILL: return prepareKill(parsedResult); case DML: - return prepareDml(parsedResult, planningContext); + return prepareDml(parsedResult, planningContext).thenApply(f -> f.queryPlan); case EXPLAIN: return prepareExplain(parsedResult, planningContext); default: @@ -402,7 +488,7 @@ public class PrepareServiceImpl implements PrepareService { explicandum ); - CompletableFuture<QueryPlan> result; + CompletableFuture<PlanInfo> result; switch (queryType) { case QUERY: result = prepareQuery(newParsedResult, ctx); @@ -415,7 +501,8 @@ public class PrepareServiceImpl implements PrepareService { } return result.thenApply(plan -> { - assert plan instanceof ExplainablePlan : plan == null ? "<null>" : plan.getClass().getCanonicalName(); + QueryPlan plan0 = plan.queryPlan; + assert plan0 instanceof ExplainablePlan : plan0 == null ? "<null>" : plan0.getClass().getCanonicalName(); SqlLiteral literal = (SqlLiteral) explainMode; @@ -423,7 +510,7 @@ public class PrepareServiceImpl implements PrepareService { assert mode != null; - return new ExplainPlan(nextPlanId(), (ExplainablePlan) plan, mode); + return new ExplainPlan(nextPlanId(), (ExplainablePlan) plan0, mode); }); } @@ -431,7 +518,7 @@ public class PrepareServiceImpl implements PrepareService { return !(sqlNode instanceof SqlNodeList); } - private CompletableFuture<QueryPlan> prepareQuery( + private CompletableFuture<PlanInfo> prepareQuery( ParsedResult parsedResult, PlanningContext ctx ) { @@ -459,63 +546,101 @@ public class PrepareServiceImpl implements PrepareService { // Try to produce a fast plan, if successful, then return that plan w/o caching it. QueryPlan fastPlan = tryOptimizeFast(stmt, ctx); if (fastPlan != null) { - return CompletableFuture.completedFuture(fastPlan); + return CompletableFuture.completedFuture(new PlanInfo(fastPlan)); } } // Use parameter metadata to compute a cache key. CacheKey key = createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx, stmt.parameterMetadata); - CompletableFuture<QueryPlan> planFut = cache.get(key, k -> CompletableFuture.supplyAsync(() -> { - IgnitePlanner planner = ctx.planner(); + return cache.get(key, k -> CompletableFuture.supplyAsync(() -> buildQueryPlan(key, stmt, ctx), planningPool)); + }); + } - ValidationResult validated = stmt.value; - ParameterMetadata parameterMetadata = stmt.parameterMetadata; + private PlanInfo buildQueryPlan(CacheKey key, ValidStatement<ValidationResult> stmt, PlanningContext ctx) { + IgnitePlanner planner = ctx.planner(); - SqlNode validatedNode = validated.sqlNode(); + ValidationResult validated = stmt.value; + ParameterMetadata parameterMetadata = stmt.parameterMetadata; - RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key)); - IgniteRel optimizedRel = relWithMetadata.rel; - QueryPlan fastPlan = tryOptimizeFast(stmt, ctx); + SqlNode validatedNode = validated.sqlNode(); - ResultSetMetadata resultSetMetadata = resultSetMetadata(validated.dataType(), validated.origins(), validated.aliases()); + RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key)); + IgniteRel optimizedRel = relWithMetadata.rel; + QueryPlan fastPlan = tryOptimizeFast(stmt, ctx); + + ResultSetMetadata resultSetMetadata = resultSetMetadata(validated.dataType(), validated.origins(), validated.aliases()); + + int catalogVersion = ctx.catalogVersion(); - int catalogVersion = ctx.catalogVersion(); + if (optimizedRel instanceof IgniteKeyValueGet) { + IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel; - if (optimizedRel instanceof IgniteKeyValueGet) { - IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel; + var plan = new KeyValueGetPlan( + nextPlanId(), + catalogVersion, + kvGet, + resultSetMetadata, + parameterMetadata, + relWithMetadata.paMetadata, + relWithMetadata.ppMetadata + ); + + return new PlanInfo(plan); + } + + var plan = new MultiStepPlan( + nextPlanId(), + SqlQueryType.QUERY, + optimizedRel, + resultSetMetadata, + parameterMetadata, + catalogVersion, + relWithMetadata.numSources, + fastPlan, + relWithMetadata.paMetadata, + relWithMetadata.ppMetadata + ); - return new KeyValueGetPlan( - nextPlanId(), - catalogVersion, - kvGet, - resultSetMetadata, - parameterMetadata, - relWithMetadata.paMetadata, - relWithMetadata.ppMetadata - ); + logPlan(key.parsedResult().originalQuery(), plan); + + int currentCatalogVersion = schemaManager.catalogVersion(clockService.now().longValue()); + + if (currentCatalogVersion == catalogVersion) { + Set<Integer> sources = resolveSources(plan.getRel()); + + return new PlanInfo(plan, stmt, ctx, sources); + } + + return new PlanInfo(plan); + } + + @Override + public void updatePlans(int tableId) { + ConcurrentMap<CacheKey, CompletableFuture<PlanInfo>> map = cache.asMap(); + + for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent : map.entrySet()) { + CompletableFuture<PlanInfo> fut = ent.getValue(); + + if (isCompletedSuccessfully(fut)) { + // no wait, already completed + PlanInfo info = fut.join(); + + if (info.sources.contains(tableId)) { + weakCache.put(ent.getKey(), IGNORED_VAL); } + } + } + } - var plan = new MultiStepPlan( - nextPlanId(), - SqlQueryType.QUERY, - optimizedRel, - resultSetMetadata, - parameterMetadata, - catalogVersion, - relWithMetadata.numSources, - fastPlan, - relWithMetadata.paMetadata, - relWithMetadata.ppMetadata - ); - - logPlan(parsedResult.originalQuery(), plan); - - return plan; - }, planningPool)); - - return planFut; - }); + private PlanInfo buildPlan(SqlQueryType queryType, CacheKey key, ValidStatement<ValidationResult> stmt, PlanningContext ctx) { + if (queryType == SqlQueryType.QUERY) { + return buildQueryPlan(key, stmt, ctx); + } else if (queryType == SqlQueryType.DML) { + return buildDmlPlan(key, stmt, ctx); + } else { + throw new AssertionError("should not get here"); + } } private PlanId nextPlanId() { @@ -552,7 +677,7 @@ public class PrepareServiceImpl implements PrepareService { } /** Prepare plan in current thread, applicable for simple insert queries, cache plan not involved. */ - CompletableFuture<QueryPlan> prepareDmlOpt(SqlNode sqlNode, PlanningContext ctx, String originalQuery) { + CompletableFuture<PlanInfo> prepareDmlOpt(SqlNode sqlNode, PlanningContext ctx, String originalQuery) { assert single(sqlNode); // Validate @@ -593,10 +718,10 @@ public class PrepareServiceImpl implements PrepareService { logPlan(originalQuery, plan); - return CompletableFuture.completedFuture(plan); + return CompletableFuture.completedFuture(new PlanInfo(plan)); } - private CompletableFuture<QueryPlan> prepareDml(ParsedResult parsedResult, PlanningContext ctx) { + private CompletableFuture<PlanInfo> prepareDml(ParsedResult parsedResult, PlanningContext ctx) { SqlNode sqlNode = parsedResult.parsedTree(); assert single(sqlNode); @@ -607,71 +732,79 @@ public class PrepareServiceImpl implements PrepareService { return prepareDmlOpt(sqlNode, ctx, parsedResult.originalQuery()); } - CompletableFuture<ValidStatement<SqlNode>> validFut = CompletableFuture.supplyAsync(() -> { + CompletableFuture<ValidStatement<ValidationResult>> validFut = CompletableFuture.supplyAsync(() -> { IgnitePlanner planner = ctx.planner(); // Validate SqlNode validatedNode = planner.validate(sqlNode); + ValidationResult validatedResult = new ValidationResult(validatedNode); // Get parameter metadata. RelDataType parameterRowType = planner.getParameterRowType(); ParameterMetadata parameterMetadata = createParameterMetadata(parameterRowType); - return new ValidStatement<>(parsedResult, validatedNode, parameterMetadata); + return new ValidStatement<>(parsedResult, validatedResult, parameterMetadata); }, planningPool); // Optimize - return validFut.thenCompose(stmt -> { // Use parameter metadata to compute a cache key. CacheKey key = createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx, stmt.parameterMetadata); - CompletableFuture<QueryPlan> planFut = cache.get(key, k -> CompletableFuture.supplyAsync(() -> { - IgnitePlanner planner = ctx.planner(); + return cache.get(key, k -> CompletableFuture.supplyAsync(() -> buildDmlPlan(key, stmt, ctx), planningPool)); + }); + } - SqlNode validatedNode = stmt.value; - ParameterMetadata parameterMetadata = stmt.parameterMetadata; + private PlanInfo buildDmlPlan(CacheKey key, ValidStatement<ValidationResult> stmt, PlanningContext ctx) { + IgnitePlanner planner = ctx.planner(); - RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key)); - IgniteRel optimizedRel = relWithMetadata.rel; + SqlNode validatedNode = stmt.value.sqlNode(); + ParameterMetadata parameterMetadata = stmt.parameterMetadata; - int catalogVersion = ctx.catalogVersion(); + RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode, planner, () -> cache.invalidate(key)); + IgniteRel optimizedRel = relWithMetadata.rel; - ExplainablePlan plan; - if (optimizedRel instanceof IgniteKeyValueModify) { - IgniteKeyValueModify kvModify = (IgniteKeyValueModify) optimizedRel; + int catalogVersion = ctx.catalogVersion(); - plan = new KeyValueModifyPlan( - nextPlanId(), - catalogVersion, - kvModify, - DML_METADATA, - parameterMetadata, - relWithMetadata.paMetadata, - relWithMetadata.ppMetadata - ); - } else { - plan = new MultiStepPlan( - nextPlanId(), - SqlQueryType.DML, - optimizedRel, - DML_METADATA, - parameterMetadata, - catalogVersion, - relWithMetadata.numSources, - null, - relWithMetadata.paMetadata, - relWithMetadata.ppMetadata - ); - } + ExplainablePlan plan; + if (optimizedRel instanceof IgniteKeyValueModify) { + IgniteKeyValueModify kvModify = (IgniteKeyValueModify) optimizedRel; - logPlan(parsedResult.originalQuery(), plan); + plan = new KeyValueModifyPlan( + nextPlanId(), + catalogVersion, + kvModify, + DML_METADATA, + parameterMetadata, + relWithMetadata.paMetadata, + relWithMetadata.ppMetadata + ); + } else { + plan = new MultiStepPlan( + nextPlanId(), + SqlQueryType.DML, + optimizedRel, + DML_METADATA, + parameterMetadata, + catalogVersion, + relWithMetadata.numSources, + null, + relWithMetadata.paMetadata, + relWithMetadata.ppMetadata + ); + } - return plan; - }, planningPool)); + logPlan(key.parsedResult().originalQuery(), plan); - return planFut; - }); + int currentCatalogVersion = schemaManager.catalogVersion(clockService.now().longValue()); + + if (currentCatalogVersion == catalogVersion) { + Set<Integer> sources = resolveSources(plan.getRel()); + + return new PlanInfo(plan, stmt, ctx, sources); + } + + return new PlanInfo(plan); } private @Nullable QueryPlan tryOptimizeFast( @@ -741,7 +874,7 @@ public class PrepareServiceImpl implements PrepareService { paramTypes[idx++] = columnType; } - return new CacheKey(catalogVersion, schemaName, parsedResult.normalizedQuery(), true /* distributed */, paramTypes); + return new CacheKey(catalogVersion, schemaName, parsedResult, true /* distributed */, paramTypes); } private static CacheKey createCacheKeyFromParameterMetadata(ParsedResult parsedResult, PlanningContext ctx, @@ -764,7 +897,35 @@ public class PrepareServiceImpl implements PrepareService { paramTypes = result; } - return new CacheKey(catalogVersion, ctx.schemaName(), parsedResult.normalizedQuery(), distributed, paramTypes); + return new CacheKey(catalogVersion, ctx.schemaName(), parsedResult, distributed, paramTypes); + } + + private static Set<Integer> resolveSources(IgniteRel rel) { + Set<Integer> tables = new HashSet<>(); + + IgniteRelShuttle shuttle = new IgniteRelShuttle() { + @Override + public IgniteRel visit(IgniteTableModify rel) { + IgniteTable igniteTable = rel.getTable().unwrapOrThrow(IgniteTable.class); + + tables.add(igniteTable.id()); + + return super.visit(rel); + } + + @Override + public IgniteRel visit(IgniteTableScan rel) { + IgniteTable igniteTable = rel.getTable().unwrapOrThrow(IgniteTable.class); + + tables.add(igniteTable.id()); + + return rel; + } + }; + + shuttle.visit(rel); + + return tables; } private static ResultSetMetadata resultSetMetadata( @@ -984,4 +1145,32 @@ public class PrepareServiceImpl implements PrepareService { return matches; } } + + private static class PlanInfo { + private final QueryPlan queryPlan; + @Nullable + private final ValidStatement<ValidationResult> statement; + @Nullable + private final PlanningContext context; + private final Set<Integer> sources; + + PlanInfo( + QueryPlan plan, + @Nullable ValidStatement<ValidationResult> statement, + @Nullable PlanningContext context, + Set<Integer> sources + ) { + this.queryPlan = plan; + this.statement = statement; + this.context = context; + this.sources = sources; + } + + PlanInfo(QueryPlan plan) { + this.queryPlan = plan; + this.statement = null; + this.context = null; + this.sources = Collections.emptySet(); + } + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java index 5dff334ffa2..c9dca39f32a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ValidationResult.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.prepare; import java.util.List; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlNode; +import org.jetbrains.annotations.Nullable; /** * ValidationResult holder. @@ -28,11 +29,11 @@ import org.apache.calcite.sql.SqlNode; public class ValidationResult { private final SqlNode sqlNode; - private final RelDataType dataType; + @Nullable private final RelDataType dataType; - private final List<List<String>> origins; + @Nullable private final List<List<String>> origins; - private final List<String> aliases; + @Nullable private final List<String> aliases; /** * Constructor. @@ -42,13 +43,30 @@ public class ValidationResult { * @param origins Type fields provenance. * @param aliases Derived column names. */ - ValidationResult(SqlNode sqlNode, RelDataType dataType, List<List<String>> origins, List<String> aliases) { + ValidationResult( + SqlNode sqlNode, + @Nullable RelDataType dataType, + @Nullable List<List<String>> origins, + @Nullable List<String> aliases + ) { this.sqlNode = sqlNode; this.dataType = dataType; this.origins = origins; this.aliases = aliases; } + /** + * Constructor. + * + * @param sqlNode Validated SQL node. + */ + ValidationResult(SqlNode sqlNode) { + this.sqlNode = sqlNode; + this.dataType = null; + this.origins = null; + this.aliases = null; + } + /** * Get validated SQL node. */ @@ -59,19 +77,19 @@ public class ValidationResult { /** * Get validated type. */ - public RelDataType dataType() { + @Nullable public RelDataType dataType() { return dataType; } /** * Get type fields provenance. */ - public List<List<String>> origins() { + @Nullable public List<List<String>> origins() { return origins; } /** Return alternatively derived column names. */ - public List<String> aliases() { + @Nullable public List<String> aliases() { return aliases; } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java index 6b98b1a6f02..b0fb66f801e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.sql.engine.statistic; +import java.util.function.IntConsumer; import org.apache.ignite.internal.sql.engine.exec.LifecycleAware; /** @@ -28,6 +29,9 @@ public interface SqlStatisticManager extends LifecycleAware { */ long tableSize(int tableId); + /** Plan updater callback. */ + void planUpdater(IntConsumer updater); + @Override default void start(){} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java index aa1f51e5361..977d84e463f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntConsumer; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; @@ -65,6 +66,8 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { private final CatalogService catalogService; private final LowWatermark lowWatermark; + private volatile IntConsumer planUpdater; + /* Contains all known table id's with statistics. */ private final ConcurrentMap<Integer, ActualSize> tableSizeMap = new ConcurrentHashMap<>(); @@ -77,6 +80,10 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { this.lowWatermark = lowWatermark; } + @Override + public void planUpdater(IntConsumer updater) { + this.planUpdater = updater; + } /** * Returns approximate number of rows in table by their id. @@ -130,6 +137,12 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { }).exceptionally(e -> { LOG.info("Can't calculate size for table [id={}].", e, tableId); return null; + }).whenComplete((ignored, ex) -> { + if (ex == null) { + if (planUpdater != null) { + planUpdater.accept(tableId); + } + } }); latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult : prev.thenCompose(none -> updateResult)); @@ -183,16 +196,16 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { long timestamp; long size; - public ActualSize(long size, long timestamp) { + ActualSize(long size, long timestamp) { this.timestamp = timestamp; this.size = size; } - public long getTimestamp() { + long getTimestamp() { return timestamp; } - public long getSize() { + long getSize() { return size; } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java index c56ac52a10e..c14f45c948d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.sql.engine.util.cache; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -93,4 +95,12 @@ public interface Cache<K, V> { * @return The number of entries in the cache. */ int size(); + + /** + * Returns a view of the entries stored in this cache as a thread-safe map. Modifications made to + * the map directly affect the cache. + * + * @return A thread-safe view of this cache supporting all of the optional {@link Map} operations. + */ + ConcurrentMap<K, V> asMap(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java index d1e28046b0a..a8d3d0f9505 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CacheFactory.java @@ -55,4 +55,13 @@ public interface CacheFactory { * @param <V> Type of the value object. */ <K, V> Cache<K, V> create(int size, StatsCounter statCounter, Duration expireAfterAccess); + + /** + * Creates a cache where each key (not value) stored in the cache should be wrapped in a + * {@link java.lang.ref.WeakReference}. + * + * @param <K> Type of the key object. + * @param <V> Type of the value object. + */ + <K, V> Cache<K, V> createWithWeakKeys(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java index 55e9e8b0767..ccdd5c160fe 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/CaffeineCacheFactory.java @@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.stats.CacheStats; import java.time.Duration; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Function; @@ -84,6 +85,18 @@ public class CaffeineCacheFactory implements CacheFactory { return create(size, null); } + /** {@inheritDoc} */ + @Override + public <K, V> Cache<K, V> createWithWeakKeys() { + Caffeine<Object, Object> builder = Caffeine.newBuilder(); + + if (executor != null) { + builder.executor(executor); + } + + return new CaffeineCacheToCacheAdapter<>(builder.weakKeys().build()); + } + private static class CaffeineStatsCounterAdapter implements com.github.benmanes.caffeine.cache.stats.StatsCounter { private final StatsCounter statsCounter; @@ -139,6 +152,11 @@ public class CaffeineCacheFactory implements CacheFactory { return cache.get(key, mappingFunction); } + @Override + public ConcurrentMap<K, V> asMap() { + return cache.asMap(); + } + @Override public void put(K key, V value) { cache.put(key, value); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index ce1731eb294..88340fbe56c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.components.SystemPropertiesNodeProperties; import org.apache.ignite.internal.failure.FailureManager; import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler; import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.ClockServiceImpl; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -252,6 +253,10 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { .map(node -> create(node, mappingCacheFactory, executorsFactory.apply(node))) .collect(Collectors.toList()); + ClockServiceImpl clockService = mock(ClockServiceImpl.class); + + when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500)); + prepareService = new PrepareServiceImpl( "test", 0, @@ -261,7 +266,8 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { PLANNING_THREAD_COUNT, PLAN_EXPIRATION_SECONDS, metricManager, - new PredefinedSchemaManager(schema) + new PredefinedSchemaManager(schema), + clockService ); parserService = new ParserServiceImpl(); @@ -1474,6 +1480,11 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { throw new UnsupportedOperationException(); } + @Override + public <K, V> Cache<K, V> createWithWeakKeys() { + throw new UnsupportedOperationException(); + } + private static class BlockOnComputeCache<K, V> extends EmptyCacheFactory.EmptyCache<K, V> { private final CountDownLatch waitLatch; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java index 81aa4c30edb..3d4b18406cb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; public class PredefinedSchemaManager implements SqlSchemaManager { private final IgniteSchemas root; private final Int2ObjectMap<IgniteTable> tableById; + private final SchemaPlus schemaPlus; /** Constructs schema manager from a single schema. */ public PredefinedSchemaManager(IgniteSchema schema) { @@ -67,9 +68,15 @@ public class PredefinedSchemaManager implements SqlSchemaManager { ); } + this.schemaPlus = schemaPlus; + root = new IgniteSchemas(schemaPlus, 0); } + SchemaPlus rootSchema() { + return schemaPlus; + } + /** {@inheritDoc} */ @Override public IgniteSchemas schemas(int catalogVersion) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java index ab6b18ae8ff..528c0839ea5 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java @@ -28,6 +28,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.lang.reflect.Proxy; import java.time.Clock; @@ -54,6 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.IntConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -83,6 +85,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog import org.apache.ignite.internal.components.SystemPropertiesNodeProperties; import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.failure.FailureProcessor; +import org.apache.ignite.internal.hlc.ClockServiceImpl; import org.apache.ignite.internal.hlc.ClockWaiter; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; @@ -745,6 +748,11 @@ public class TestBuilders { ConcurrentMap<String, Long> tablesSize = new ConcurrentHashMap<>(); var schemaManager = createSqlSchemaManager(catalogManager, tablesSize); + + ClockServiceImpl clockService = mock(ClockServiceImpl.class); + + when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500)); + var prepareService = new PrepareServiceImpl( clusterName, 0, @@ -754,7 +762,8 @@ public class TestBuilders { PLANNING_THREAD_COUNT, PLAN_EXPIRATION_SECONDS, new NoOpMetricManager(), - schemaManager + schemaManager, + clockService ); Map<String, List<String>> systemViewsByNode = new HashMap<>(); @@ -931,15 +940,23 @@ public class TestBuilders { } private static SqlSchemaManagerImpl createSqlSchemaManager(CatalogManager catalogManager, ConcurrentMap<String, Long> tablesSize) { - SqlStatisticManager sqlStatisticManager = tableId -> { - CatalogTableDescriptor descriptor = catalogManager.activeCatalog(Long.MAX_VALUE).table(tableId); - long fallbackSize = 10_000; + SqlStatisticManager sqlStatisticManager = new SqlStatisticManager() { + @Override + public long tableSize(int tableId) { + CatalogTableDescriptor descriptor = catalogManager.activeCatalog(Long.MAX_VALUE).table(tableId); + long fallbackSize = 10_000; - if (descriptor == null) { - return fallbackSize; + if (descriptor == null) { + return fallbackSize; + } + + return tablesSize.getOrDefault(descriptor.name(), 10_000L); } - return tablesSize.getOrDefault(descriptor.name(), 10_000L); + @Override + public void planUpdater(IntConsumer updater) { + throw new UnsupportedOperationException(); + } }; return new SqlSchemaManagerImpl( diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java new file mode 100644 index 00000000000..a7c4ad2104d --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas; + +/** + * Dummy wrapper for predefined collection of schemas with possibility to change catalog version. + * + * @see PredefinedSchemaManager + */ +public class VersionedSchemaManager extends PredefinedSchemaManager { + private final AtomicInteger ver; + + /** Constructor. */ + public VersionedSchemaManager(IgniteSchema schema, AtomicInteger ver) { + super(schema); + + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override + public int catalogVersion(long timestamp) { + return ver == null ? super.catalogVersion(timestamp) : ver.get(); + } + + /** {@inheritDoc} */ + @Override + public IgniteSchemas schemas(long timestamp) { + return new IgniteSchemas(rootSchema(), ver.get()); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java index 94f20b76755..52d538c1b44 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import java.time.Duration; import java.util.List; @@ -35,6 +36,7 @@ import org.apache.calcite.plan.volcano.VolcanoPlanner; import org.apache.calcite.plan.volcano.VolcanoTimeoutException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelVisitor; +import org.apache.ignite.internal.hlc.ClockServiceImpl; import org.apache.ignite.internal.metrics.MetricManagerImpl; import org.apache.ignite.internal.sql.engine.SqlOperationContext; import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager; @@ -76,7 +78,8 @@ public class PlannerTimeoutTest extends AbstractPlannerTest { 1, Integer.MAX_VALUE, new MetricManagerImpl(), - new PredefinedSchemaManager(schema) + new PredefinedSchemaManager(schema), + mock(ClockServiceImpl.class) ); prepareService.start(); try { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java index e21d918b9d6..19b8bfcbea3 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java @@ -17,15 +17,17 @@ package org.apache.ignite.internal.sql.engine.prepare; +import static org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_INITIAL_DELAY; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -34,6 +36,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.time.Duration; import java.time.ZoneId; @@ -44,15 +47,19 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.ignite.internal.hlc.ClockServiceImpl; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metrics.MetricManagerImpl; import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.SqlOperationContext; import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager; import org.apache.ignite.internal.sql.engine.framework.TestBuilders; +import org.apache.ignite.internal.sql.engine.framework.VersionedSchemaManager; import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; @@ -362,6 +369,137 @@ public class PrepareServiceImplTest extends BaseIgniteAbstractTest { )); } + /** Validates that plan for appropriate tableId will be changed by request. */ + @Test + public void statisticUpdatesChangePlans() { + IgniteTable table = TestBuilders.table() + .name("T") + .addColumn("C", NativeTypes.INT32) + .distribution(IgniteDistributions.single()) + .build(); + + IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table)); + + PrepareServiceImpl service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, Integer.MAX_VALUE); + + assertThat(service.cache.size(), is(0)); + + String selectQuery = "SELECT * FROM test.t WHERE c = 1"; + QueryPlan selectPlan = await(service.prepareAsync(parse(selectQuery), operationContext().build())); + + assertThat(service.cache.size(), is(1)); + + String insertQuery = "INSERT INTO test.t VALUES(OCTET_LENGTH('TEST')), (2)"; + QueryPlan insertPlan = await(service.prepareAsync(parse(insertQuery), operationContext().build())); + + assertThat(service.cache.size(), is(2)); + + service.updatePlans(table.id()); + + Awaitility.await() + .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY)) + .until( + () -> !selectPlan.equals(await(service.prepareAsync(parse(selectQuery), operationContext().build()))) + ); + + Awaitility.await() + .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY)) + .until( + () -> !insertPlan.equals(await(service.prepareAsync(parse(insertQuery), operationContext().build()))) + ); + + assertThat(service.cache.size(), is(2)); + } + + @Test + public void planUpdatesForNonCachedTable() { + IgniteTable table1 = TestBuilders.table() + .name("T1") + .addColumn("C", NativeTypes.INT32) + .distribution(IgniteDistributions.single()) + .build(); + + IgniteTable table2 = TestBuilders.table() + .name("T2") + .addColumn("C", NativeTypes.INT32) + .distribution(IgniteDistributions.single()) + .build(); + + IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table1, table2)); + + // 1 item cache plan size + PrepareServiceImpl service = (PrepareServiceImpl) createPlannerService(schema, 1); + + String selectQuery = "SELECT * FROM test.t1 WHERE c = 1"; + await(service.prepareAsync(parse(selectQuery), operationContext().build())); + + assertThat(service.cache.size(), is(1)); + CacheKey key1 = service.cache.asMap().keySet().iterator().next(); + + // different table + String insertQuery = "SELECT * FROM test.t2 WHERE c = 1"; + QueryPlan plan2 = await(service.prepareAsync(parse(insertQuery), operationContext().build())); + assertThat(service.cache.size(), is(1)); + CacheKey key2 = service.cache.asMap().keySet().iterator().next(); + + assertNotEquals(key1, key2); + + // not cached table + service.updatePlans(table1.id()); + + // cached table + service.updatePlans(table2.id()); + + Awaitility.await() + .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY)) + .until( + () -> !plan2.equals(await(service.prepareAsync(parse(insertQuery), operationContext().build()))) + ); + } + + /** Validate that plan updates only for current catalog version. */ + @Test + public void planUpdatesForCurrentCatalogVersion() { + IgniteTable table1 = TestBuilders.table() + .name("T1") + .addColumn("C", NativeTypes.INT32) + .distribution(IgniteDistributions.single()) + .build(); + + IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table1)); + + AtomicInteger ver = new AtomicInteger(); + PrepareServiceImpl service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 10000, + Integer.MAX_VALUE, 1000, ver); + + String selectQuery = "SELECT * FROM test.t1 WHERE c = 1"; + QueryPlan plan1 = await(service.prepareAsync(parse(selectQuery), operationContext().build())); + + // catalog version 1 + ver.incrementAndGet(); + + QueryPlan plan2 = await(service.prepareAsync(parse(selectQuery), operationContext().build())); + + Awaitility.await() + .atMost(Duration.ofMillis(10000)) + .until( + () -> service.cache.size() == 2 + ); + + assertThat(service.cache.size(), is(2)); + service.updatePlans(table1.id()); + + Awaitility.await() + .atMost(Duration.ofMillis(2 * PLAN_UPDATER_INITIAL_DELAY)) + .until( + () -> !plan2.equals(await(service.prepareAsync(parse(selectQuery), operationContext().build()))) + ); + + // previous catalog, get cached plan + ver.set(0); + assertEquals(plan1, await(service.prepareAsync(parse(selectQuery), operationContext().build()))); + } + @Test public void planCacheExpiry() { IgniteTable table = TestBuilders.table() @@ -374,7 +512,7 @@ public class PrepareServiceImplTest extends BaseIgniteAbstractTest { Awaitility.await().timeout(30, TimeUnit.SECONDS).untilAsserted(() -> { int expireSeconds = 2; - PrepareService service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, Integer.MAX_VALUE, 2); + PrepareService service = createPlannerService(schema, CaffeineCacheFactory.INSTANCE, Integer.MAX_VALUE, expireSeconds, 1000); String query = "SELECT * FROM test.t WHERE c = 1"; QueryPlan p0 = await(service.prepareAsync(parse(query), operationContext().build())); @@ -549,23 +687,55 @@ public class PrepareServiceImplTest extends BaseIgniteAbstractTest { return createPlannerService(createSchema()); } + private static PrepareService createPlannerService(IgniteSchema schema, int cacheSize) { + return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 10000, Integer.MAX_VALUE, cacheSize); + } + private static PrepareService createPlannerService(IgniteSchema schema) { - return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 1000); + return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 10000); } private static PrepareServiceImpl createPlannerService(IgniteSchema schemas, CacheFactory cacheFactory, int timeoutMillis) { - return createPlannerService(schemas, cacheFactory, timeoutMillis, Integer.MAX_VALUE); + return createPlannerService(schemas, cacheFactory, timeoutMillis, Integer.MAX_VALUE, 1000); + } + + private static PrepareServiceImpl createPlannerService( + IgniteSchema schemas, + CacheFactory cacheFactory, + int timeoutMillis, + int planExpireSeconds, + int cacheSize + ) { + ClockServiceImpl clockService = mock(ClockServiceImpl.class); + + when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500)); + + PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, cacheFactory, + mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, planExpireSeconds, mock(MetricManagerImpl.class), + new PredefinedSchemaManager(schemas), clockService); + + createdServices.add(service); + + service.start(); + + return service; } private static PrepareServiceImpl createPlannerService( IgniteSchema schemas, CacheFactory cacheFactory, int timeoutMillis, - int planExpireSeconds + int planExpireSeconds, + int cacheSize, + AtomicInteger ver ) { - PrepareServiceImpl service = new PrepareServiceImpl("test", 1000, cacheFactory, + ClockServiceImpl clockService = mock(ClockServiceImpl.class); + + when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500)); + + PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, cacheFactory, mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, planExpireSeconds, mock(MetricManagerImpl.class), - new PredefinedSchemaManager(schemas)); + new VersionedSchemaManager(schemas, ver), clockService); createdServices.add(service); @@ -595,5 +765,10 @@ public class PrepareServiceImplTest extends BaseIgniteAbstractTest { public <K, V> Cache<K, V> create(int size, StatsCounter statCounter, Duration expireAfterAccess) { return (Cache<K, V>) cache; } + + @Override + public <K, V> Cache<K, V> createWithWeakKeys() { + return CaffeineCacheFactory.INSTANCE.createWithWeakKeys(); + } } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java index 1546ff69268..10719fc4a8b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.IntConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.calcite.rel.RelCollations; @@ -114,7 +115,17 @@ public class SqlSchemaManagerImplTest extends BaseIgniteAbstractTest { @BeforeEach void init() { - sqlStatisticManager = tableId -> 10_000L; + sqlStatisticManager = new SqlStatisticManager() { + @Override + public long tableSize(int tableId) { + return 10_000; + } + + @Override + public void planUpdater(IntConsumer updater) { + throw new UnsupportedOperationException(); + } + }; catalogManager = CatalogTestUtils.createCatalogManagerWithTestUpdateLog("test", new HybridClockImpl()); sqlSchemaManager = new SqlSchemaManagerImpl( diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java index b2e00a9f74f..5b02bfe9701 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.sql.engine.util; import java.time.Duration; +import java.util.concurrent.ConcurrentMap; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -56,6 +57,11 @@ public class EmptyCacheFactory implements CacheFactory { return create(size); } + @Override + public <K, V> Cache<K, V> createWithWeakKeys() { + return create(0); + } + /** A cache that keeps no object. */ public static class EmptyCache<K, V> implements Cache<K, V> { @Override @@ -97,5 +103,10 @@ public class EmptyCacheFactory implements CacheFactory { public int size() { return 0; } + + @Override + public ConcurrentMap<K, V> asMap() { + throw new UnsupportedOperationException(); + } } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java index 5cb1bd0d4b2..95d2f9a14e5 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java @@ -19,7 +19,11 @@ package org.apache.ignite.internal.sql.metrics; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import org.apache.ignite.internal.hlc.ClockServiceImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metrics.MetricManager; import org.apache.ignite.internal.metrics.MetricManagerImpl; import org.apache.ignite.internal.metrics.MetricSet; @@ -61,8 +65,13 @@ public class PlanningCacheMetricsTest extends AbstractPlannerTest { IgniteSchema schema = createSchema(table); + ClockServiceImpl clockService = mock(ClockServiceImpl.class); + + when(clockService.now()).thenReturn(new HybridTimestamp(1_000, 500)); + PrepareService prepareService = new PrepareServiceImpl( - "test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE, metricManager, new PredefinedSchemaManager(schema) + "test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE, metricManager, new PredefinedSchemaManager(schema), + clockService ); prepareService.start();
