This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-21585
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 043661bf69ac13c2f214e1cab0a227f25aeca0ea
Author: amashenkov <andrey.mashen...@gmail.com>
AuthorDate: Tue Mar 5 13:42:32 2024 +0300

    Refactored LowWatermark: extracted an interface and added a dummy implementation.
    Subscribed IndexManager and TableManager to LWM updates.
---
 .../internal/catalog/CatalogManagerImpl.java       |   7 +
 .../ignite/internal/catalog/CatalogService.java    |   3 +
 .../ignite/internal/catalog/CatalogTestUtils.java  |  27 ---
 .../ignite/client/handler/FakeCatalogService.java  |   5 +
 .../ignite/internal/index/ItIndexManagerTest.java  |   5 +-
 .../apache/ignite/internal/index/IndexManager.java | 111 +++++++--
 .../ignite/internal/index/IndexManagerTest.java    |  28 ++-
 .../runner/app/ItIgniteNodeRestartTest.java        |   7 +-
 .../runner/app/PlatformTestNodeRunner.java         |   3 -
 .../org/apache/ignite/internal/app/IgniteImpl.java |  15 +-
 .../ignite/internal/IgniteIntegrationTest.java     |  12 -
 .../ignite/internal/schema/SchemaManager.java      |  18 +-
 .../internal/sql/sqllogic/ItSqlLogicTest.java      |   2 -
 .../rebalance/ItRebalanceDistributedTest.java      |   9 +-
 .../internal/table/distributed/LowWatermark.java   | 261 +--------------------
 .../distributed/LowWatermarkChangedListener.java   |   2 +-
 .../{LowWatermark.java => LowWatermarkImpl.java}   |  12 +-
 .../internal/table/distributed/TableManager.java   | 146 +++++++-----
 .../table/distributed/LowWatermarkTest.java        |   8 +-
 .../table/distributed/TableManagerTest.java        |  15 +-
 .../ignite/internal/table/TestLowWatermark.java    |  69 ++++++
 21 files changed, 334 insertions(+), 431 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 293b1376c6..00d99f8670 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -298,6 +298,13 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
         return catalogAt(timestamp).version();
     }
 
