AMashenkov commented on code in PR #6593:
URL: https://github.com/apache/ignite-3/pull/6593#discussion_r2387740915


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -858,6 +1046,150 @@ private static ParameterMetadata 
createParameterMetadata(RelDataType parameterRo
         return new ParameterMetadata(parameterTypes);
     }
 
+    // Although catalog version can evaluate during execution, it`s ok to take 
actual version for this moment.
+    private int directCatalogVersion() {
+        return schemaManager.catalogVersion(clockService.now().longValue());
+    }
+
+    public void statisticsChanged(int tableId) {
+        planUpdater.statisticsChanged(tableId);
+    }
+
+    private static class PlanUpdater {
+        private final ScheduledExecutorService planUpdater;
+
+        private final AtomicReference<CompletableFuture<PlanInfo>> 
rePlanningFut = new AtomicReference<>(nullCompletedFuture());
+
+        private volatile boolean recalculatePlans;
+
+        private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
+
+        private final long plannerTimeout;
+
+        private final PlanPrepare prepare;
+
+        private final IntSupplier catalogVersionSupplier;
+
+        PlanUpdater(
+                ScheduledExecutorService planUpdater,
+                Cache<CacheKey, CompletableFuture<PlanInfo>> cache,
+                long plannerTimeout,
+                PlanPrepare prepare,
+                IntSupplier catalogVersionSupplier
+        ) {
+            this.planUpdater = planUpdater;
+            this.cache = cache;
+            this.plannerTimeout = plannerTimeout;
+            this.prepare = prepare;
+            this.catalogVersionSupplier = catalogVersionSupplier;
+        }
+
+        /**
+         * Reacts to the changed statistic.
+         *
+         * @param tableId Table Id statistic changed for.
+         */
+        void statisticsChanged(int tableId) {
+            Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries = 
cache.entrySet();
+
+            int currentCatalogVersion = catalogVersionSupplier.getAsInt();
+
+            boolean statChanged = false;
+
+            for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cachedEntries) {
+                CacheKey key = ent.getKey();
+                CompletableFuture<PlanInfo> fut = ent.getValue();
+
+                if (currentCatalogVersion == key.catalogVersion() && 
isCompletedSuccessfully(fut)) {
+                    // no wait, already completed
+                    PlanInfo info = fut.join();
+
+                    if (info.sources.contains(tableId)) {
+                        ent.getKey().invalidate();
+                        statChanged = true;
+                    }
+                }
+            }
+
+            if (statChanged) {
+                recalculatePlans = true;
+            }
+        }
+
+        void start() {
+            planUpdater.scheduleAtFixedRate(() -> {
+                if (!recalculatePlans) {
+                    return;
+                }
+
+                CompletableFuture<PlanInfo> planFut = rePlanningFut.get();
+                if (planFut != null && !planFut.isDone()) {
+                    // some work still in progress
+                    return;
+                } else {
+                    rePlanningFut.set(null);
+                }
+
+                while (recalculatePlans) {
+                    recalculatePlans = false;
+
+                    for (Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cache.entrySet()) {
+                        if (!ent.getKey().needInvalidate()) {
+                            continue;
+                        }
+
+                        CacheKey key = ent.getKey();
+                        CompletableFuture<PlanInfo> fut = cache.get(key);
+
+                        // can be evicted
+                        if (fut != null) {
+                            assert isCompletedSuccessfully(fut);
+
+                            PlanInfo info = fut.join();
+
+                            assert info.context != null && info.statement != 
null;
+
+                            int currentCatalogVersion = 
catalogVersionSupplier.getAsInt();
+
+                            if (currentCatalogVersion == key.catalogVersion()) 
{
+                                SqlQueryType queryType = 
info.statement.parsedResult().queryType();
+
+                                if (queryType != SqlQueryType.QUERY && 
queryType != SqlQueryType.DML) {
+                                    assert false : "Unexpected type: " + 
queryType;
+                                    continue;
+                                }

Review Comment:
   When you write just an `assert`, it means you do not expect any unwanted 
values.
   When you write `if-continue`, it means you do expect unwanted values but 
skip them.
   Having both an `if` and an `assert` for the same condition looks weird.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to