zstan commented on code in PR #6593:
URL: https://github.com/apache/ignite-3/pull/6593#discussion_r2381120467
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -858,6 +1046,150 @@ private static ParameterMetadata
createParameterMetadata(RelDataType parameterRo
return new ParameterMetadata(parameterTypes);
}
+ // Although catalog version can evaluate during execution, it`s ok to take
actual version for this moment.
+ private int directCatalogVersion() {
+ return schemaManager.catalogVersion(clockService.now().longValue());
+ }
+
+ public void statisticsChanged(int tableId) {
+ planUpdater.statisticsChanged(tableId);
+ }
+
+ private static class PlanUpdater {
+ private final ScheduledExecutorService planUpdater;
+
+ private final AtomicReference<CompletableFuture<PlanInfo>>
rePlanningFut = new AtomicReference<>(nullCompletedFuture());
+
+ private volatile boolean recalculatePlans;
+
+ private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
+
+ private final long plannerTimeout;
+
+ private final PlanPrepare prepare;
+
+ private final IntSupplier catalogVersionSupplier;
+
+ PlanUpdater(
+ ScheduledExecutorService planUpdater,
+ Cache<CacheKey, CompletableFuture<PlanInfo>> cache,
+ long plannerTimeout,
+ PlanPrepare prepare,
+ IntSupplier catalogVersionSupplier
+ ) {
+ this.planUpdater = planUpdater;
+ this.cache = cache;
+ this.plannerTimeout = plannerTimeout;
+ this.prepare = prepare;
+ this.catalogVersionSupplier = catalogVersionSupplier;
+ }
+
+ /**
+ * Reacts to the changed statistic.
+ *
+ * @param tableId Table Id statistic changed for.
+ */
+ void statisticsChanged(int tableId) {
+ Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries =
cache.entrySet();
+
+ int currentCatalogVersion = catalogVersionSupplier.getAsInt();
+
+ boolean statChanged = false;
+
+ for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent :
cachedEntries) {
+ CacheKey key = ent.getKey();
+ CompletableFuture<PlanInfo> fut = ent.getValue();
+
+ if (currentCatalogVersion == key.catalogVersion() &&
isCompletedSuccessfully(fut)) {
+ // no wait, already completed
+ PlanInfo info = fut.join();
+
+ if (info.sources.contains(tableId)) {
+ ent.getKey().invalidate();
+ statChanged = true;
+ }
+ }
+ }
+
+ if (statChanged) {
+ recalculatePlans = true;
+ }
+ }
+
+ void start() {
+ planUpdater.scheduleAtFixedRate(() -> {
+ if (!recalculatePlans) {
+ return;
+ }
+
+ CompletableFuture<PlanInfo> planFut = rePlanningFut.get();
+ if (planFut != null && !planFut.isDone()) {
+ // some work still in progress
+ return;
+ } else {
+ rePlanningFut.set(null);
+ }
+
+ while (recalculatePlans) {
+ recalculatePlans = false;
+
+ for (Entry<CacheKey, CompletableFuture<PlanInfo>> ent :
cache.entrySet()) {
+ if (!ent.getKey().needInvalidate()) {
+ continue;
+ }
+
+ CacheKey key = ent.getKey();
+ CompletableFuture<PlanInfo> fut = cache.get(key);
+
+ // can be evicted
+ if (fut != null) {
+ assert isCompletedSuccessfully(fut);
+
+ PlanInfo info = fut.join();
+
+ assert info.context != null && info.statement !=
null;
+
+ int currentCatalogVersion =
catalogVersionSupplier.getAsInt();
+
+ if (currentCatalogVersion == key.catalogVersion())
{
+ SqlQueryType queryType =
info.statement.parsedResult().queryType();
+
+ if (queryType != SqlQueryType.QUERY &&
queryType != SqlQueryType.DML) {
+ assert false : "Unexpected type: " +
queryType;
+ continue;
+ }
Review Comment:
If we keep only the assert, the behavior will still differ; if we keep only
the continue, we will never catch invariant violations. So I prefer to keep it
as is. WDYT?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -459,63 +532,95 @@ private CompletableFuture<QueryPlan> prepareQuery(
// Try to produce a fast plan, if successful, then return that
plan w/o caching it.
QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
if (fastPlan != null) {
- return CompletableFuture.completedFuture(fastPlan);
+ return CompletableFuture.completedFuture(new
PlanInfo(fastPlan));
}
}
- // Use parameter metadata to compute a cache key.
- CacheKey key =
createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx,
stmt.parameterMetadata);
+ if (prepareWithCache) {
+ // Use parameter metadata to compute a cache key.
+ CacheKey cacheKey =
createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx,
stmt.parameterMetadata);
- CompletableFuture<QueryPlan> planFut = cache.get(key, k ->
CompletableFuture.supplyAsync(() -> {
- IgnitePlanner planner = ctx.planner();
+ return cache.get(cacheKey, k ->
CompletableFuture.supplyAsync(() -> buildQueryPlan(stmt, ctx,
+ () -> cache.invalidate(cacheKey)), planningPool));
+ } else {
+ return CompletableFuture.supplyAsync(() ->
buildQueryPlan(stmt, ctx, () -> {}), planningPool);
+ }
+ });
+ }
- ValidationResult validated = stmt.value;
- ParameterMetadata parameterMetadata = stmt.parameterMetadata;
+ private PlanInfo buildQueryPlan(ValidStatement<ValidationResult> stmt,
PlanningContext ctx, Runnable onTimeoutAction) {
+ IgnitePlanner planner = ctx.planner();
- SqlNode validatedNode = validated.sqlNode();
+ ValidationResult validated = stmt.value;
+ ParameterMetadata parameterMetadata = stmt.parameterMetadata;
- RelWithMetadata relWithMetadata = doOptimize(ctx,
validatedNode, planner, () -> cache.invalidate(key));
- IgniteRel optimizedRel = relWithMetadata.rel;
- QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
+ SqlNode validatedNode = validated.sqlNode();
- ResultSetMetadata resultSetMetadata =
resultSetMetadata(validated.dataType(), validated.origins(),
validated.aliases());
+ RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode,
planner, onTimeoutAction);
+ IgniteRel optimizedRel = relWithMetadata.rel;
+ QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
- int catalogVersion = ctx.catalogVersion();
+ ResultSetMetadata resultSetMetadata =
resultSetMetadata(validated.dataType(), validated.origins(),
validated.aliases());
- if (optimizedRel instanceof IgniteKeyValueGet) {
- IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
+ int catalogVersion = ctx.catalogVersion();
- return new KeyValueGetPlan(
- nextPlanId(),
- catalogVersion,
- kvGet,
- resultSetMetadata,
- parameterMetadata,
- relWithMetadata.paMetadata,
- relWithMetadata.ppMetadata
- );
- }
+ if (optimizedRel instanceof IgniteKeyValueGet) {
+ IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
- var plan = new MultiStepPlan(
- nextPlanId(),
- SqlQueryType.QUERY,
- optimizedRel,
- resultSetMetadata,
- parameterMetadata,
- catalogVersion,
- relWithMetadata.numSources,
- fastPlan,
- relWithMetadata.paMetadata,
- relWithMetadata.ppMetadata
- );
-
- logPlan(parsedResult.originalQuery(), plan);
-
- return plan;
- }, planningPool));
-
- return planFut;
- });
+ var plan = new KeyValueGetPlan(
+ nextPlanId(),
+ catalogVersion,
+ kvGet,
+ resultSetMetadata,
+ parameterMetadata,
+ relWithMetadata.paMetadata,
+ relWithMetadata.ppMetadata
+ );
+
+ return new PlanInfo(plan);
+ }
+
+ var plan = new MultiStepPlan(
+ nextPlanId(),
+ SqlQueryType.QUERY,
+ optimizedRel,
+ resultSetMetadata,
+ parameterMetadata,
+ catalogVersion,
+ relWithMetadata.numSources,
+ fastPlan,
+ relWithMetadata.paMetadata,
+ relWithMetadata.ppMetadata
+ );
+
+ logPlan(stmt.parsedResult().originalQuery(), plan);
+
+ int currentCatalogVersion = directCatalogVersion();
+
+ if (currentCatalogVersion == catalogVersion) {
+ Set<Integer> sources = resolveSources(plan.getRel());
+
+ return new PlanInfo(plan, stmt, ctx, sources);
+ }
+
+ return new PlanInfo(plan);
Review Comment:
Hidden.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]