AMashenkov commented on code in PR #6593:
URL: https://github.com/apache/ignite-3/pull/6593#discussion_r2379164952


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -858,6 +1046,150 @@ private static ParameterMetadata 
createParameterMetadata(RelDataType parameterRo
         return new ParameterMetadata(parameterTypes);
     }
 
+    // Although the catalog version can change during execution, it's ok to take 
the actual version at this moment.
+    private int directCatalogVersion() {
+        return schemaManager.catalogVersion(clockService.now().longValue());
+    }
+
+    public void statisticsChanged(int tableId) {
+        planUpdater.statisticsChanged(tableId);
+    }
+
+    private static class PlanUpdater {
+        private final ScheduledExecutorService planUpdater;
+
+        private final AtomicReference<CompletableFuture<PlanInfo>> 
rePlanningFut = new AtomicReference<>(nullCompletedFuture());
+
+        private volatile boolean recalculatePlans;
+
+        private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
+
+        private final long plannerTimeout;
+
+        private final PlanPrepare prepare;
+
+        private final IntSupplier catalogVersionSupplier;
+
+        PlanUpdater(
+                ScheduledExecutorService planUpdater,
+                Cache<CacheKey, CompletableFuture<PlanInfo>> cache,
+                long plannerTimeout,
+                PlanPrepare prepare,
+                IntSupplier catalogVersionSupplier
+        ) {
+            this.planUpdater = planUpdater;
+            this.cache = cache;
+            this.plannerTimeout = plannerTimeout;
+            this.prepare = prepare;
+            this.catalogVersionSupplier = catalogVersionSupplier;
+        }
+
+        /**
+         * Reacts to the changed statistic.
+         *
+         * @param tableId Table Id statistic changed for.
+         */
+        void statisticsChanged(int tableId) {
+            Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries = 
cache.entrySet();
+
+            int currentCatalogVersion = catalogVersionSupplier.getAsInt();
+
+            boolean statChanged = false;
+
+            for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cachedEntries) {
+                CacheKey key = ent.getKey();
+                CompletableFuture<PlanInfo> fut = ent.getValue();
+
+                if (currentCatalogVersion == key.catalogVersion() && 
isCompletedSuccessfully(fut)) {
+                    // no wait, already completed
+                    PlanInfo info = fut.join();
+
+                    if (info.sources.contains(tableId)) {
+                        ent.getKey().invalidate();
+                        statChanged = true;
+                    }
+                }
+            }
+
+            if (statChanged) {
+                recalculatePlans = true;
+            }
+        }
+
+        void start() {
+            planUpdater.scheduleAtFixedRate(() -> {
+                if (!recalculatePlans) {
+                    return;
+                }
+
+                CompletableFuture<PlanInfo> planFut = rePlanningFut.get();
+                if (planFut != null && !planFut.isDone()) {
+                    // some work still in progress
+                    return;
+                } else {
+                    rePlanningFut.set(null);
+                }
+
+                while (recalculatePlans) {
+                    recalculatePlans = false;
+
+                    for (Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cache.entrySet()) {
+                        if (!ent.getKey().needInvalidate()) {
+                            continue;
+                        }
+
+                        CacheKey key = ent.getKey();
+                        CompletableFuture<PlanInfo> fut = cache.get(key);
+
+                        // can be evicted
+                        if (fut != null) {
+                            assert isCompletedSuccessfully(fut);
+
+                            PlanInfo info = fut.join();
+
+                            assert info.context != null && info.statement != 
null;
+
+                            int currentCatalogVersion = 
catalogVersionSupplier.getAsInt();
+
+                            if (currentCatalogVersion == key.catalogVersion()) 
{
+                                SqlQueryType queryType = 
info.statement.parsedResult().queryType();
+
+                                if (queryType != SqlQueryType.QUERY && 
queryType != SqlQueryType.DML) {
+                                    assert false : "Unexpected type: " + 
queryType;
+                                    continue;
+                                }
+
+                                PlanningContext planningContext = 
PlanningContext.builder()
+                                        .frameworkConfig(info.context.config())
+                                        
.query(info.statement.parsedResult().originalQuery())
+                                        .plannerTimeout(plannerTimeout)
+                                        
.catalogVersion(info.context.catalogVersion())
+                                        
.defaultSchemaName(info.context.schemaName())
+                                        .parameters(info.context.parameters())
+                                        .explicitTx(info.context.explicitTx())
+                                        .build();
+
+                                CompletableFuture<PlanInfo> newPlanFut =
+                                        prepare.preparePlan(queryType, 
info.statement.parsedResult, planningContext, key);
+
+                                rePlanningFut.updateAndGet(prev -> prev == 
null ? newPlanFut : prev.thenCompose(none -> newPlanFut));
+
+                            } else {
+                                key.invalidated();
+                            }
+                        }
+                    }
+                }
+
+            }, PLAN_UPDATER_INITIAL_DELAY, 1_000, TimeUnit.MILLISECONDS);
+        }
+    }

Review Comment:
   Assume there is a cache with an expiry policy, and two objects representing 
the same key: k1 and k2.
   1. I do `cache.put(k1, )`;
   2. Then change state `k1.invalidate()`
   3. Then k1 expires, but eviction is lazy, so the entry isn't actually 
removed from the cache.
   4. Then I do `cache.put(k2, )`;
   Is there any guarantee about whether the cache entry then holds k2 or k1?
   
   I bet it depends on the implementation. 
   The cache may put a new entry with k2, or it may just replace the entry's 
value, leaving k1 untouched,
   simply because k1 and k2 are `equal`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to