tkalkirill commented on code in PR #3358:
URL: https://github.com/apache/ignite-3/pull/3358#discussion_r1521189187


##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java:
##########
@@ -2488,7 +2464,7 @@ private static Stream<Arguments> 
argumentsForCheckIndexCreationCatalogVersion()
     }
 
     @Test
-    public void testCatalogCompaction() {
+    public void testCatalogCompaction() throws InterruptedException {

Review Comment:
   ```suggestion
       public void testCatalogCompaction() throws Exception {
   ```



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java:
##########
@@ -473,53 +473,12 @@ public CompletableFuture<Void> handle(UpdateLogEvent 
event, HybridTimestamp meta
 
         private CompletableFuture<Void> handle(SnapshotEntry event, long 
causalityToken) {
             Catalog catalog = event.snapshot();
-
-            // Use reverse order to find latest descriptors.
-            Collection<Catalog> droppedCatalogVersions = 
catalogByVer.headMap(catalog.version(), false).descendingMap().values();
-
-            List<Fireable> events = new ArrayList<>();
-            IntSet objectToSkip = new IntOpenHashSet();
-            Predicate<CatalogObjectDescriptor> filter = obj -> 
objectToSkip.add(obj.id());
-
-            // At first, add alive indexes to filter.
-            applyToAliveIndexesFrom(catalog.version(), filter::test);
-
-            // Create destroy events for dropped indexes.
-            droppedCatalogVersions.forEach(oldCatalog -> 
oldCatalog.indexes().stream()
-                    .filter(filter)
-                    .forEach(idx -> events.add(
-                            new DestroyIndexEvent(idx.id(), idx.tableId(), 
tableZoneDescriptor(oldCatalog, idx.tableId()).partitions()))
-                    ));
-
-            objectToSkip.clear();
-            // At last, create destroy events for dropped tables.
-            droppedCatalogVersions.forEach(oldCatalog -> 
oldCatalog.tables().stream()
-                    .filter(tbl -> catalog.table(tbl.id()) == null)
-                    .filter(filter)
-                    .forEach(tbl -> events.add(new DestroyTableEvent(tbl.id(), 
tableZoneDescriptor(oldCatalog, tbl.id()).partitions()))));
-
             // On recovery phase, we must register catalog from the snapshot.
             // In other cases, it is ok to rewrite an existed version, because 
it's exactly the same.
             registerCatalog(catalog);
+            truncateUpTo(catalog);

Review Comment:
   
`org.apache.ignite.internal.catalog.CatalogManagerImpl.OnUpdateHandlerImpl#handle(org.apache.ignite.internal.catalog.storage.SnapshotEntry,
 long)` don't use `causalityToken`



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -81,6 +86,10 @@ public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameter
 
     private final SchemaSyncService schemaSyncService;
 
+    private final LowWatermark lowWatermark;
+
+    private final DeferredEventsQueue<DestroyTableEvent> deferredEventsQueue = 
new DeferredEventsQueue<>(DestroyTableEvent::catalogVersion);

Review Comment:
   `DestroyTableEvent::catalogVersion` not` Event timestamp extractor.`
   It confuses me a lot.
   
   Another strange name, deferred queue of what? buy milk?



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -280,8 +294,21 @@ public CompletableFuture<Boolean> notify(EventParameters 
parameters) {
     }
 
     private CompletableFuture<Boolean> notifyInternal(EventParameters 
parameters) {
-        if (parameters instanceof DestroyTableEventParameters) {
-            removeTable((DestroyTableEventParameters) parameters);
+        if (parameters instanceof DropTableEventParameters) {
+            DropTableEventParameters event = (DropTableEventParameters) 
parameters;
+
+            int tableId = event.tableId();
+            int catalogVersion = event.catalogVersion();
+            int previousVersion = catalogVersion - 1;
+
+            // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
+            CatalogTableDescriptor tableDescriptor = 
catalogService.table(tableId, previousVersion);
+            assert tableDescriptor != null : "table";
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(tableDescriptor.zoneId(), previousVersion);
+            assert zoneDescriptor != null : "zone";

Review Comment:
   ```suggestion
               assert zoneDescriptor != null : "zoneId=" + 
tableDescriptor.zoneId() + ", catalogVersion=" + previousVersion;
   ```



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -280,8 +294,21 @@ public CompletableFuture<Boolean> notify(EventParameters 
parameters) {
     }
 
     private CompletableFuture<Boolean> notifyInternal(EventParameters 
parameters) {
-        if (parameters instanceof DestroyTableEventParameters) {
-            removeTable((DestroyTableEventParameters) parameters);
+        if (parameters instanceof DropTableEventParameters) {
+            DropTableEventParameters event = (DropTableEventParameters) 
parameters;
+
+            int tableId = event.tableId();
+            int catalogVersion = event.catalogVersion();
+            int previousVersion = catalogVersion - 1;
+
+            // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
+            CatalogTableDescriptor tableDescriptor = 
catalogService.table(tableId, previousVersion);
+            assert tableDescriptor != null : "table";

Review Comment:
   ```suggestion
               assert tableDescriptor != null : "tableId=" + tableId + ", 
catalogVersion=" + previousVersion;
   ```



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java:
##########
@@ -113,6 +113,7 @@ private static List<Integer> 
collectIndexIdsFromTable(TableImpl table, int parti
         return future.join();
     }
 
+    // TODO: validate this.

Review Comment:
   Well, well, this check is useful!
   Why is TODO here?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -84,6 +92,12 @@ public class IndexManager implements IgniteComponent {
     /** Versioned value to prevent races when registering/unregistering 
indexes when processing metastore or catalog events. */
     private final IncrementalVersionedValue<Void> handleMetastoreEventVv;
 
+    /** Low watermark. */
+    private final LowWatermark lowWatermark;
+
+    /** Deferred destruction queue. */
+    private final DeferredEventsQueue<DestroyIndexEvent> deferredQueue = new 
DeferredEventsQueue<>(DestroyIndexEvent::catalogVersion);

Review Comment:
   Again `DestroyIndexEvent::catalogVersion` not timestamp sooo confused me.
   Another strange name, deferred queue of what? buy milk?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters para
         });
     }
 
+    private CompletableFuture<Boolean> 
onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int version = parameters.catalogVersion();
+            int prevVersion = version - 1;

Review Comment:
   Please rename to `previousCatalogVersion`



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java:
##########
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.index;
-
-import static java.util.Collections.binarySearch;
-import static java.util.Collections.unmodifiableList;
-import static java.util.Comparator.comparingInt;
-import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
-import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
-import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
-import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
-import org.apache.ignite.internal.close.ManuallyCloseable;
-import org.apache.ignite.internal.event.EventListener;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-
-/** Index chooser for various operations, for example for RW transactions. */
-class IndexChooser implements ManuallyCloseable {

Review Comment:
   Like a sickle to the balls =(



##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to 
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {

Review Comment:
   It is certainly not obvious from the name that this is for a low watermark.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -248,4 +298,44 @@ private static <T> BiFunction<T, Throwable, 
CompletableFuture<T>> updater(Functi
             return updateFunction.apply(t);
         };
     }
+
+    /** Recover deferred destroy events. */
+    private void recoverDeferredQueue() {
+        int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark()));
+        int latestVersion = catalogService.latestCatalogVersion();
+
+        synchronized ((deferredQueue)) {
+            for (int version = latestVersion - 1; version >= earliestVersion; 
version--) {

Review Comment:
   Please use catalogVersion instead version for variables.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */

Review Comment:
   ```suggestion
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        inBusyLock(busyLock, () -> {
+            lowWatermark = readLowWatermarkFromVault();
+        });
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Schedule watermark updates.
+     */
+    public void scheduleUpdates() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+            if (lowWatermarkCandidate == null) {
+                LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
+
+                scheduleUpdateLowWatermarkBusy();
+
+                return;
+            }
+
+            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
+
+            txManager.updateLowWatermark(lowWatermarkCandidate)
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable == null) {
+                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
+                        } else if (!(throwable instanceof 
NodeStoppingException)) {
+                            LOG.error("Error during the Watermark manager 
start", throwable);
+
+                            failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, throwable));
+
+                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
+                        }
+                    });
+        });
+    }
+
+    private @Nullable HybridTimestamp readLowWatermarkFromVault() {
+        VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
+
+        return vaultEntry == null ? null : 
ByteUtils.fromBytes(vaultEntry.value());
+    }
+
+    @Override
+    public void stop() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        ScheduledFuture<?> lastScheduledTaskFuture = 
this.lastScheduledTaskFuture.get();
+
+        if (lastScheduledTaskFuture != null) {
+            lastScheduledTaskFuture.cancel(true);
+        }
+
+        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, 
TimeUnit.SECONDS);
+    }
+
+    @Override
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return lowWatermark;
+    }
+
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = 
createNewLowWatermarkCandidate();
+
+            // Wait until all the read-only transactions are finished before 
the new candidate, since no new RO transactions could be
+            // created, then we can safely promote the candidate as a new low 
watermark, store it in vault, and we can safely start cleaning
+            // up the stale/junk data in the tables.
+            txManager.updateLowWatermark(lowWatermarkCandidate)
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
+                        vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(lowWatermarkCandidate));
+
+                        lowWatermark = lowWatermarkCandidate;
+
+                        return notifyListeners(lowWatermarkCandidate);
+                    }), scheduledThreadPool)
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) 
{
+                                LOG.error("Failed to update low watermark, 
will schedule again: {}", throwable, lowWatermarkCandidate);
+
+                                inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
+                            }
+                        } else {
+                            LOG.info("Successful low watermark update: {}", 
lowWatermarkCandidate);
+
+                            scheduleUpdateLowWatermarkBusy();
+                        }
+                    });
+        });
+    }
+
+    @Override
+    public void addUpdateListener(LowWatermarkChangedListener listener) {
+        updateListeners.add(listener);
+    }
+
+    @Override
+    public void removeUpdateListener(LowWatermarkChangedListener listener) {
+        updateListeners.remove(listener);
+    }
+
+    private CompletableFuture<Void> notifyListeners(HybridTimestamp 
lowWatermark) {
+        if (updateListeners.isEmpty()) {
+            return nullCompletedFuture();
+        }
+
+        ArrayList<CompletableFuture<?>> res = new ArrayList<>();

Review Comment:
   ```suggestion
           var res = new ArrayList<CompletableFuture<?>>();
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        inBusyLock(busyLock, () -> {
+            lowWatermark = readLowWatermarkFromVault();
+        });
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Schedule watermark updates.
+     */
+    public void scheduleUpdates() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+            if (lowWatermarkCandidate == null) {
+                LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
+
+                scheduleUpdateLowWatermarkBusy();
+
+                return;
+            }
+
+            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
+
+            txManager.updateLowWatermark(lowWatermarkCandidate)
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable == null) {
+                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);

Review Comment:
   Why don't we save the new low watermark in vault?



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
+import 
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Low watermark dummy implementation, which requires explicit {@link 
#updateAndNotify(HybridTimestamp)} method call to notify listeners.
+ * This implementation has no persistent state and notifies listeners 
instantly in same thread.
+ */
+public class TestLowWatermark implements LowWatermark {
+    private final List<LowWatermarkChangedListener> listeners = new 
CopyOnWriteArrayList<>();
+    private volatile HybridTimestamp ts;
+
+    @Override
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return ts;
+    }
+
+    @Override
+    public void addUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.add(listener);
+    }
+
+    @Override
+    public void removeUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * Update low watermark and notify listeners.
+     *
+     * @param newTs New timestamp.
+     * @return Listener notification future.
+     */
+    public CompletableFuture<Void> updateAndNotify(HybridTimestamp newTs) {
+        assert ts == null || ts.longValue() < newTs.longValue();

Review Comment:
   I am expect that newTs not null



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        inBusyLock(busyLock, () -> {
+            lowWatermark = readLowWatermarkFromVault();
+        });
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Schedule watermark updates.
+     */
+    public void scheduleUpdates() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+            if (lowWatermarkCandidate == null) {
+                LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
+
+                scheduleUpdateLowWatermarkBusy();
+
+                return;
+            }
+
+            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
+
+            txManager.updateLowWatermark(lowWatermarkCandidate)
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable == null) {
+                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
+                        } else if (!(throwable instanceof 
NodeStoppingException)) {
+                            LOG.error("Error during the Watermark manager 
start", throwable);
+
+                            failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, throwable));
+
+                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
+                        }
+                    });
+        });
+    }
+
+    private @Nullable HybridTimestamp readLowWatermarkFromVault() {
+        VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
+
+        return vaultEntry == null ? null : 
ByteUtils.fromBytes(vaultEntry.value());
+    }
+
+    @Override
+    public void stop() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        ScheduledFuture<?> lastScheduledTaskFuture = 
this.lastScheduledTaskFuture.get();
+
+        if (lastScheduledTaskFuture != null) {
+            lastScheduledTaskFuture.cancel(true);
+        }
+
+        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, 
TimeUnit.SECONDS);
+    }
+
+    @Override
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return lowWatermark;
+    }
+
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = 
createNewLowWatermarkCandidate();
+
+            // Wait until all the read-only transactions are finished before 
the new candidate, since no new RO transactions could be
+            // created, then we can safely promote the candidate as a new low 
watermark, store it in vault, and we can safely start cleaning
+            // up the stale/junk data in the tables.
+            txManager.updateLowWatermark(lowWatermarkCandidate)
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
+                        vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(lowWatermarkCandidate));