+    @Override
+    public int earliestCatalogVersion(long timestamp) {
+        Entry<Long, Catalog> earliestEntry = catalogByTs.floorEntry(timestamp);
+
+        return (earliestEntry == null) ? earliestCatalogVersion() : 
earliestEntry.getValue().version();
+    }
+
     @Override
     public int earliestCatalogVersion() {
         return catalogByVer.firstEntry().getKey();
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index d2c167c863..1477ee7a3a 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -100,6 +100,9 @@ public interface CatalogService extends 
EventProducer<CatalogEvent, CatalogEvent
     /** Returns the earliest registered version of the catalog. */
     int earliestCatalogVersion();
 
+    /** Returns the earliest registered version of the catalog that is 
observable as of the given timestamp. */
+    int earliestCatalogVersion(long timestamp);
+
     /** Returns the latest registered version of the catalog. */
     int latestCatalogVersion();
 
diff --git 
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
 
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index 10e6dd46eb..3f0c3596c8 100644
--- 
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++ 
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -19,12 +19,10 @@ package org.apache.ignite.internal.catalog;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.List;
 import java.util.Set;
@@ -343,31 +341,6 @@ public class CatalogTestUtils {
         return AlterZoneCommand.builder().zoneName(zoneName);
     }
 
-    /**
-     * Starts catalog compaction and waits it finished locally.
-     *
-     * @param catalogManager Catalog manager.
-     * @param timestamp Timestamp catalog should be compacted up to.
-     * @return {@code True} if a new snapshot has been successfully written, 
{@code false} otherwise.
-     */
-    public static boolean waitCatalogCompaction(CatalogManager catalogManager, 
long timestamp) {
-        int version = catalogManager.activeCatalogVersion(timestamp);
-
-        CompletableFuture<Boolean> operationFuture = ((CatalogManagerImpl) 
catalogManager).compactCatalog(timestamp);
-
-        try {
-            boolean result = operationFuture.get();
-
-            if (result) {
-                waitForCondition(() -> catalogManager.earliestCatalogVersion() 
== version, 3_000);
-            }
-        } catch (Exception e) {
-            fail(e);
-        }
-
-        return operationFuture.join();
-    }
-
     private static class TestUpdateLog implements UpdateLog {
         private final HybridClock clock;
 
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
index 2e74e02311..bd22a7e675 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
@@ -154,6 +154,11 @@ public class FakeCatalogService implements CatalogService {
         return 0;
     }
 
+    @Override
+    public int earliestCatalogVersion(long timestamp) {
+        return 0;
+    }
+
     @Override
     public CompletableFuture<Void> catalogReadyFuture(int version) {
         return null;
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index 208cd96d5e..be7ec779ba 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.AfterEach;
@@ -118,7 +119,9 @@ public class ItIndexManagerTest extends 
ClusterPerClassIntegrationTest {
         IntList list = new IntArrayList();
         int tableId = table.tableId();
 
-        acceptAliveIndexes(ignite.catalogManager(), (tbl, idx) -> {
+        HybridTimestamp lowWatermark = ignite.lowWatermark().getLowWatermark();
+
+        acceptAliveIndexes(ignite.catalogManager(), lowWatermark, (tbl, idx) 
-> {
             if (tableId == idx.tableId()) {
                 list.add(idx.id());
             }
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 069161857f..70c315830a 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -17,10 +17,14 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.util.Comparator.comparingInt;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DESTROY;
+import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -30,6 +34,8 @@ import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -43,10 +49,14 @@ import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.IndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -67,9 +77,11 @@ import 
org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageIndexDescriptor.StorageColumnDescriptor;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An Ignite component that is responsible for handling index-related commands 
like CREATE or DROP
@@ -111,6 +123,13 @@ public class IndexManager implements IgniteComponent {
     /** Table storages by ID for which indexes were created. */
     private final Map<Integer, MvTableStorage> tableStoragesById = new 
ConcurrentHashMap<>();
 
+    /** Low watermark. */
+    private final LowWatermark lowWatermark;
+
+    /** Deferred destruction queue. */
+    private final Queue<DestroyIndexEventParameters> deferredQueue =
+            new 
PriorityQueue<>(comparingInt(IndexEventParameters::catalogVersion));
+
     /**
      * Constructor.
      *
@@ -125,13 +144,15 @@ public class IndexManager implements IgniteComponent {
             CatalogService catalogService,
             MetaStorageManager metaStorageManager,
             ExecutorService ioExecutor,
-            Consumer<LongFunction<CompletableFuture<?>>> registry
+            Consumer<LongFunction<CompletableFuture<?>>> registry,
+            LowWatermark lowWatermark
     ) {
         this.schemaManager = schemaManager;
         this.tableManager = tableManager;
         this.catalogService = catalogService;
         this.metaStorageManager = metaStorageManager;
         this.ioExecutor = ioExecutor;
+        this.lowWatermark = lowWatermark;
 
         startVv = new IncrementalVersionedValue<>(registry);
         tableStoragesVv = new IncrementalVersionedValue<>(registry);
@@ -141,10 +162,12 @@ public class IndexManager implements IgniteComponent {
     public CompletableFuture<Void> start() {
         LOG.debug("Index manager is about to start");
 
-        startIndexes();
+        startIndexes(lowWatermark.getLowWatermark());
 
         catalogService.listen(INDEX_CREATE, (CreateIndexEventParameters 
parameters) -> onIndexCreate(parameters));
+        catalogService.listen(INDEX_REMOVED, (RemoveIndexEventParameters 
parameters) -> onIndexRemoved(parameters));
         catalogService.listen(INDEX_DESTROY, (DestroyIndexEventParameters 
parameters) -> onIndexDestroy(parameters));
+        lowWatermark.addUpdateListener(this::onLwmChanged);
 
         LOG.info("Index manager started");
 
@@ -184,25 +207,63 @@ public class IndexManager implements IgniteComponent {
         return tableStoragesVv.get(causalityToken).thenApply(ignore -> 
tableStoragesById.get(tableId));
     }
 
+    private CompletableFuture<Boolean> 
onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLock(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int version = parameters.catalogVersion() - 1;
+
+            CatalogIndexDescriptor indexDescriptor = 
catalogService.index(indexId, version);
+            assert indexDescriptor != null : "index";
+
+            CatalogTableDescriptor tableDescriptor = 
catalogService.table(indexDescriptor.tableId(), version);
+            assert tableDescriptor != null : "table";
+
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(tableDescriptor.zoneId(), version);
+            assert zoneDescriptor != null : "zone";
+
+            int tableId = tableDescriptor.id();
+
+            synchronized (deferredQueue) {
+                deferredQueue.offer(new DestroyIndexEventParameters(-1L, -1, 
indexId, tableId, zoneDescriptor.partitions()));
+            }
+
+            return falseCompletedFuture();
+        });
+    }
+
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return runAsync(() -> {
+            int earliestVersion = 
catalogService.activeCatalogVersion(ts.longValue());
+
+            // TODO: any hint if there is smth to clean ???
+
+            synchronized ((deferredQueue)) {
+                DestroyIndexEventParameters next;
+
+                while ((next = deferredQueue.peek()) != null && 
next.catalogVersion() < earliestVersion) {
+                    next = deferredQueue.poll();
+
+                    onIndexDestroy(next);
+                }
+            }
+        }, ioExecutor);
+    }
+
     private CompletableFuture<Boolean> 
onIndexDestroy(DestroyIndexEventParameters parameters) {
         int indexId = parameters.indexId();
         int tableId = parameters.tableId();
 
-        long causalityToken = parameters.causalityToken();
-
-        CompletableFuture<TableViewInternal> tableFuture = 
tableManager.tableAsync(causalityToken, tableId);
+        TableViewInternal table = tableManager.cachedTable(tableId);
 
-        return inBusyLockAsync(busyLock, () -> tableStoragesVv.update(
-                causalityToken,
-                updater(ignore -> tableFuture.thenAccept(table -> 
inBusyLock(busyLock, () -> {
-                    if (table != null) {
-                        // In case of DROP TABLE the table will be removed 
first.
-                        table.unregisterIndex(indexId);
-                    } else {
-                        tableStoragesById.remove(tableId);
-                    }
-                })))
-        )).thenApply(unused -> false);
+        return runAsync(() -> inBusyLock(busyLock, () -> {
+            if (table != null) {
+                // In case of DROP TABLE the table will be removed first.
+                table.unregisterIndex(indexId);
+            } else {
+                tableStoragesById.remove(tableId);
+            }
+        }), ioExecutor).thenApply(unused -> false);
     }
 
     private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters parameters) {
@@ -315,7 +376,7 @@ public class IndexManager implements IgniteComponent {
         }
     }
 
-    private void startIndexes() {
+    private void startIndexes(@Nullable HybridTimestamp lwm) {
         CompletableFuture<Long> recoveryFinishedFuture = 
metaStorageManager.recoveryFinishedFuture();
 
         assert recoveryFinishedFuture.isDone();
@@ -324,7 +385,7 @@ public class IndexManager implements IgniteComponent {
 
         List<CompletableFuture<?>> startIndexFutures = new ArrayList<>();
 
-        acceptAliveIndexes(catalogService, (table, index) -> 
startIndexFutures.add(startIndexAsync(table, index, causalityToken)));
+        acceptAliveIndexes(catalogService, lwm, (table, index) -> 
startIndexFutures.add(startIndexAsync(table, index, causalityToken)));
 
         // Forces to wait until recovery is complete before the metastore 
watches are deployed to avoid races with other components.
         startVv.update(causalityToken, (unused, throwable) -> 
allOf(startIndexFutures.toArray(CompletableFuture[]::new)))
@@ -416,14 +477,20 @@ public class IndexManager implements IgniteComponent {
     }
 
     /**
-     * Collects indexes (including deleted ones) for tables (tables from the 
latest version of the catalog) from the earliest to the latest
-     * version of the catalog that need to be started on node recovery.
+     * Collects indexes for tables from the earliest to the latest observable 
catalog version, which need to be started on node recovery.
+     * If low watermark is not set, then earliest catalog version will be used 
instead.
      *
      * @param catalogService Catalog service.
+     * @param lowWatermark Low watermark or {@code null} for calculating 
earliest available catalog version.
+     * @param consumer A consumer that accepts alive index' descriptor.
      */
-    static void acceptAliveIndexes(CatalogService catalogService, 
BiConsumer<CatalogTableDescriptor, CatalogIndexDescriptor> consumer) {
-        int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+    static void acceptAliveIndexes(
+            CatalogService catalogService,
+            @Nullable HybridTimestamp lowWatermark,
+            BiConsumer<CatalogTableDescriptor, CatalogIndexDescriptor> consumer
+    ) {
         int latestCatalogVersion = catalogService.latestCatalogVersion();
+        int earliestCatalogVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark));
 
         IntSet processedObjects = new IntOpenHashSet();
         catalogService.indexes(latestCatalogVersion).stream()
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 202f4a8267..bfcbcc8d30 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -23,7 +23,6 @@ import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
-import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.waitCatalogCompaction;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
 import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
@@ -47,7 +46,6 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -79,6 +77,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageService;
@@ -92,6 +91,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TestLowWatermark;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
@@ -133,6 +133,8 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
 
     private final Map<Integer, TableViewInternal> tableViewInternalByTableId = 
new ConcurrentHashMap<>();
 
+    private TestLowWatermark lowWatermark = new TestLowWatermark();
+
     @BeforeEach
     public void setUp() {
         mockTableManager = mock(TableManager.class);
@@ -230,7 +232,7 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         int tableId = indexDescriptor.tableId();
 
         dropIndex(INDEX_NAME);
-        assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE));
+        assertThat(fireDestroyEvent(), willCompleteSuccessfully());
 
         long causalityToken = 0L; // Use last token.
         MvTableStorage mvTableStorage = 
indexManager.getMvTableStorage(causalityToken, tableId).get();
@@ -248,7 +250,7 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         int tableId = indexDescriptor.tableId();
 
         dropTable(TABLE_NAME);
-        assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE));
+        assertThat(fireDestroyEvent(), willCompleteSuccessfully());
 
         long causalityToken = 0L; // Use last token.
         MvTableStorage mvTableStorage = 
indexManager.getMvTableStorage(causalityToken, tableId).get();
@@ -256,7 +258,6 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         verify(mvTableStorage).destroyIndex(indexId);
     }
 
-
     @Test
     void testCollectIndexesForRecoveryForCreatedTables() {
         createTable(OTHER_TABLE_NAME);
@@ -294,7 +295,7 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         createTable(OTHER_TABLE_NAME);
         dropTable(OTHER_TABLE_NAME);
 
-        assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE));
+        assertThat(fireDestroyEvent(), willCompleteSuccessfully());
 
         Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> 
collectedIndexes = collectIndexesForRecovery();
 
@@ -370,12 +371,12 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         dropIndexes(indexName3, indexName4, indexName5);
         removeIndex(catalogManager, removedIndexId);
 
-        assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE));
+        IgniteUtils.stopAll(indexManager, catalogManager, metaStorageManager);
 
         TableViewInternal tableViewInternal = 
tableViewInternalByTableId.get(tableId());
         clearInvocations(tableViewInternal);
+        lowWatermark.update(clock.now());
 
-        IgniteUtils.stopAll(indexManager, catalogManager, metaStorageManager);
         createAndStartComponents();
 
         ArgumentCaptor<StorageHashIndexDescriptor> captor = 
ArgumentCaptor.forClass(StorageHashIndexDescriptor.class);
@@ -461,7 +462,8 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
                 catalogManager,
                 metaStorageManager,
                 ForkJoinPool.commonPool(),
-                (LongFunction<CompletableFuture<?>> function) -> 
metaStorageManager.registerRevisionUpdateListener(function::apply)
+                (LongFunction<CompletableFuture<?>> function) -> 
metaStorageManager.registerRevisionUpdateListener(function::apply),
+                lowWatermark
         );
 
         assertThat(allOf(metaStorageManager.start(), catalogManager.start(), 
indexManager.start()), willCompleteSuccessfully());
@@ -549,8 +551,14 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
     private Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> 
collectIndexesForRecovery() {
         Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>> res = 
new HashMap<>();
 
-        IndexManager.acceptAliveIndexes(catalogManager, (k, v) -> 
res.computeIfAbsent(k, ignore -> new ArrayList<>()).add(v));
+        HybridTimestamp lwm = lowWatermark.getLowWatermark();
+
+        IndexManager.acceptAliveIndexes(catalogManager, lwm, (k, v) -> 
res.computeIfAbsent(k, ignore -> new ArrayList<>()).add(v));
 
         return res;
     }
+
+    private CompletableFuture<Void> fireDestroyEvent() {
+        return lowWatermark.updateAndNotify(clock.now());
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f083095297..10204f5337 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -161,7 +161,7 @@ import 
org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.table.distributed.LowWatermark;
+import org.apache.ignite.internal.table.distributed.LowWatermarkImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -525,7 +525,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var sqlRef = new AtomicReference<IgniteSqlImpl>();
 
-        LowWatermark lowWatermark = new LowWatermark(name, 
gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor);
+        LowWatermarkImpl lowWatermark = new LowWatermarkImpl(name, 
gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor);
 
         TableManager tableManager = new TableManager(
                 name,
@@ -567,7 +567,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 catalogManager,
                 metaStorageMgr,
                 threadPoolsManager.tableIoExecutor(),
-                registry
+                registry,
+                lowWatermark
         );
 
         var metricManager = new MetricManager();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 1580bde38b..14627286b1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -61,7 +61,6 @@ import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
-import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
@@ -568,8 +567,6 @@ public class PlatformTestNodeRunner {
                 session.execute(null, "DROP TABLE " + tableName + "");
             }
 
-            
IgniteIntegrationTest.forceCleanupAbandonedResources(context.ignite());
-
             return tableName;
         }
     }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 5dc2b7ea5f..b4dc4bccb5 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -177,7 +177,7 @@ import 
