korlov42 commented on code in PR #6593:
URL: https://github.com/apache/ignite-3/pull/6593#discussion_r2371880509
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java:
##########
@@ -28,6 +29,9 @@ public interface SqlStatisticManager extends LifecycleAware {
*/
long tableSize(int tableId);
+ /** Plan updater callback. */
+ void setListener(IntConsumer updater);
Review Comment:
Let's introduce a distinct interface for the listener, with a meaningful name.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java:
##########
@@ -73,6 +75,22 @@ public interface Cache<K, V> {
*/
V compute(K key, BiFunction<? super K, ? super V, ? extends V>
remappingFunction);
+ /**
+ * If the value for the specified key is present and non-null, attempts to
compute a new mapping
+ * given the key and its current mapped value.
+ * If the remapping function returns null, the mapping is removed.
+ * If the remapping function itself throws an (unchecked) exception, the
exception is rethrown,
+ * and the current mapping is left unchanged.
+ *
+ * @param key Key with which the specified value is to be associated.
+ * @param remappingFunction The remapping function to compute a value.
+ * @return The new value associated with the specified key, or null if
none.
+ */
+ @Nullable V computeIfPresent(K key, BiFunction<? super K, ? super V, ?
extends V> remappingFunction);
Review Comment:
Why is the `compute` method not enough?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -130,6 +137,12 @@ private void updateTableSizeStatistics(int tableId,
boolean force) {
}).exceptionally(e -> {
LOG.info("Can't calculate size for table [id={}].", e,
tableId);
return null;
+ }).whenComplete((ignored, ex) -> {
+ if (ex == null) {
Review Comment:
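This `if` statement will never evaluate to `false`: the preceding
`exceptionally` stage recovers from the failure by returning `null`, so `ex` is
always `null` here. That is, the listener is always notified, even in case of a
faulty iteration. Please add tests on successful and unsuccessful update of the
table size.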
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java:
##########
@@ -73,6 +75,22 @@ public interface Cache<K, V> {
*/
V compute(K key, BiFunction<? super K, ? super V, ? extends V>
remappingFunction);
+ /**
+ * If the value for the specified key is present and non-null, attempts to
compute a new mapping
+ * given the key and its current mapped value.
+ * If the remapping function returns null, the mapping is removed.
+ * If the remapping function itself throws an (unchecked) exception, the
exception is rethrown,
+ * and the current mapping is left unchanged.
+ *
+ * @param key Key with which the specified value is to be associated.
+ * @param remappingFunction The remapping function to compute a value.
+ * @return The new value associated with the specified key, or null if
none.
+ */
+ @Nullable V computeIfPresent(K key, BiFunction<? super K, ? super V, ?
extends V> remappingFunction);
+
+ /** Returns a {@link Set} view of the mappings contained in this map. */
Review Comment:
```suggestion
/** Returns a {@link Set} view of the mappings contained in this cache.
*/
```
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -130,6 +137,12 @@ private void updateTableSizeStatistics(int tableId,
boolean force) {
}).exceptionally(e -> {
LOG.info("Can't calculate size for table [id={}].", e,
tableId);
return null;
+ }).whenComplete((ignored, ex) -> {
+ if (ex == null) {
+ if (planUpdater != null) {
+ planUpdater.accept(tableId);
Review Comment:
Double-reading of a volatile variable is a sign of a code smell.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -459,63 +528,95 @@ private CompletableFuture<QueryPlan> prepareQuery(
// Try to produce a fast plan, if successful, then return that
plan w/o caching it.
QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
if (fastPlan != null) {
- return CompletableFuture.completedFuture(fastPlan);
+ return CompletableFuture.completedFuture(new
PlanInfo(fastPlan));
}
}
- // Use parameter metadata to compute a cache key.
- CacheKey key =
createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx,
stmt.parameterMetadata);
+ if (prepareWithCache) {
+ // Use parameter metadata to compute a cache key.
+ CacheKey cacheKey =
createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx,
stmt.parameterMetadata);
- CompletableFuture<QueryPlan> planFut = cache.get(key, k ->
CompletableFuture.supplyAsync(() -> {
- IgnitePlanner planner = ctx.planner();
+ return cache.get(cacheKey, k ->
CompletableFuture.supplyAsync(() -> buildQueryPlan(stmt, ctx,
+ () -> cache.invalidate(cacheKey)), planningPool));
+ } else {
+ return CompletableFuture.supplyAsync(() ->
buildQueryPlan(stmt, ctx, () -> {}), planningPool);
+ }
+ });
+ }
- ValidationResult validated = stmt.value;
- ParameterMetadata parameterMetadata = stmt.parameterMetadata;
+ private PlanInfo buildQueryPlan(ValidStatement<ValidationResult> stmt,
PlanningContext ctx, Runnable onTimeoutAction) {
+ IgnitePlanner planner = ctx.planner();
- SqlNode validatedNode = validated.sqlNode();
+ ValidationResult validated = stmt.value;
+ ParameterMetadata parameterMetadata = stmt.parameterMetadata;
- RelWithMetadata relWithMetadata = doOptimize(ctx,
validatedNode, planner, () -> cache.invalidate(key));
- IgniteRel optimizedRel = relWithMetadata.rel;
- QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
+ SqlNode validatedNode = validated.sqlNode();
- ResultSetMetadata resultSetMetadata =
resultSetMetadata(validated.dataType(), validated.origins(),
validated.aliases());
+ RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode,
planner, onTimeoutAction);
+ IgniteRel optimizedRel = relWithMetadata.rel;
+ QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
- int catalogVersion = ctx.catalogVersion();
+ ResultSetMetadata resultSetMetadata =
resultSetMetadata(validated.dataType(), validated.origins(),
validated.aliases());
- if (optimizedRel instanceof IgniteKeyValueGet) {
- IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
+ int catalogVersion = ctx.catalogVersion();
- return new KeyValueGetPlan(
- nextPlanId(),
- catalogVersion,
- kvGet,
- resultSetMetadata,
- parameterMetadata,
- relWithMetadata.paMetadata,
- relWithMetadata.ppMetadata
- );
- }
+ if (optimizedRel instanceof IgniteKeyValueGet) {
+ IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
- var plan = new MultiStepPlan(
- nextPlanId(),
- SqlQueryType.QUERY,
- optimizedRel,
- resultSetMetadata,
- parameterMetadata,
- catalogVersion,
- relWithMetadata.numSources,
- fastPlan,
- relWithMetadata.paMetadata,
- relWithMetadata.ppMetadata
- );
-
- logPlan(parsedResult.originalQuery(), plan);
-
- return plan;
- }, planningPool));
-
- return planFut;
- });
+ var plan = new KeyValueGetPlan(
+ nextPlanId(),
+ catalogVersion,
+ kvGet,
+ resultSetMetadata,
+ parameterMetadata,
+ relWithMetadata.paMetadata,
+ relWithMetadata.ppMetadata
+ );
+
+ return new PlanInfo(plan);
+ }
+
+ var plan = new MultiStepPlan(
+ nextPlanId(),
+ SqlQueryType.QUERY,
+ optimizedRel,
+ resultSetMetadata,
+ parameterMetadata,
+ catalogVersion,
+ relWithMetadata.numSources,
+ fastPlan,
+ relWithMetadata.paMetadata,
+ relWithMetadata.ppMetadata
+ );
+
+ logPlan(stmt.parsedResult().originalQuery(), plan);
+
+ int currentCatalogVersion =
schemaManager.catalogVersion(clockService.now().longValue());
+
+ if (currentCatalogVersion == catalogVersion) {
+ Set<Integer> sources = resolveSources(plan.getRel());
+
+ return new PlanInfo(plan, stmt, ctx, sources);
+ }
+
+ return new PlanInfo(plan);
+ }
+
+ private CompletableFuture<PlanInfo> preparePlan(SqlQueryType queryType,
ParsedResult parsedRes, PlanningContext ctx, CacheKey key) {
+ int currentCatalogVersion =
schemaManager.catalogVersion(clockService.now().longValue());
Review Comment:
same here, you must explicitly make sure that all metadata have arrived
##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java:
##########
@@ -83,6 +85,16 @@ public V compute(K key, BiFunction<? super K, ? super V, ?
extends V> remappingF
return remappingFunction.apply(key, null);
}
+ @Override
+ public @Nullable V computeIfPresent(K key, BiFunction<? super K, ?
super V, ? extends V> remappingFunction) {
+ return remappingFunction.apply(key, null);
Review Comment:
This implementation is not correct. `computeIfPresent` should guarantee that
the old value is never `null`, therefore `remappingFunction` might not expect
`null`.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -459,63 +528,95 @@ private CompletableFuture<QueryPlan> prepareQuery(
// Try to produce a fast plan, if successful, then return that
plan w/o caching it.
QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
if (fastPlan != null) {
- return CompletableFuture.completedFuture(fastPlan);
+ return CompletableFuture.completedFuture(new
PlanInfo(fastPlan));
}
}
- // Use parameter metadata to compute a cache key.
- CacheKey key =
createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx,
stmt.parameterMetadata);
+ if (prepareWithCache) {
+ // Use parameter metadata to compute a cache key.
+ CacheKey cacheKey =
createCacheKeyFromParameterMetadata(stmt.parsedResult, ctx,
stmt.parameterMetadata);
- CompletableFuture<QueryPlan> planFut = cache.get(key, k ->
CompletableFuture.supplyAsync(() -> {
- IgnitePlanner planner = ctx.planner();
+ return cache.get(cacheKey, k ->
CompletableFuture.supplyAsync(() -> buildQueryPlan(stmt, ctx,
+ () -> cache.invalidate(cacheKey)), planningPool));
+ } else {
+ return CompletableFuture.supplyAsync(() ->
buildQueryPlan(stmt, ctx, () -> {}), planningPool);
+ }
+ });
+ }
- ValidationResult validated = stmt.value;
- ParameterMetadata parameterMetadata = stmt.parameterMetadata;
+ private PlanInfo buildQueryPlan(ValidStatement<ValidationResult> stmt,
PlanningContext ctx, Runnable onTimeoutAction) {
+ IgnitePlanner planner = ctx.planner();
- SqlNode validatedNode = validated.sqlNode();
+ ValidationResult validated = stmt.value;
+ ParameterMetadata parameterMetadata = stmt.parameterMetadata;
- RelWithMetadata relWithMetadata = doOptimize(ctx,
validatedNode, planner, () -> cache.invalidate(key));
- IgniteRel optimizedRel = relWithMetadata.rel;
- QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
+ SqlNode validatedNode = validated.sqlNode();
- ResultSetMetadata resultSetMetadata =
resultSetMetadata(validated.dataType(), validated.origins(),
validated.aliases());
+ RelWithMetadata relWithMetadata = doOptimize(ctx, validatedNode,
planner, onTimeoutAction);
+ IgniteRel optimizedRel = relWithMetadata.rel;
+ QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
- int catalogVersion = ctx.catalogVersion();
+ ResultSetMetadata resultSetMetadata =
resultSetMetadata(validated.dataType(), validated.origins(),
validated.aliases());
- if (optimizedRel instanceof IgniteKeyValueGet) {
- IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
+ int catalogVersion = ctx.catalogVersion();
- return new KeyValueGetPlan(
- nextPlanId(),
- catalogVersion,
- kvGet,
- resultSetMetadata,
- parameterMetadata,
- relWithMetadata.paMetadata,
- relWithMetadata.ppMetadata
- );
- }
+ if (optimizedRel instanceof IgniteKeyValueGet) {
+ IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel;
- var plan = new MultiStepPlan(
- nextPlanId(),
- SqlQueryType.QUERY,
- optimizedRel,
- resultSetMetadata,
- parameterMetadata,
- catalogVersion,
- relWithMetadata.numSources,
- fastPlan,
- relWithMetadata.paMetadata,
- relWithMetadata.ppMetadata
- );
-
- logPlan(parsedResult.originalQuery(), plan);
-
- return plan;
- }, planningPool));
-
- return planFut;
- });
+ var plan = new KeyValueGetPlan(
+ nextPlanId(),
+ catalogVersion,
+ kvGet,
+ resultSetMetadata,
+ parameterMetadata,
+ relWithMetadata.paMetadata,
+ relWithMetadata.ppMetadata
+ );
+
+ return new PlanInfo(plan);
+ }
+
+ var plan = new MultiStepPlan(
+ nextPlanId(),
+ SqlQueryType.QUERY,
+ optimizedRel,
+ resultSetMetadata,
+ parameterMetadata,
+ catalogVersion,
+ relWithMetadata.numSources,
+ fastPlan,
+ relWithMetadata.paMetadata,
+ relWithMetadata.ppMetadata
+ );
+
+ logPlan(stmt.parsedResult().originalQuery(), plan);
+
+ int currentCatalogVersion =
schemaManager.catalogVersion(clockService.now().longValue());
+
+ if (currentCatalogVersion == catalogVersion) {
+ Set<Integer> sources = resolveSources(plan.getRel());
+
+ return new PlanInfo(plan, stmt, ctx, sources);
+ }
Review Comment:
this check is not correct in general. You must wait until it's safe to get
version like this. See `SchemaSyncService`
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -142,6 +157,10 @@ public class PrepareServiceImpl implements PrepareService {
private volatile ThreadPoolExecutor planningPool;
+ private final PlanUpdater planUpdater;
+
+ private final ClockService clockService;
Review Comment:
Why do we need an entire service? Wouldn't `HybridClock` have been enough?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -858,6 +1046,149 @@ private static ParameterMetadata
createParameterMetadata(RelDataType parameterRo
return new ParameterMetadata(parameterTypes);
}
+ public void statisticsChanged(int tableId) {
+ planUpdater.statisticsChanged(tableId);
+ }
+
+ private static class PlanUpdater {
+ private final ClockService clockService;
+
+ private final ScheduledExecutorService planUpdater;
+
+ private final AtomicReference<CompletableFuture<PlanInfo>>
rePlanningFut = new AtomicReference<>(nullCompletedFuture());
+
+ private volatile boolean recalculatePlans;
+
+ private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
+
+ private final SqlSchemaManager schemaManager;
+
+ private final long plannerTimeout;
+
+ private final PlanPrepare prepare;
+
+ PlanUpdater(
+ ClockService clockService,
+ ScheduledExecutorService planUpdater,
+ Cache<CacheKey, CompletableFuture<PlanInfo>> cache,
+ SqlSchemaManager schemaManager,
+ long plannerTimeout,
+ PlanPrepare prepare
+ ) {
+ this.clockService = clockService;
+ this.planUpdater = planUpdater;
+ this.cache = cache;
+ this.schemaManager = schemaManager;
+ this.plannerTimeout = plannerTimeout;
+ this.prepare = prepare;
+ }
+
+ /**
+ * Reacts to the changed statistic.
+ *
+ * @param tableId Table Id statistic changed for.
+ */
+ void statisticsChanged(int tableId) {
+ Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries =
cache.entrySet();
+
+ int currentCatalogVersion =
schemaManager.catalogVersion(clockService.now().longValue());
+
+ boolean statChanged = false;
+
+ for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent :
cachedEntries) {
Review Comment:
I'm wondering if this somehow affects expiration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]