Review Comment:
   I think it's better to save the new low watermark only after listeners 
complete their futures.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,11 +726,29 @@ private CompletableFuture<List<Assignments>> 
writeTableAssignmentsToMetastore(
         });
     }
 
-    private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters 
parameters) {
+    private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters 
parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            dropTableLocally(parameters.causalityToken(), parameters);
+            deferredQueue.enqueue(new 
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));

Review Comment:
   Again u can use `org.apache.ignite.internal.event.EventListener#fromConsumer`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1391,63 +1410,43 @@ protected TxStateTableStorage 
createTxStateTableStorage(CatalogTableDescriptor t
     /**
      * Drops local structures for a table.
      *
-     * @param causalityToken Causality token.
-     * @param parameters Destroy table event parameters.
+     * @param tableId Table id to destroy.
      */
-    private void dropTableLocally(long causalityToken, 
DestroyTableEventParameters parameters) {
-        int tableId = parameters.tableId();
-        // TODO Drop partitions from parameters and use from storage.
-        int partitions = parameters.partitions();
-
-        localPartitionsVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
-            if (e != null) {
-                return failedFuture(e);
-            }
+    private CompletableFuture<Void> destroyTableLocally(int tableId) {
+        TableImpl table = startedTables.remove(tableId);
+        localPartsByTableId.remove(tableId);
 
-            localPartsByTableId.remove(tableId);
+        assert table != null;

Review Comment:
   ```suggestion
           assert table != null : tableId;
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1314,14 +1334,13 @@ private CompletableFuture<Void> createTableLocally(
                 return failedFuture(e);
             }
 
-            startedTables.put(tableId, table);
-
             return allOf(localPartsUpdateFuture, 
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
                         if (onNodeRecovery) {
                             SchemaRegistry schemaRegistry = table.schemaView();
                             PartitionSet partitionSet = 
localPartsByTableId.get(tableId);
+                            HybridTimestamp lwm = 
lowWatermark.getLowWatermark();

Review Comment:
   Please add a comment that the low watermark will start updating only after 
the node is restored and there will be no races.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1836,11 +1835,12 @@ private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(
                     )
                     .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
                         if (!isRecovery) {
+                            HybridTimestamp lwm = 
lowWatermark.getLowWatermark();
                             // We create index storages (and also register the 
necessary structures) for the rebalancing one partition
                             // before start the raft node, so that the updates 
that come when applying the replication log can safely
                             // update the indexes. On recovery node, we do not 
need to call this code, since during restoration we start
                             // all partitions and already register indexes 
there.
-                            registerIndexesToTable(tbl, catalogService, 
singlePartitionIdSet, tbl.schemaView());
+                            registerIndexesToTable(tbl, catalogService, 
singlePartitionIdSet, tbl.schemaView(), lwm);

Review Comment:
   Here it looks like there may be a race, for example, a low watermark will be 
aggressively updated quite quickly and it may turn out that the task of 
destroying the index will end before we register the index and it will be 
created empty.



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -304,9 +331,22 @@ private CompletableFuture<Boolean> 
notifyInternal(EventParameters parameters) {
         return falseCompletedFuture(); // false: don't remove listener.
     }
 
-    private void removeTable(DestroyTableEventParameters event) {
-        for (int partition = 0; partition < event.partitions(); partition++) {
-            TablePartitionId tablePartitionId = new 
TablePartitionId(event.tableId(), partition);
+    @Override
+    public CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return inBusyLockAsync(busyLock, () -> {
+            int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
+
+            List<DestroyTableEvent> events = 
deferredEventsQueue.drainUpTo(earliestVersion);
+
+            events.forEach(event -> removeTable(event.tableId(), 
event.partitions()));
+
+            return nullCompletedFuture();
+        });
+    }
+
+    private void removeTable(int tableId, int partitions) {
+        for (int partition = 0; partition < partitions; partition++) {
+            TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partition);
             primaryReplicas.remove(tablePartitionId);

Review Comment:
   Do I understand correctly that there can be races between updating the 
watermark and updating the lease holder?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java:
##########
@@ -100,6 +100,10 @@ public interface CatalogService extends 
EventProducer<CatalogEvent, CatalogEvent
     /** Returns the earliest registered version of the catalog. */
     int earliestCatalogVersion();
 
+    /** Returns the earliest registered version of the catalog, which is 
observable since given timestamp. */
+    // TODO IGNITE-21608 Use method without timestamp instead?
+    int earliestCatalogVersion(long timestamp);

Review Comment:
   I think this method is not needed yet and adds additional complexity when 
using the catalog, since the catalog will not be compacted yet and we will 
figure out the details of its implementation later, we can safely use 
`CatalogService#activeCatalogVersion`.



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -280,8 +294,21 @@ public CompletableFuture<Boolean> notify(EventParameters 
parameters) {
     }
 
     private CompletableFuture<Boolean> notifyInternal(EventParameters 
parameters) {
-        if (parameters instanceof DestroyTableEventParameters) {
-            removeTable((DestroyTableEventParameters) parameters);
+        if (parameters instanceof DropTableEventParameters) {

Review Comment:
   U can use `org.apache.ignite.internal.event.EventListener#fromConsumer`



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -68,7 +73,7 @@
  *       load it from the placement driver. Wait for a limited amount of time 
(in getPrimaryReplica) and return what we have.
  *       Don't block the client for too long, it is better to miss the primary 
than to delay the request.
  */
-public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameters> {
+public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameters>, LowWatermarkChangedListener {

Review Comment:
   For me, we don’t need to implement both interfaces since in 
`ClientPrimaryReplicaTracker#start` we add ourselves and not from the outside; 
personally, this code confused me.



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -359,4 +399,29 @@ public long timestamp() {
             return timestamp;
         }
     }
+
+    /** Internal event. */
+    private static class DestroyTableEvent {

Review Comment:
   Why not create a package private class separately?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters para
         });
     }
 
+    private CompletableFuture<Boolean> 
onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int version = parameters.catalogVersion();

Review Comment:
   Please rename to `catalogVersion`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to 
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+    private final PriorityQueue<T> queue;
+    private final ToLongFunction<T> mapper;
+
+    /**
+     * Creates a queue.
+     *
+     * @param mapper Event timestamp extractor.
+     */
+    public DeferredEventsQueue(ToLongFunction<T> mapper) {

Review Comment:
   Here you don’t always get a timestamp, but also the catalog version is very 
confusing, I would suggest passing a comparator for the queue.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters para
         });
     }
 
+    private CompletableFuture<Boolean> 
onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int version = parameters.catalogVersion();
+            int prevVersion = version - 1;
+
+            // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
+            CatalogIndexDescriptor indexDescriptor = 
catalogService.index(indexId, prevVersion);
+            assert indexDescriptor != null : "index";
+
+            int tableId = indexDescriptor.tableId();
+
+            if (catalogService.table(tableId, version) == null) {
+                // Nothing to do. Index will be destroyed along with the table.
+                return falseCompletedFuture();
+            }
+
+            deferredQueue.enqueue(new DestroyIndexEvent(version, indexId, 
tableId));
+
+            return falseCompletedFuture();
+        });
+    }
+
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return inBusyLockAsync(busyLock, () -> {
+            int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
+
+            List<DestroyIndexEvent> events = 
deferredQueue.drainUpTo(earliestVersion);
+
+            if (events.isEmpty()) {
+                return nullCompletedFuture();
+            }
+
+            List<CompletableFuture<Void>> futures = events.stream()
+                    .map(event -> destroyIndexAsync(event.indexId(), 
event.tableId()))
+                    .collect(Collectors.toList());

Review Comment:
   Maybe use static import ?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters para
         });
     }
 