org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
-import org.apache.ignite.internal.table.distributed.LowWatermark;
+import org.apache.ignite.internal.table.distributed.LowWatermarkImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -345,7 +345,7 @@ public class IgniteImpl implements Ignite {
 
     private final ClockWaiter clockWaiter;
 
-    private final LowWatermark lowWatermark;
+    private final LowWatermarkImpl lowWatermark;
 
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
@@ -686,7 +686,7 @@ public class IgniteImpl implements Ignite {
 
         StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
-        lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock, 
txManager, vaultMgr, failureProcessor);
+        lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), 
clock, txManager, vaultMgr, failureProcessor);
 
         distributedTblMgr = new TableManager(
                 name,
@@ -728,7 +728,8 @@ public class IgniteImpl implements Ignite {
                 catalogManager,
                 metaStorageMgr,
                 threadPoolsManager.tableIoExecutor(),
-                registry
+                registry,
+                lowWatermark
         );
 
         indexBuildingManager = new IndexBuildingManager(
@@ -1450,4 +1451,10 @@ public class IgniteImpl implements Ignite {
     public RemotelyTriggeredResourceRegistry resourcesRegistry() {
         return resourcesRegistry;
     }
+
+    /** Returns the low watermark instance used by this node. */
+    @TestOnly
+    public LowWatermarkImpl lowWatermark() {
+        return lowWatermark;
+    }
 }
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
index 0ac6d4ce3e..9fa5710043 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal;
 
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.junit.StopAllIgnitesAfterTests;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -31,13 +28,4 @@ import org.junit.jupiter.api.extension.ExtendWith;
 // The order is important here.
 @ExtendWith({WorkDirectoryExtension.class, StopAllIgnitesAfterTests.class})
 public abstract class IgniteIntegrationTest extends BaseIgniteAbstractTest {
-    /**
-     * Forcibly destroys partitions for dropped tables and indexes via 
triggering catalog compaction to the latest catalog version.
-     */
-    public static void forceCleanupAbandonedResources(Ignite node) {
-        IgniteImpl node0 = (IgniteImpl) node;
-        CatalogManagerImpl catalogManager = (CatalogManagerImpl) 
node0.catalogManager();
-
-        catalogManager.compactCatalog(Long.MAX_VALUE);
-    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 3e9a6f7251..a824585df2 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -176,7 +176,7 @@ public class SchemaManager implements IgniteComponent {
     private CompletableFuture<Boolean> onTableDestroyed(CatalogEventParameters 
event) {
         DestroyTableEventParameters creationEvent = 
(DestroyTableEventParameters) event;
 
-        return dropRegistry(creationEvent.causalityToken(), 
creationEvent.tableId()).thenApply(ignored -> false);
+        return dropRegistry(creationEvent.tableId()).thenApply(ignored -> 
false);
     }
 
     private void setColumnMapping(SchemaDescriptor schema, int tableId) throws 
ExecutionException, InterruptedException {
@@ -309,26 +309,18 @@ public class SchemaManager implements IgniteComponent {
     /**
      * Drops schema registry for the given table id (along with the 
corresponding schemas).
      *
-     * @param causalityToken Causality token.
      * @param tableId Table id.
      */
-    private CompletableFuture<?> dropRegistry(long causalityToken, int 
tableId) {
+    private CompletableFuture<?> dropRegistry(int tableId) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
 
         try {
-            return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
-                if (e != null) {
-                    return failedFuture(new IgniteInternalException(
-                            format("Cannot remove a schema registry for the 
table [tblId={}]", tableId), e));
-                }
-
-                SchemaRegistryImpl removedRegistry = 
registriesById.remove(tableId);
-                removedRegistry.close();
+            SchemaRegistryImpl removedRegistry = 
registriesById.remove(tableId);
+            removedRegistry.close();
 
-                return nullCompletedFuture();
-            }));
+            return nullCompletedFuture();
         } finally {
             busyLock.leaveBusy();
         }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
index ad0123193d..581eba0f22 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
@@ -291,8 +291,6 @@ public class ItSqlLogicTest extends IgniteIntegrationTest {
                     }
                 }
             }
-
-            forceCleanupAbandonedResources(CLUSTER_NODES.get(0));
         }
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 6c25e6a32f..31325fe6d8 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -168,7 +168,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableRaftService;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.table.distributed.LowWatermark;
+import org.apache.ignite.internal.table.distributed.LowWatermarkImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -955,7 +955,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         private final NetworkAddress networkAddress;
 
-        private final LowWatermark lowWatermark;
+        private final LowWatermarkImpl lowWatermark;
 
         /** The future have to be complete after the node start and all Meta 
storage watches are deployd. */
         private CompletableFuture<Void> deployWatchesFut;
@@ -1172,7 +1172,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
             StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
             HybridClockImpl clock = new HybridClockImpl();
-            lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), 
clock, txManager, vaultManager, failureProcessor);
+            lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), 
clock, txManager, vaultManager, failureProcessor);
 
             tableManager = new TableManager(
                     name,
@@ -1248,7 +1248,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     catalogManager,
                     metaStorageManager,
                     threadPoolsManager.tableIoExecutor(),
-                    registry
+                    registry,
+                    lowWatermark
             );
         }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index 8e5cc3cf20..6b77076d8f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -17,264 +17,19 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.failure.FailureContext;
