lowka commented on code in PR #4256: URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1732746216
########## modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java: ########## @@ -247,30 +238,62 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) { return; } - lastRunFuture = startCompaction(logicalTopologyService.localLogicalTopology()); + lastRunFuture = startCompaction(lwm, logicalTopologyService.localLogicalTopology()); } }); } - private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot topologySnapshot) { - long localMinimum = localMinTimeProvider.time(); + private @Nullable Long getMinLocalTime(HybridTimestamp lwm) { + Map<TablePartitionId, @Nullable Long> partitionStates = localMinTimeProvider.minTimePerPartition(); - if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) { - LOG.info("Catalog compaction skipped, nothing to compact [timestamp={}].", localMinimum); + // Find the minimum time among all partitions. + long partitionMinTime = Long.MAX_VALUE; - return CompletableFutures.nullCompletedFuture(); + for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) { + Long state = e.getValue(); + + if (state == null) { + LOG.debug("Partition state is missing [partition={}, all={}]", e.getKey(), partitionStates); + return null; + } + + partitionMinTime = Math.min(partitionMinTime, state); } - return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), localMinimum) + // Choose the minimum time between the low watermark and the minimum time among all partitions. + long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime); + + LOG.debug("Local minimum required time: [partitionMinTime={}, lowWatermark={}, chosen={}]", + partitionMinTime, + lwm, + chosenMinTime + ); + + return chosenMinTime; + } + + private CompletableFuture<Void> startCompaction(HybridTimestamp lwm, LogicalTopologySnapshot topologySnapshot) { + LOG.info("Catalog compaction started at [lowWaterMark={}]", lwm); + + Long localMinRequiredTime = getMinLocalTime(lwm); + + if (localMinRequiredTime == null) { + // If do not have local time yet, Use a placeholder value that is going to be overwritten. + localMinRequiredTime = Long.MAX_VALUE; + } + + return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), localMinRequiredTime) .thenComposeAsync(timeHolder -> { long minRequiredTime = timeHolder.minRequiredTime; long minActiveTxBeginTime = timeHolder.minActiveTxBeginTime; + Catalog catalog = catalogManagerFacade.catalogByTsNullable(minRequiredTime); CompletableFuture<Boolean> catalogCompactionFut; if (catalog == null) { - LOG.info("Catalog compaction skipped, nothing to compact [timestamp={}].", minRequiredTime); + LOG.info("Catalog compaction skipped, nothing to compact [timestamp={}]. No catalog at minRequiredTime", Review Comment: Fixed. ########## modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java: ########## @@ -247,30 +238,62 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) { return; } - lastRunFuture = startCompaction(logicalTopologyService.localLogicalTopology()); + lastRunFuture = startCompaction(lwm, logicalTopologyService.localLogicalTopology()); } }); } - private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot topologySnapshot) { - long localMinimum = localMinTimeProvider.time(); + private @Nullable Long getMinLocalTime(HybridTimestamp lwm) { + Map<TablePartitionId, @Nullable Long> partitionStates = localMinTimeProvider.minTimePerPartition(); - if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) { - LOG.info("Catalog compaction skipped, nothing to compact [timestamp={}].", localMinimum); + // Find the minimum time among all partitions. + long partitionMinTime = Long.MAX_VALUE; - return CompletableFutures.nullCompletedFuture(); + for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) { + Long state = e.getValue(); + + if (state == null) { + LOG.debug("Partition state is missing [partition={}, all={}]", e.getKey(), partitionStates); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org