zstan commented on code in PR #6649:
URL: https://github.com/apache/ignite-3/pull/6649#discussion_r2465641101
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -97,63 +117,7 @@ public void changesNotifier(StatisticUpdatesSupplier
updater) {
*/
@Override
public long tableSize(int tableId) {
- updateTableSizeStatistics(tableId, false);
-
- return tableSizeMap.getOrDefault(tableId, DEFAULT_VALUE).getSize();
- }
-
- /** Update table size statistic in the background if it required. */
- private void updateTableSizeStatistics(int tableId, boolean force) {
- TableViewInternal tableView = tableManager.cachedTable(tableId);
- if (tableView == null) {
- LOG.debug("There is no table to update statistics [id={}].",
tableId);
- return;
- }
-
- ActualSize tableSize = tableSizeMap.get(tableId);
- if (tableSize == null) {
- // has been concurrently cleaned up, no need more update statistic
for the table.
- return;
- }
-
- long currTimestamp = FastTimestamps.coarseCurrentTimeMillis();
- long lastUpdateTime = tableSize.getTimestamp();
-
- if (force || lastUpdateTime <= currTimestamp -
thresholdTimeToPostponeUpdateMs) {
- // Prevent to run update for the same table twice concurrently.
- if (!force && !tableSizeMap.replace(tableId, tableSize, new
ActualSize(tableSize.getSize(), currTimestamp - 1))) {
- return;
- }
-
- // just request new table size in background.
- CompletableFuture<Void> updateResult =
tableView.internalTable().estimatedSize()
- .thenAccept(size -> {
- // the table can be concurrently dropped and we
shouldn't put new value in this case.
- tableSizeMap.computeIfPresent(tableId, (k, v) -> {
- // Discard current computation if value in cache
is newer than current one.
- if (v.timestamp >= currTimestamp) {
- return v;
- }
-
- return new ActualSize(Math.max(size, 1),
currTimestamp);
- });
- }).handle((res, err) -> {
- if (err != null) {
- LOG.warn(format("Can't calculate size for table
[id={}].", tableId), err);
-
- return null;
- } else {
- StatisticUpdatesSupplier supplier =
changesSupplier.get();
- if (supplier != null) {
- supplier.accept(tableId);
- }
-
- return res;
- }
- });
-
- latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult :
prev.thenCompose(none -> updateResult));
- }
+ return tableSizeMap.computeIfAbsent(tableId, k ->
DEFAULT_VALUE).getSize();
Review Comment:
done
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -69,15 +74,30 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
private final AtomicReference<StatisticUpdatesSupplier> changesSupplier =
new AtomicReference<>();
/* Contains all known table IDs with statistics. */
- private final ConcurrentMap<Integer, ActualSize> tableSizeMap = new
ConcurrentHashMap<>();
+ final ConcurrentMap<Integer, ActualSize> tableSizeMap = new
ConcurrentHashMap<>();
- private volatile long thresholdTimeToPostponeUpdateMs =
TimeUnit.MINUTES.toMillis(1);
+ /* Contains dropped tables; statistics can't be updated for them. */
+ Set<Integer> droppedTables = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+
+ private final ScheduledExecutorService scheduler;
+ private final StatisticAggregator<Collection<InternalTable>,
CompletableFuture<Map<Integer, PartitionModificationInfo>>> statSupplier;
+
+ static final long INITIAL_DELAY = 5_000;
+ static final long REFRESH_PERIOD = 2_000; // !!!!
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]