-import org.apache.ignite.internal.failure.FailureProcessor;
-import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.manager.IgniteComponent;
-import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Class to manage the low watermark.
- *
- * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
- * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
- * rows, remote indexes, remote tables, etc.
- *
- * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ * An interface for tracking the low watermark.
  */
-public class LowWatermark implements IgniteComponent {
-    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermark.class);
-
-    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
-
-    private final LowWatermarkConfiguration lowWatermarkConfig;
-
-    private final HybridClock clock;
-
-    private final TxManager txManager;
-
-    private final VaultManager vaultManager;
-
-    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
-
-    private final ScheduledExecutorService scheduledThreadPool;
-
-    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
-    private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-    private volatile @Nullable HybridTimestamp lowWatermark;
-
-    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
-
-    private final FailureProcessor failureProcessor;
-
-    /**
-     * Constructor.
-     *
-     * @param nodeName Node name.
-     * @param lowWatermarkConfig Low watermark configuration.
-     * @param clock A hybrid logical clock.
-     * @param txManager Transaction manager.
-     * @param vaultManager Vault manager.
-     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
-     */
-    public LowWatermark(
-            String nodeName,
-            LowWatermarkConfiguration lowWatermarkConfig,
-            HybridClock clock,
-            TxManager txManager,
-            VaultManager vaultManager,
-            FailureProcessor failureProcessor
-    ) {
-        this.lowWatermarkConfig = lowWatermarkConfig;
-        this.clock = clock;
-        this.txManager = txManager;
-        this.vaultManager = vaultManager;
-        this.failureProcessor = failureProcessor;
-
-        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
-                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
-        );
-    }
-
-    /**
-     * Starts the watermark manager.
-     */
-    @Override
-    public CompletableFuture<Void> start() {
-        inBusyLock(busyLock, () -> {
-            lowWatermark = readLowWatermarkFromVault();
-        });
-
-        return nullCompletedFuture();
-    }
-
-    /**
-     * Schedule watermark updates.
-     */
-    public void scheduleUpdates() {
-        inBusyLock(busyLock, () -> {
-            HybridTimestamp lowWatermarkCandidate = lowWatermark;
-
-            if (lowWatermarkCandidate == null) {
-                LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
-
-                scheduleUpdateLowWatermarkBusy();
-
-                return;
-            }
-
-            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
-
-            txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
-                    .whenComplete((unused, throwable) -> {
-                        if (throwable == null) {
-                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                        } else if (!(throwable instanceof 
NodeStoppingException)) {
-                            LOG.error("Error during the Watermark manager 
start", throwable);
-
-                            failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, throwable));
-
-                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                        }
-                    });
-        });
-    }
-
-    private @Nullable HybridTimestamp readLowWatermarkFromVault() {
-        VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
-
-        return vaultEntry == null ? null : 
ByteUtils.fromBytes(vaultEntry.value());
-    }
-
-    @Override
-    public void stop() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
-        }
-
-        busyLock.block();
-
-        ScheduledFuture<?> lastScheduledTaskFuture = 
this.lastScheduledTaskFuture.get();
-
-        if (lastScheduledTaskFuture != null) {
-            lastScheduledTaskFuture.cancel(true);
-        }
-
-        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, 
TimeUnit.SECONDS);
-    }
-
-    /**
-     * Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet.
-     */
-    public @Nullable HybridTimestamp getLowWatermark() {
-        return lowWatermark;
-    }
-
-    void updateLowWatermark() {
-        inBusyLock(busyLock, () -> {
-            HybridTimestamp lowWatermarkCandidate = 
createNewLowWatermarkCandidate();
-
-            // Wait until all the read-only transactions are finished before 
the new candidate, since no new RO transactions could be
-            // created, then we can safely promote the candidate as a new low 
watermark, store it in vault, and we can safely start cleaning
-            // up the stale/junk data in the tables.
-            txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
-                        vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(lowWatermarkCandidate));
-
-                        lowWatermark = lowWatermarkCandidate;
-
-                        return notifyListeners(lowWatermarkCandidate);
-                    }), scheduledThreadPool)
-                    .whenComplete((unused, throwable) -> {
-                        if (throwable != null) {
-                            if (!(throwable instanceof NodeStoppingException)) 
{
-                                LOG.error("Failed to update low watermark, 
will schedule again: {}", throwable, lowWatermarkCandidate);
-
-                                inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                            }
-                        } else {
-                            LOG.info("Successful low watermark update: {}", 
lowWatermarkCandidate);
-
-                            scheduleUpdateLowWatermarkBusy();
-                        }
-                    });
-        });
-    }
-
-    public void addUpdateListener(LowWatermarkChangedListener listener) {
-        updateListeners.add(listener);
-    }
-
-    public void removeUpdateListener(LowWatermarkChangedListener listener) {
-        updateListeners.remove(listener);
-    }
-
-    private CompletableFuture<Void> notifyListeners(HybridTimestamp 
lowWatermark) {
-        if (updateListeners.isEmpty()) {
-            return nullCompletedFuture();
-        }
-
-        ArrayList<CompletableFuture<?>> res = new ArrayList<>();
-        for (LowWatermarkChangedListener updateListener : updateListeners) {
-            res.add(updateListener.onLwmChanged(lowWatermark));
-        }
-
-        return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new));
-    }
-
-    private void scheduleUpdateLowWatermarkBusy() {
-        ScheduledFuture<?> previousScheduledFuture = 
this.lastScheduledTaskFuture.get();
-
-        assert previousScheduledFuture == null || 
previousScheduledFuture.isDone() : "previous scheduled task has not finished";
-
-        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
-                this::updateLowWatermark,
-                lowWatermarkConfig.updateFrequency().value(),
-                TimeUnit.MILLISECONDS
-        );
-
-        boolean casResult = 
lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, 
newScheduledFuture);
-
-        assert casResult : "only one scheduled task is expected";
-    }
-
-    HybridTimestamp createNewLowWatermarkCandidate() {
-        HybridTimestamp now = clock.now();
-
-        HybridTimestamp lowWatermarkCandidate = now.addPhysicalTime(
-                -lowWatermarkConfig.dataAvailabilityTime().value() - 
getMaxClockSkew()
-        );
-
-        HybridTimestamp lowWatermark = this.lowWatermark;
-
-        assert lowWatermark == null || 
lowWatermarkCandidate.compareTo(lowWatermark) > 0 :
-                "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" + 
lowWatermarkCandidate;
+public interface LowWatermark {
+    /** Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet. */
+    @Nullable HybridTimestamp getLowWatermark();
 
-        return lowWatermarkCandidate;
-    }
+    /** Subscribes the listener to low watermark changes. */
+    void addUpdateListener(LowWatermarkChangedListener listener);
 
-    private long getMaxClockSkew() {
-        // TODO: IGNITE-19287 Add Implementation
-        return HybridTimestamp.CLOCK_SKEW;
-    }
+    /** Unsubscribes the listener from low watermark changes. */
+    void removeUpdateListener(LowWatermarkChangedListener listener);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
index 2a738a7c46..72e395bbaf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 
 /**
- * LWM event listener interface.
+ * Low watermark event listener interface.
  *
  * @see LowWatermark
  */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java
similarity index 98%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java
index 8e5cc3cf20..177ee9d652 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java
@@ -59,8 +59,8 @@ import org.jetbrains.annotations.Nullable;
  *
  * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
  */
-public class LowWatermark implements IgniteComponent {
-    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermark.class);
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
 
     static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
 
@@ -96,7 +96,7 @@ public class LowWatermark implements IgniteComponent {
      * @param vaultManager Vault manager.
      * @param failureProcessor Failure processor tha is used to handle 
critical errors.
      */
-    public LowWatermark(
+    public LowWatermarkImpl(
             String nodeName,
             LowWatermarkConfiguration lowWatermarkConfig,
             HybridClock clock,
@@ -183,9 +183,7 @@ public class LowWatermark implements IgniteComponent {
         IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, 
TimeUnit.SECONDS);
     }
 
-    /**
-     * Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet.
-     */
+    @Override
     public @Nullable HybridTimestamp getLowWatermark() {
         return lowWatermark;
     }
@@ -221,10 +219,12 @@ public class LowWatermark implements IgniteComponent {
         });
     }
 
+    @Override
     public void addUpdateListener(LowWatermarkChangedListener listener) {
         updateListeners.add(listener);
     }
 
+    @Override
     public void removeUpdateListener(LowWatermarkChangedListener listener) {
         updateListeners.remove(listener);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 5b1820fc91..78e753ea5b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableMap;
+import static java.util.Comparator.comparingInt;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.anyOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -71,6 +72,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -104,6 +107,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
 import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
@@ -288,6 +292,10 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Started tables. */
     private final Map<Integer, TableImpl> startedTables = new 
ConcurrentHashMap<>();
 
+    /** Deferred destruction queue. */
+    private final Queue<DestroyTableEventParameters> deferredQueue =
+            new 
PriorityQueue<>(comparingInt(DestroyTableEventParameters::catalogVersion));
+
     /** Local partitions. */
     private final Map<Integer, PartitionSet> localPartsByTableId = new 
ConcurrentHashMap<>();
 
@@ -557,7 +565,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             long recoveryRevision = recoveryFinishFuture.join();
 
-            startTables(recoveryRevision);
+            startTables(recoveryRevision, lowWatermark.getLowWatermark());
 
             processAssignmentsOnRecovery(recoveryRevision);
 
@@ -565,13 +573,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
 
-            catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> {
-                return onTableCreate((CreateTableEventParameters) 
parameters).thenApply(unused -> false);
-            });
-
-            catalogService.listen(CatalogEvent.TABLE_DESTROY, parameters -> {
-                return onTableDestroy(((DestroyTableEventParameters) 
parameters)).thenApply(unused -> false);
-            });
+            catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> 
onTableCreate((CreateTableEventParameters) parameters));
+            catalogService.listen(CatalogEvent.TABLE_DROP, parameters -> 
onTableDrop(((DropTableEventParameters) parameters)));
+            catalogService.listen(CatalogEvent.TABLE_DESTROY, parameters -> 
onTableDestroy(((DestroyTableEventParameters) parameters)));
+            lowWatermark.addUpdateListener(this::onLwmChanged);
 
             catalogService.listen(CatalogEvent.TABLE_ALTER, parameters -> {
                 if (parameters instanceof RenameTableEventParameters) {
@@ -640,8 +645,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         }
     }
 
-    private CompletableFuture<?> onTableCreate(CreateTableEventParameters 
parameters) {
-        return createTableLocally(parameters.causalityToken(), 
parameters.catalogVersion(), parameters.tableDescriptor());
+    private CompletableFuture<Boolean> 
onTableCreate(CreateTableEventParameters parameters) {
+        return createTableLocally(parameters.causalityToken(), 
parameters.catalogVersion(), parameters.tableDescriptor())
+                .thenApply(unused -> false);
     }
 
     /**
@@ -715,14 +721,49 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         });
     }
 
-    private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters 
parameters) {
+    /**
+     * Handles a table drop catalog event by putting the table into {@link #deferredQueue} for later destruction.
+     *
+     * @param parameters Drop table event parameters.
+     * @return Future produced by {@code falseCompletedFuture()}.
+     */
+    private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters 
parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            dropTableLocally(parameters.causalityToken(), parameters);
+            int tableId = parameters.tableId();
+            // Descriptors are resolved in the catalog version preceding the one reported by the event.
+            // NOTE(review): presumably because the table is no longer present in the version that dropped it - confirm.
+            int version = parameters.catalogVersion() - 1;
 
-            return nullCompletedFuture();
+            CatalogTableDescriptor tableDescriptor = 
catalogService.table(tableId, version);
+            assert tableDescriptor != null;
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(tableDescriptor.zoneId(), version);
+            assert zoneDescriptor != null;
+
+            int partitions = zoneDescriptor.partitions();
+
+            // The queue itself is a plain PriorityQueue, so every access is guarded by its monitor.
+            synchronized (deferredQueue) {
+                deferredQueue.offer(new DestroyTableEventParameters(-1L, 
version, tableId, partitions));
+            }
+
+            return falseCompletedFuture();
         });
     }
 
