AMashenkov commented on code in PR #6649:
URL: https://github.com/apache/ignite-3/pull/6649#discussion_r2480648075


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -138,7 +138,8 @@ public class PrepareServiceImpl implements PrepareService {
 
     private static final String PLANNING_EXECUTOR_SOURCE_NAME = 
THREAD_POOLS_METRICS_SOURCE_NAME + "sql-planning-executor";
 
-    public static final int PLAN_UPDATER_INITIAL_DELAY = 2_000;
+    public static final int PLAN_UPDATER_INITIAL_DELAY = 5_000;

Review Comment:
   Create a ticket to move these properties to configuration.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -171,6 +136,65 @@ public void start() {
                 tableSizeMap.putIfAbsent(table.id(), DEFAULT_VALUE);
             }
         }
+
+        scheduler.scheduleAtFixedRate(this::update, INITIAL_DELAY, 
REFRESH_PERIOD, TimeUnit.MILLISECONDS);
+    }
+
+    private void update() {
+        if (!latestUpdateFut.get().isDone()) {
+            return;
+        }
+
+        Collection<InternalTable> tables = new 
ArrayList<>(tableSizeMap.size());
+
+        for (Map.Entry<Integer, ActualSize> ent : tableSizeMap.entrySet()) {
+            Integer tableId = ent.getKey();
+
+            if (droppedTables.contains(tableId)) {
+                continue;
+            }
+
+            TableViewInternal tableView = tableManager.cachedTable(tableId);
+
+            if (tableView == null) {
+                LOG.debug("No table found to update statistics [id={}].", 
ent.getKey());
+            } else {
+                tables.add(tableView.internalTable());
+            }
+        }
+
+        CompletableFuture<Void> updateResult = 
statSupplier.estimatedSizeWithLastUpdate(tables)
+                .handle((infos, err) -> {
+                    for (Map.Entry<Integer, PartitionModificationInfo> ent : 
infos.entrySet()) {

Review Comment:
   Is it possible to return Int2ObjectMap here to avoid boxing?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -196,25 +221,46 @@ private void 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
         List<DestroyTableEvent> events = 
destructionEventsQueue.drainUpTo(earliestVersion);
 
         events.forEach(event -> tableSizeMap.remove(event.tableId()));
+        events.forEach(event -> droppedTables.remove(event.tableId()));
     }
 
     /** Timestamped size. */
-    private static class ActualSize {
-        long timestamp;
+    static class ActualSize {
+        long modificationCounter;

Review Comment:
   It is very similar to PartitionModificationInfo.
   Do we really need both?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -196,25 +221,46 @@ private void 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
         List<DestroyTableEvent> events = 
destructionEventsQueue.drainUpTo(earliestVersion);
 
         events.forEach(event -> tableSizeMap.remove(event.tableId()));
+        events.forEach(event -> droppedTables.remove(event.tableId()));
     }
 
     /** Timestamped size. */
-    private static class ActualSize {
-        long timestamp;
+    static class ActualSize {
+        long modificationCounter;

Review Comment:
   > Timestamped size.

   The Javadoc is incorrect.
   Maybe rename the class to VersionSize, since modificationCounter is effectively a version.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -69,15 +75,30 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
     private final AtomicReference<StatisticUpdatesSupplier> changesSupplier = 
new AtomicReference<>();
 
     /* Contains all known table id's with statistics. */
-    private final ConcurrentMap<Integer, ActualSize> tableSizeMap = new 
ConcurrentHashMap<>();
+    final ConcurrentMap<Integer, ActualSize> tableSizeMap = new 
ConcurrentHashMap<>();
 
-    private volatile long thresholdTimeToPostponeUpdateMs = 
TimeUnit.MINUTES.toMillis(1);
+    /* Contain dropped tables, can`t update statistic for such case. */
+    Set<Integer> droppedTables = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
+
+    private final ScheduledExecutorService scheduler;
+    private final StatisticAggregator<Collection<InternalTable>, 
CompletableFuture<Map<Integer, PartitionModificationInfo>>> statSupplier;
+
+    static final long INITIAL_DELAY = 15_000;
+    static final long REFRESH_PERIOD = 15_000;

Review Comment:
   Create a ticket to move these properties to configuration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to