+    private CompletableFuture<Boolean> 
onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {

Review Comment:
   U can use `org.apache.ignite.internal.event.EventListener#fromConsumer`



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -248,4 +298,44 @@ private static <T> BiFunction<T, Throwable, 
CompletableFuture<T>> updater(Functi
             return updateFunction.apply(t);
         };
     }
+
+    /** Recover deferred destroy events. */
+    private void recoverDeferredQueue() {
+        int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark()));
+        int latestVersion = catalogService.latestCatalogVersion();
+
+        synchronized ((deferredQueue)) {

Review Comment:
   For what `synchronized` ?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to 
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+    private final PriorityQueue<T> queue;

Review Comment:
   Add a comment that is guarded by yourself.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters para
         });
     }
 
+    private CompletableFuture<Boolean> 
onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int version = parameters.catalogVersion();
+            int prevVersion = version - 1;
+
+            // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
+            CatalogIndexDescriptor indexDescriptor = 
catalogService.index(indexId, prevVersion);
+            assert indexDescriptor != null : "index";
+
+            int tableId = indexDescriptor.tableId();
+
+            if (catalogService.table(tableId, version) == null) {
+                // Nothing to do. Index will be destroyed along with the table.
+                return falseCompletedFuture();
+            }
+
+            deferredQueue.enqueue(new DestroyIndexEvent(version, indexId, 
tableId));
+
+            return falseCompletedFuture();
+        });
+    }
+
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return inBusyLockAsync(busyLock, () -> {
+            int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));