+    /**
+     * Handles a low watermark update by destroying the tables queued in {@link #deferredQueue} whose recorded catalog
+     * version precedes the catalog version active at the new low watermark.
+     *
+     * <p>The queue is only drained under its monitor; destruction is started after the monitor is released, so that
+     * concurrent table drop events are not blocked while tables are being destroyed.
+     *
+     * @param ts New low watermark.
+     * @return Future that completes when every destruction started by this update has completed; it completes
+     *      exceptionally if any of them fails.
+     */
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        // Filled by the asynchronous task below and read only after that task has completed.
+        List<CompletableFuture<?>> destroyFutures = new ArrayList<>();
+
+        return runAsync(() -> {
+            int earliestVersion = catalogService.activeCatalogVersion(ts.longValue());
+
+            // TODO: Consider a cheap hint (e.g. a pending-entries check) to skip this work when there is nothing to clean.
+            List<DestroyTableEventParameters> readyToDestroy = new ArrayList<>();
+
+            synchronized (deferredQueue) {
+                DestroyTableEventParameters next;
+
+                while ((next = deferredQueue.peek()) != null && next.catalogVersion() < earliestVersion) {
+                    readyToDestroy.add(deferredQueue.poll());
+                }
+            }
+
+            // Keep the returned futures instead of dropping them, otherwise destruction failures would go unnoticed.
+            for (DestroyTableEventParameters parameters : readyToDestroy) {
+                destroyFutures.add(onTableDestroy(parameters));
+            }
+        }, ioExecutor).thenCompose(unused -> allOf(destroyFutures.toArray(CompletableFuture[]::new)));
+    }
+
+    /**
+     * Destroys the local structures of the table described by the given parameters.
+     *
+     * @param parameters Destroy table event parameters.
+     * @return Future completed with {@code false} once the local destruction has finished.
+     */
+    private CompletableFuture<Boolean> onTableDestroy(DestroyTableEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            CompletableFuture<Void> destroyFuture = destroyTableLocally(parameters);
+
+            return destroyFuture.thenApply(unused -> false);
+        });
+    }
+
     private CompletableFuture<?> onTableRename(RenameTableEventParameters 
parameters) {
         return inBusyLockAsync(busyLock, () -> tablesVv.update(
                 parameters.causalityToken(),
@@ -1349,63 +1390,44 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     /**
      * Drops local structures for a table.
      *
-     * @param causalityToken Causality token.
      * @param parameters Destroy table event parameters.
      */
-    private void dropTableLocally(long causalityToken, 
DestroyTableEventParameters parameters) {
+    private CompletableFuture<Void> 
destroyTableLocally(DestroyTableEventParameters parameters) {
         int tableId = parameters.tableId();
-        // TODO Drop partitions from parameters and use from storage.
-        int partitions = parameters.partitions();
 
-        localPartitionsVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
-            if (e != null) {
-                return failedFuture(e);
-            }
+        TableImpl table = startedTables.remove(tableId);
+        localPartsByTableId.remove(tableId);
 
-            localPartsByTableId.remove(tableId);
+        assert table != null;
 
-            return nullCompletedFuture();
-        }));
-
-        tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
-            if (e != null) {
-                return failedFuture(e);
-            }
-
-            TableImpl table = tables.get(tableId);
-
-            assert table != null : tableId;
-
-            InternalTable internalTable = table.internalTable();
-
-            CompletableFuture<?>[] stopReplicaFutures = new 
CompletableFuture<?>[partitions];
-
-            // TODO https://issues.apache.org/jira/browse/IGNITE-19170 
Partitions should be stopped on the assignments change
-            //  event triggered by zone drop or alter. Stop replica 
asynchronously, out of metastorage event pipeline.
-            for (int partitionId = 0; partitionId < partitions; partitionId++) 
{
-                var replicationGroupId = new TablePartitionId(tableId, 
partitionId);
-
-                stopReplicaFutures[partitionId] = 
stopPartition(replicationGroupId, table);
-            }
-
-            // TODO: IGNITE-18703 Destroy raft log and meta
-            return allOf(stopReplicaFutures)
-                    .thenComposeAsync(
-                            unused -> allOf(
-                                    internalTable.storage().destroy(),
-                                    runAsync(() -> 
internalTable.txStateStorage().destroy(), ioExecutor)
-                            ),
-                            ioExecutor
-                    ).thenAccept(ignore0 -> tables.remove(tableId));
-        }));
-
-        startedTables.remove(tableId);
+        InternalTable internalTable = table.internalTable();
+        int partitions = internalTable.partitions();
 
