This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eed4ba0edd8 IGNITE-27477 Raise LWM on recovery if it lags behind 
Catalog history (#7330)
eed4ba0edd8 is described below

commit eed4ba0edd8cb2411a9646aba88b44a84a652824
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Dec 30 17:59:15 2025 +0400

    IGNITE-27477 Raise LWM on recovery if it lags behind Catalog history (#7330)
---
 modules/catalog-compaction/build.gradle            |  2 +
 .../compaction/CatalogCompactionRunner.java        | 20 ++++++---
 .../CatalogCompactionRunnerSelfTest.java           |  2 +
 .../apache/ignite/internal/index/IndexManager.java |  3 +-
 .../ignite/internal/lowwatermark/LowWatermark.java | 11 +++++
 .../internal/lowwatermark/LowWatermarkImpl.java    | 18 ++++++--
 .../internal/lowwatermark/TestLowWatermark.java    |  5 +++
 .../partition/replicator/fixtures/Node.java        |  6 +--
 .../PartitionReplicaLifecycleManager.java          |  4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  7 +---
 .../internal/app/LowWatermarkRectifier.java}       | 49 ++++++++++++----------
 .../CatalogStorageIndexDescriptorSupplier.java     |  4 +-
 .../internal/table/distributed/TableManager.java   | 11 ++---
 .../table/distributed/index/IndexMetaStorage.java  |  3 +-
 14 files changed, 86 insertions(+), 59 deletions(-)

diff --git a/modules/catalog-compaction/build.gradle 
b/modules/catalog-compaction/build.gradle
index d7ffe10d047..5dbee301560 100644
--- a/modules/catalog-compaction/build.gradle
+++ b/modules/catalog-compaction/build.gradle
@@ -38,6 +38,7 @@ dependencies {
     implementation project(':ignite-table')
     implementation project(':ignite-schema')
     implementation project(':ignite-transactions')
+    implementation project(':ignite-low-watermark')
 
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
@@ -47,6 +48,7 @@ dependencies {
     testImplementation testFixtures(project(':ignite-metastorage'))
     testImplementation testFixtures(project(':ignite-catalog'))
     testImplementation testFixtures(project(':ignite-failure-handler'))
+    testImplementation testFixtures(project(':ignite-low-watermark'))
     testImplementation libs.awaitility
 
     integrationTestImplementation libs.fastutil.core
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index 5e981fe731e..c4521564770 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -66,6 +66,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
+import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.InternalClusterNode;
@@ -150,6 +153,8 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
 
     private final TopologyService topologyService;
 
+    private final LowWatermark lowWatermark;
+
     private final RebalanceMinimumRequiredTimeProvider 
rebalanceMinimumRequiredTimeProvider;
 
     private CompletableFuture<Void> lastRunFuture = 
CompletableFutures.nullCompletedFuture();
@@ -161,7 +166,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
      */
     private volatile @Nullable String compactionCoordinatorNodeName;
 
-    private volatile HybridTimestamp lowWatermark;
+    private volatile HybridTimestamp lowWatermarkValue;
 
     private volatile UUID localNodeId;
 
@@ -178,6 +183,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
             ClockService clockService,
             SchemaSyncService schemaSyncService,
             TopologyService topologyService,
+            LowWatermark lowWatermark,
             ActiveLocalTxMinimumRequiredTimeProvider 
activeLocalTxMinimumRequiredTimeProvider,
             MinimumRequiredTimeCollectorService 
minimumRequiredTimeCollectorService,
             RebalanceMinimumRequiredTimeProvider 
rebalanceMinimumRequiredTimeProvider
@@ -189,6 +195,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
         this.clockService = clockService;
         this.schemaSyncService = schemaSyncService;
         this.topologyService = topologyService;
+        this.lowWatermark = lowWatermark;
         this.placementDriver = placementDriver;
         this.replicaService = replicaService;
         this.activeLocalTxMinimumRequiredTimeProvider = 
activeLocalTxMinimumRequiredTimeProvider;
@@ -199,6 +206,9 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
+        lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
+                params -> 
onLowWatermarkChanged(((ChangeLowWatermarkEventParameters) 
params).newLowWatermark()));
+
         
messagingService.addMessageHandler(CatalogCompactionMessageGroup.class, new 
CatalogCompactionMessageHandler());
 
         localNodeId = topologyService.localMember().id();
@@ -223,7 +233,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
     public void updateCoordinator(InternalClusterNode newCoordinator) {
         compactionCoordinatorNodeName = newCoordinator.name();
 
-        triggerCompaction(lowWatermark);
+        triggerCompaction(lowWatermarkValue);
     }
 
     /** Returns local view of the node on who is currently compaction 
coordinator. For test purposes only. */
@@ -238,8 +248,8 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
     }
 
     /** Called when the low watermark has been changed. */
-    public CompletableFuture<Boolean> onLowWatermarkChanged(HybridTimestamp 
newLowWatermark) {
-        lowWatermark = newLowWatermark;
+    CompletableFuture<Boolean> onLowWatermarkChanged(HybridTimestamp 
newLowWatermark) {
+        lowWatermarkValue = newLowWatermark;
 
         triggerCompaction(newLowWatermark);
 
@@ -709,7 +719,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
         }
 
         private void handleMinimumTimesRequest(InternalClusterNode sender, 
Long correlationId) {
-            HybridTimestamp lwm = lowWatermark;
+            HybridTimestamp lwm = lowWatermarkValue;
             LocalMinTime minLocalTime;
 
             if (lwm != null) {
diff --git 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
index 19e360e8ec9..24afdffde7e 100644
--- 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
+++ 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
@@ -99,6 +99,7 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.index.IndexNodeFinishedRwTransactionsChecker;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.InternalClusterNode;
@@ -1248,6 +1249,7 @@ public class CatalogCompactionRunnerSelfTest extends 
AbstractCatalogCompactionTe
                 clockService,
                 schemaSyncService,
                 topologyService,
+                new TestLowWatermark(),
                 clockService::nowLong,
                 minTimeCollector,
                 rebalanceMinimumRequiredTimeProvider
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index cef5ae464b6..c784f5ef218 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREAT
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
 import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
-import static 
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
 import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexToTable;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -330,7 +329,7 @@ public class IndexManager implements IgniteComponent {
     /** Recover deferred destroy events. */
     private void recoverDestructionQueue() {
         // LWM starts updating only after the node is restored.
-        HybridTimestamp lwm = catalogSafeLowWatermark(lowWatermark, 
catalogService);
+        HybridTimestamp lwm = lowWatermark.getLowWatermark();
 
         int earliestCatalogVersion = lwm == null
                 ? catalogService.earliestCatalogVersion()
diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 3164619628b..3184458ab53 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -51,6 +51,17 @@ public interface LowWatermark extends 
EventProducer<LowWatermarkEvent, LowWaterm
      */
     void updateLowWatermark(HybridTimestamp newLowWatermark);
 
+    /**
+     * Sets the low watermark during node recovery.
+     *
+     * <p>This method sets the low watermark immediately and unconditionally, 
without checking if the new value is higher than
+     * the current one. Listeners are not notified about the change; 
moreover, they must not be registered at this point.
+     * It is intended to be used only during node recovery.
+     *
+     * @param newLowWatermark New low watermark.
+     */
+    void setLowWatermarkOnRecovery(HybridTimestamp newLowWatermark);
+
     /**
      * Locks the low watermark at the provided timestamp (prevents it from 
being updated to a value higher than the provided one).
      *
diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index b9ea21be866..a82849b7d88 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -164,7 +164,10 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         return inBusyLockAsync(busyLock, () -> {
-            setLowWatermarkOnRecovery(readLowWatermarkFromVault());
+            HybridTimestamp lwmFromVault = readLowWatermarkFromVault();
+            if (lwmFromVault != null) {
+                setLowWatermarkOnRecovery(lwmFromVault);
+            }
 
             messagingService.addMessageHandler(LowWatermarkMessageGroup.class, 
this::onReceiveNetworkMessage);
 
@@ -291,11 +294,14 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
         lowWatermark = newLowWatermark;
     }
 
-    private void setLowWatermarkOnRecovery(@Nullable HybridTimestamp 
newLowWatermark) {
+    @Override
+    public void setLowWatermarkOnRecovery(HybridTimestamp newLowWatermark) {
         updateLowWatermarkLock.writeLock().lock();
 
         try {
             lowWatermark = newLowWatermark;
+
+            saveWatermarkToVault(newLowWatermark);
         } finally {
             updateLowWatermarkLock.writeLock().unlock();
         }
@@ -383,9 +389,9 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
         return inBusyLockAsync(busyLock, () -> {
-                    vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
newLowWatermark.toBytes());
+            saveWatermarkToVault(newLowWatermark);
 
-                    return waitForLocksAndSetLowWatermark(newLowWatermark)
+            return waitForLocksAndSetLowWatermark(newLowWatermark)
                             .thenComposeAsync(unused2 -> fireEvent(
                                     LOW_WATERMARK_CHANGED,
                                     new 
ChangeLowWatermarkEventParameters(newLowWatermark)), scheduledThreadPool)
@@ -404,6 +410,10 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
         );
     }
 
+    private void saveWatermarkToVault(HybridTimestamp newLowWatermark) {
+        vaultManager.put(LOW_WATERMARK_VAULT_KEY, newLowWatermark.toBytes());
+    }
+
     private CompletableFuture<Void> 
waitForLocksAndSetLowWatermark(HybridTimestamp newLowWatermark) {
         return inBusyLockAsync(busyLock, () -> {
             // Write lock so no new LWM locks can be added.
diff --git 
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
 
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index 66c6798fbcc..8af39abc56a 100644
--- 
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++ 
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -85,6 +85,11 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
         }
     }
 
+    @Override
+    public void setLowWatermarkOnRecovery(HybridTimestamp newLowWatermark) {
+        setLowWatermark(newLowWatermark);
+    }
+
     @Override
     public boolean tryLock(UUID lockId, HybridTimestamp lockTs) {
         updateLowWatermarkLock.readLock().lock();
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index b4e6cb0506c..00099da8df1 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -100,8 +100,6 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
-import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -688,15 +686,13 @@ public class Node {
                 clockService,
                 schemaSyncService,
                 clusterService.topologyService(),
+                lowWatermark,
                 clockService::nowLong,
                 minTimeCollectorService,
                 new 
RebalanceMinimumRequiredTimeProviderImpl(metaStorageManager, catalogManager));
 
         
metaStorageManager.addElectionListener(catalogCompactionRunner::updateCoordinator);
 
-        lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
-                params -> 
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
 params).newLowWatermark()));
-
         SystemDistributedConfiguration systemDistributedConfiguration =
                 
clusterConfigRegistry.getConfiguration(SystemDistributedExtensionConfiguration.KEY).system();
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index c4b66184794..742e7886382 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -59,7 +59,6 @@ import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionRepl
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED;
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED;
-import static 
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
 import static 
org.apache.ignite.internal.partitiondistribution.Assignments.assignmentListToString;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
@@ -477,8 +476,7 @@ public class PartitionReplicaLifecycleManager extends
 
         handleResourcesForDroppedZonesOnRecovery();
 
-        HybridTimestamp safeLwm = catalogSafeLowWatermark(lowWatermark, 
catalogService);
-        CompletableFuture<Void> processZonesAndAssignmentsOnStart = 
processZonesOnStart(recoveryRevision, safeLwm)
+        CompletableFuture<Void> processZonesAndAssignmentsOnStart = 
processZonesOnStart(recoveryRevision, lowWatermark.getLowWatermark())
                 .thenCompose(ignored -> 
processAssignmentsOnRecovery(recoveryRevision));
 
         metaStorageMgr.registerPrefixWatch(new 
ByteArray(PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES), 
pendingAssignmentsRebalanceListener);
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 91460355457..a4c140fbabe 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -157,8 +157,6 @@ import 
org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
-import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.cache.IdempotentCacheVacuumizer;
@@ -1032,6 +1030,7 @@ public class IgniteImpl implements Ignite {
                 clockService,
                 schemaSyncService,
                 clusterSvc.topologyService(),
+                lowWatermark,
                 indexNodeFinishedRwTransactionsChecker,
                 minTimeCollectorService,
                 new RebalanceMinimumRequiredTimeProviderImpl(metaStorageMgr, 
catalogManager)
@@ -1042,9 +1041,6 @@ public class IgniteImpl implements Ignite {
 
         killCommandHandler = new KillCommandHandler(name, 
logicalTopologyService, clusterSvc.messagingService());
 
-        lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
-                params -> 
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
 params).newLowWatermark()));
-
         resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
         var transactionInflights = new 
TransactionInflights(placementDriverMgr.placementDriver(), clockService);
@@ -1604,6 +1600,7 @@ public class IgniteImpl implements Ignite {
                         lifecycleManager.startComponentsAsync(
                                 componentContext,
                                 catalogManager,
+                                new LowWatermarkRectifier(lowWatermark, 
catalogManager),
                                 catalogCompactionRunner,
                                 indexMetaStorage,
                                 clusterCfgMgr,
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/SafeLowWatermarkUtils.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/LowWatermarkRectifier.java
similarity index 50%
rename from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/SafeLowWatermarkUtils.java
rename to 
modules/runner/src/main/java/org/apache/ignite/internal/app/LowWatermarkRectifier.java
index a7de6435fbb..205580ee98e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/SafeLowWatermarkUtils.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/LowWatermarkRectifier.java
@@ -15,42 +15,45 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator;
+package org.apache.ignite.internal.app;
 
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Utility class for safe low watermark operations.
+ * Component that rectifies the low watermark on node startup if it is lower 
than the earliest catalog timestamp.
  */
-public class SafeLowWatermarkUtils {
-    /**
-     * Returns a catalog-safe low watermark (that is, a timestamp that is not 
lower than current LWM, but can be safely used
-     * to access the catalog).
-     * If the low watermark is {@code null}, {@code null} is returned. If the 
low watermark is less than the earliest catalog timestamp,
-     * the earliest catalog timestamp is returned. Otherwise, the original low 
watermark is returned.
-     *
-     * @param watermark The low watermark.
-     * @param catalogService The catalog service.
-     * @return A catalog-safe low watermark or {@code null}.
-     */
-    public static @Nullable HybridTimestamp 
catalogSafeLowWatermark(LowWatermark watermark, CatalogService catalogService) {
-        HybridTimestamp lwmTimestamp = watermark.getLowWatermark();
-
-        if (lwmTimestamp == null) {
-            return null;
-        }
+class LowWatermarkRectifier implements IgniteComponent {
+    private final LowWatermark lowWatermark;
+    private final CatalogService catalogService;
+
+    LowWatermarkRectifier(LowWatermark lowWatermark, CatalogService 
catalogService) {
+        this.lowWatermark = lowWatermark;
+        this.catalogService = catalogService;
+    }
 
+    @Override
+    public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         HybridTimestamp earliestCatalogTimestamp = 
hybridTimestamp(catalogService.earliestCatalog().time());
+        @Nullable HybridTimestamp lwm = lowWatermark.getLowWatermark();
 
-        if (lwmTimestamp.compareTo(earliestCatalogTimestamp) < 0) {
-            return earliestCatalogTimestamp;
-        } else {
-            return lwmTimestamp;
+        if (lwm != null && lwm.compareTo(earliestCatalogTimestamp) < 0) {
+            lowWatermark.setLowWatermarkOnRecovery(earliestCatalogTimestamp);
         }
+
+        return nullCompletedFuture();
+    }
+
+    @Override
+    public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+        return nullCompletedFuture();
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
index 600c23ebd91..c7d97a8d647 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static 
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
-
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
@@ -49,7 +47,7 @@ class CatalogStorageIndexDescriptorSupplier implements 
StorageIndexDescriptorSup
 
         // Get the current Low Watermark value. Since this class is used only 
on recovery, we expect that this value will not change
         // concurrently.
-        HybridTimestamp lowWatermarkTimestamp = 
catalogSafeLowWatermark(lowWatermark, catalogService);
+        HybridTimestamp lowWatermarkTimestamp = lowWatermark.getLowWatermark();
 
         int earliestCatalogVersion = lowWatermarkTimestamp == null
                 ? catalogService.earliestCatalogVersion()
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2743476c67e..bb649037bfe 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -33,7 +33,6 @@ import static 
org.apache.ignite.internal.event.EventListener.fromConsumer;
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED;
-import static 
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
 import static 
org.apache.ignite.internal.table.distributed.TableUtils.aliveTables;
 import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
@@ -532,9 +531,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             rebalanceRetryDelayConfiguration.init();
 
-            @Nullable HybridTimestamp lwm = 
catalogSafeLowWatermark(lowWatermark, catalogService);
-
-            cleanUpResourcesForDroppedTablesOnRecoveryBusy(lwm);
+            cleanUpResourcesForDroppedTablesOnRecoveryBusy();
 
             catalogService.listen(CatalogEvent.TABLE_CREATE, 
onTableCreateListener);
             catalogService.listen(CatalogEvent.TABLE_DROP, 
onTableDropListener);
@@ -550,7 +547,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             long recoveryRevision = recoveryFinishFuture.join().revision();
 
-            return recoverTables(recoveryRevision, lwm);
+            return recoverTables(recoveryRevision, 
lowWatermark.getLowWatermark());
         });
     }
 
@@ -1860,10 +1857,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
     }
 
-    private void cleanUpResourcesForDroppedTablesOnRecoveryBusy(@Nullable 
HybridTimestamp lwm) {
+    private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
         // TODO: IGNITE-20384 Clean up abandoned resources for dropped tables 
from vault and metastore
 
-        Set<Integer> aliveTableIds = aliveTables(catalogService, lwm);
+        Set<Integer> aliveTableIds = aliveTables(catalogService, 
lowWatermark.getLowWatermark());
 
         destroyMvStoragesForTablesNotIn(aliveTableIds);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
index 11d049d9e73..9a6afb2df4c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
@@ -37,7 +37,6 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.value;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
-import static 
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.READ_ONLY;
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REMOVED;
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.statusOnRemoveIndex;
@@ -323,7 +322,7 @@ public class IndexMetaStorage implements IgniteComponent {
     }
 
     private CompletableFuture<Void> recoverIndexMetas() {
-        int lwmCatalogVersion = 
lwmCatalogVersion(catalogSafeLowWatermark(lowWatermark, catalogService));
+        int lwmCatalogVersion = 
lwmCatalogVersion(lowWatermark.getLowWatermark());
         int latestCatalogVersion = catalogService.latestCatalogVersion();
         int startCatalogVersion = Math.max(lwmCatalogVersion, 
catalogService.earliestCatalogVersion());
 

Reply via email to