Review Comment:
   Please rename to `lwmActiveCatalogVersion`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to 
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+    private final PriorityQueue<T> queue;
+    private final ToLongFunction<T> mapper;
+
+    /**
+     * Creates a queue.
+     *
+     * @param mapper Event timestamp extractor.
+     */
+    public DeferredEventsQueue(ToLongFunction<T> mapper) {
+        this.mapper = mapper;
+        this.queue = new 
PriorityQueue<>(Comparator.comparingLong(this.mapper));
+    }
+
+    /**
+     * Offers a new event to the queue.
+     *
+     * @param event New deferred event.
+     */
+    public boolean enqueue(T event) {
+        synchronized (queue) {
+            return queue.offer(event);
+        }
+    }
+
+    /**
+     * Drain queue up to given watermark and return dequeued events.
+     *
+     * @param watermark Timestamp to drain up to.
+     * @return Dequeued events.
+     */
+    public List<T> drainUpTo(long watermark) {
+        synchronized (queue) {
+            if (!hasExpired0(watermark)) {
+                return List.of();
+            }
+
+            List<T> events = new ArrayList<>();
+            do {
+                T event = queue.poll();
+
+                events.add(event);
+            } while (hasExpired0(watermark));
+
+            return events;
+        }
+    }
+
+    /**
+     * Returns queue size.
+     */
+    public int size() {
+        synchronized (queue) {
+            return queue.size();
+        }
+    }
+
+    /**
+     * Returns {@code true} if queue is empty, {@code false} otherwise.
+     */
+    public boolean isEmpty() {
+        synchronized (queue) {
+            return queue.isEmpty();
+        }
+    }
+
+    /**
+     * Returns {@code true} if found events below watermark, {@code false} 
otherwise.
+     */
+    public boolean hasExpiredEvents(long watermark) {

Review Comment:
   What is a low watermark? timestamp, catalog version or what?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -248,4 +298,44 @@ private static <T> BiFunction<T, Throwable, 
CompletableFuture<T>> updater(Functi
             return updateFunction.apply(t);
         };
     }