+        // TODO https://issues.apache.org/jira/browse/IGNITE-18991 Move 
assignment manipulations to Distribution zones.
         Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
                 .mapToObj(p -> stablePartAssignmentsKey(new 
TablePartitionId(tableId, p)))
                 .collect(toSet());
-
         metaStorageMgr.removeAll(assignmentKeys);
+
+        CompletableFuture<?>[] stopReplicaFutures = new 
CompletableFuture<?>[partitions];
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions 
should be stopped on the assignments change
+        //  event triggered by zone drop or alter. Stop replica 
asynchronously, out of metastorage event pipeline.
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            var replicationGroupId = new TablePartitionId(tableId, 
partitionId);
+
+            stopReplicaFutures[partitionId] = 
stopPartition(replicationGroupId, table);
+        }
+
+        // TODO: IGNITE-18703 Destroy raft log and meta
+        return allOf(stopReplicaFutures)
+                .thenComposeAsync(
+                        unused -> allOf(
+                                internalTable.storage().destroy(),
+                                runAsync(() -> 
internalTable.txStateStorage().destroy(), ioExecutor)
+                        ),
+                        ioExecutor
+                ).thenAccept(ignore0 -> tables.remove(tableId));
     }
 
     @Override
@@ -2336,11 +2358,13 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return tables.stream().filter(table -> 
table.name().equals(name)).findAny().orElse(null);
     }
 
-    private void startTables(long recoveryRevision) {
+    private void startTables(long recoveryRevision, @Nullable HybridTimestamp 
lwm) {
         sharedTxStateStorage.start();
 
-        int earliestCatalogVersion = catalogService.earliestCatalogVersion();
         int latestCatalogVersion = catalogService.latestCatalogVersion();
+        int earliestCatalogVersion = (lwm == null)
+                ? catalogService.earliestCatalogVersion()
+                : catalogService.activeCatalogVersion(lwm.longValue());
 
         var startedTables = new IntOpenHashSet();
         List<CompletableFuture<?>> startTableFutures = new ArrayList<>();
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
index c35e203fdb..a21ab858b7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static 
org.apache.ignite.internal.table.distributed.LowWatermark.LOW_WATERMARK_VAULT_KEY;
+import static 
org.apache.ignite.internal.table.distributed.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -58,7 +58,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InOrder;
 
 /**
- * For {@link LowWatermark} testing.
+ * For {@link LowWatermarkImpl} testing.
  */
 @ExtendWith(ConfigurationExtension.class)
 public class LowWatermarkTest extends BaseIgniteAbstractTest {
@@ -73,14 +73,14 @@ public class LowWatermarkTest extends 
BaseIgniteAbstractTest {
 
     private LowWatermarkChangedListener listener;
 
-    private LowWatermark lowWatermark;
+    private LowWatermarkImpl lowWatermark;
 
     @BeforeEach
     void setUp() {
         listener = mock(LowWatermarkChangedListener.class);
         
when(listener.onLwmChanged(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
 
-        lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, 
txManager, vaultManager, mock(FailureProcessor.class));
+        lowWatermark = new LowWatermarkImpl("test", lowWatermarkConfig, clock, 
txManager, vaultManager, mock(FailureProcessor.class));
         lowWatermark.addUpdateListener(listener);
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7b041ea1a6..3f277f204d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -118,6 +118,7 @@ import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TestLowWatermark;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -129,7 +130,6 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
@@ -243,8 +243,11 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     private ExecutorService partitionOperationsExecutor;
 
+    private TestLowWatermark lowWatermark;
+
     @BeforeEach
     void before() throws NodeStoppingException {
+        lowWatermark = new TestLowWatermark();
         catalogMetastore = StandaloneMetaStorageManager.create(new 
SimpleInMemoryKeyValueStorage(NODE_NAME));
         catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, 
clock, catalogMetastore);
 
@@ -356,7 +359,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         verify(txStateTableStorage, atMost(0)).destroy();
         verify(replicaMgr, atMost(0)).stopReplica(any());
 
-        assertTrue(CatalogTestUtils.waitCatalogCompaction(catalogManager, 
Long.MAX_VALUE));
+        assertThat(fireDestroyEvent(), willCompleteSuccessfully());
 
         verify(mvTableStorage, 
timeout(TimeUnit.SECONDS.toMillis(10))).destroy();
         verify(txStateTableStorage, 
timeout(TimeUnit.SECONDS.toMillis(10))).destroy();
@@ -744,8 +747,6 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator,
             Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
-        VaultManager vaultManager = mock(VaultManager.class);
-
         TableManager tableManager = new TableManager(
                 NODE_NAME,
                 revisionUpdater,
@@ -777,7 +778,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 () -> mock(IgniteSql.class),
                 new RemotelyTriggeredResourceRegistry(),
                 mock(ScheduledExecutorService.class),
-                new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock, 
tm, vaultManager, mock(FailureProcessor.class))
+                lowWatermark
         ) {
 
             @Override
@@ -866,4 +867,8 @@ public class TableManagerTest extends IgniteAbstractTest {
     private Collection<CatalogTableDescriptor> allTableDescriptors() {
         return catalogManager.tables(catalogManager.latestCatalogVersion());
     }
+
+    /**
+     * Updates the test low watermark to {@code clock.now()} and notifies its listeners.
+     *
+     * @return Future returned by {@code updateAndNotify} of the test low watermark.
+     */
+    private CompletableFuture<Void> fireDestroyEvent() {
+        return lowWatermark.updateAndNotify(clock.now());
+    }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java
new file mode 100644
index 0000000000..320c776ffe
--- /dev/null
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
+import 
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy low watermark implementation for tests. Listeners are notified only when
+ * {@link #updateAndNotify(HybridTimestamp)} is called explicitly.
+ *
+ * <p>This implementation has no persistent state: the current value is kept in a field only.
+ * Listeners are invoked by the thread that calls {@code updateAndNotify}.
+ */
+public class TestLowWatermark implements LowWatermark {
+    private final List<LowWatermarkChangedListener> listeners = new 
CopyOnWriteArrayList<>();
+    private volatile HybridTimestamp ts;
+
+    @Override
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return ts;
+    }
+
+    @Override
+    public void addUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.add(listener);
+    }
+
+    @Override
+    public void removeUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * Updates the low watermark and notifies every registered listener.
+     *
+     * <p>With assertions enabled, {@code newTs} must be strictly greater than the current value
+     * if one is already set. Each listener's {@code onLwmChanged} is invoked by the calling
+     * thread before this method returns.
+     *
+     * @param newTs New timestamp.
+     * @return Future that completes once the futures returned by all listeners have completed.
+     */
+    public CompletableFuture<Void> updateAndNotify(HybridTimestamp newTs) {
+        assert ts == null || ts.longValue() < newTs.longValue();
+
+        this.ts = newTs;
+
+        return CompletableFuture.allOf(listeners.stream().map(l -> 
l.onLwmChanged(newTs)).toArray(CompletableFuture[]::new));
+    }
+
+    /**
+     * Sets the low watermark without notifying listeners.
+     * Unlike {@link #updateAndNotify(HybridTimestamp)}, the new value is not checked against the current one.
+     *
+     * @param newTs New timestamp.
+     */
+    public void update(HybridTimestamp newTs) {
+        this.ts = newTs;
+    }
+}

Reply via email to