+
+    /** Recover deferred destroy events. */
+    private void recoverDeferredQueue() {
+        int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark()));
+        int latestVersion = catalogService.latestCatalogVersion();
+
+        synchronized ((deferredQueue)) {
+            for (int version = latestVersion - 1; version >= earliestVersion; 
version--) {
+                int nextVersion = version + 1;
+                catalogService.indexes(version).stream()
+                        .filter(idx -> catalogService.index(idx.id(), 
nextVersion) == null)
+                        .forEach(idx -> deferredQueue.enqueue(new 
DestroyIndexEvent(nextVersion, idx.id(), idx.tableId())));
+            }
+        }
+    }
+
+    /** Internal event. */
+    private static class DestroyIndexEvent {

Review Comment:
   Please use package private class.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        inBusyLock(busyLock, () -> {
+            lowWatermark = readLowWatermarkFromVault();
+        });
+
+        return nullCompletedFuture();

Review Comment:
   ```suggestion
           return inBusyLockAsync(busyLock, () -> {
               lowWatermark = readLowWatermarkFromVault();
               
               return nullCompletedFuture();
           });
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(

Review Comment:
   It would be necessary to create a task with todo to use a common pool and 
not a separate one.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        inBusyLock(busyLock, () -> {
+            lowWatermark = readLowWatermarkFromVault();
+        });
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Schedule watermark updates.
+     */
+    public void scheduleUpdates() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+            if (lowWatermarkCandidate == null) {
+                LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
+
+                scheduleUpdateLowWatermarkBusy();
+
+                return;
+            }
+
+            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
+
+            txManager.updateLowWatermark(lowWatermarkCandidate)

Review Comment:
   Could we avoid duplicating code?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermarkImpl.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private volatile @Nullable HybridTimestamp lowWatermark;
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
+
+    private final FailureProcessor failureProcessor;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
+     */
+    public LowWatermarkImpl(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            FailureProcessor failureProcessor
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.failureProcessor = failureProcessor;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        inBusyLock(busyLock, () -> {
+            lowWatermark = readLowWatermarkFromVault();
+        });
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Schedule watermark updates.

Review Comment:
   ```suggestion
        * Schedule low watermark updates.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to 
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+    private final PriorityQueue<T> queue;
+    private final ToLongFunction<T> mapper;
+
+    /**
+     * Creates a queue.
+     *
+     * @param mapper Event timestamp extractor.
+     */
+    public DeferredEventsQueue(ToLongFunction<T> mapper) {
+        this.mapper = mapper;
+        this.queue = new 
PriorityQueue<>(Comparator.comparingLong(this.mapper));
+    }
+
+    /**
+     * Offers a new event to the queue.
+     *
+     * @param event New deferred event.
+     */
+    public boolean enqueue(T event) {
+        synchronized (queue) {
+            return queue.offer(event);
+        }
+    }
+
+    /**
+     * Drain queue up to given watermark and return dequeued events.
+     *
+     * @param watermark Timestamp to drain up to.
+     * @return Dequeued events.
+     */
+    public List<T> drainUpTo(long watermark) {
+        synchronized (queue) {
+            if (!hasExpired0(watermark)) {
+                return List.of();
+            }
+
+            List<T> events = new ArrayList<>();
+            do {
+                T event = queue.poll();
+
+                events.add(event);
+            } while (hasExpired0(watermark));
+
+            return events;
+        }
+    }
+
+    /**
+     * Returns queue size.
+     */
+    public int size() {
+        synchronized (queue) {
+            return queue.size();
+        }
+    }
+
+    /**
+     * Returns {@code true} if queue is empty, {@code false} otherwise.
+     */
+    public boolean isEmpty() {
+        synchronized (queue) {
+            return queue.isEmpty();
+        }
+    }
+
+    /**
+     * Returns {@code true} if found events below watermark, {@code false} 
otherwise.
+     */
+    public boolean hasExpiredEvents(long watermark) {
+        synchronized (queue) {
+            return hasExpired0(watermark);
+        }
+    }
+
+    /**
+     * Removes all events from the queue.
+     */
+    public void clear() {
+        synchronized (queue) {
+            queue.clear();
+        }
+    }
+
+    private boolean hasExpired0(long watermark) {

Review Comment:
   What is a low watermark? timestamp, catalog version or what?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -292,6 +293,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Started tables. */
     private final Map<Integer, TableImpl> startedTables = new 
ConcurrentHashMap<>();
 
+    /** Deferred destruction queue. */
+    private final DeferredEventsQueue<DestroyTableEvent> deferredQueue = new 
DeferredEventsQueue<>(DestroyTableEvent::catalogVersion);

Review Comment:
   Again 2 pain (in my not ass).



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static 
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaUtils;
+import org.apache.ignite.internal.schema.configuration.GcConfiguration;
+import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryDataStorageModule;
+import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TestLowWatermark;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.sql.IgniteSql;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Table manager recovery scenarios.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class, 
WorkDirectoryExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class TableManagerRecoveryTest extends IgniteAbstractTest {

Review Comment:
   The test is difficult to read, I trust the author.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java:
##########
@@ -84,32 +87,34 @@ public static void registerIndexToTable(
      * @param catalogService Catalog service.
      * @param partitionSet Partitions for which index storages will need to be 
created if they are missing.
      * @param schemaRegistry Table schema register.
+     * @param lwm Low watermark.
      */
     public static void registerIndexesToTable(
             TableViewInternal table,
             CatalogService catalogService,
             PartitionSet partitionSet,
-            SchemaRegistry schemaRegistry
+            SchemaRegistry schemaRegistry,
+            @Nullable HybridTimestamp lwm

Review Comment:
   Please describe in the javadoc why this argument is used and how it will be 
used.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   3 licenses are power.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2468,4 +2468,23 @@ private <T> CompletableFuture<T> 
orStopManagerFuture(CompletableFuture<T> future
 
         return anyOf(future, stopManagerFuture).thenApply(o -> (T) o);
     }
+
+    /** Internal event. */
+    private static class DestroyTableEvent {

Review Comment:
   Please use package private class.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,11 +726,29 @@ private CompletableFuture<List<Assignments>> 
writeTableAssignmentsToMetastore(
         });
     }
 
-    private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters 
parameters) {
+    private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters 
parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            dropTableLocally(parameters.causalityToken(), parameters);
+            deferredQueue.enqueue(new 
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
 
-            return nullCompletedFuture();
+            return falseCompletedFuture();
+        });
+    }
+
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return inBusyLockAsync(busyLock, () -> {
+            int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));

Review Comment:
   Please use naming `lwmCatalogVersion`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to