This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-21585
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 17ff500007496d0c47e7b545c1e8483f19c9224e
Author: amashenkov <[email protected]>
AuthorDate: Tue Mar 5 19:41:44 2024 +0300

    Switch TableManager and IndexManager to LWM events for table/index 
destruction purposes.
---
 .../internal/catalog/CatalogManagerImpl.java       |  52 +----
 .../ignite/internal/catalog/CatalogService.java    |   1 +
 .../internal/catalog/events/CatalogEvent.java      |   1 +
 .../internal/catalog/CatalogManagerSelfTest.java   |  32 +--
 .../ignite/internal/catalog/CatalogTestUtils.java  |  27 ---
 .../ignite/client/handler/FakeCatalogService.java  |   4 +-
 .../ignite/internal/index/ItIndexManagerTest.java  |   1 +
 .../apache/ignite/internal/index/IndexManager.java | 133 +++++++++---
 .../ignite/internal/index/IndexManagerTest.java    |  16 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   5 +-
 .../runner/app/PlatformTestNodeRunner.java         |   3 -
 .../org/apache/ignite/internal/app/IgniteImpl.java |   5 +-
 .../ignite/internal/IgniteIntegrationTest.java     |  12 --
 .../ignite/internal/schema/SchemaManager.java      |  38 +---
 .../ignite/internal/schema/SchemaManagerTest.java  |   3 +
 .../internal/sql/sqllogic/ItSqlLogicTest.java      |   2 -
 .../rebalance/ItRebalanceDistributedTest.java      |   3 +-
 .../internal/table/distributed/TableManager.java   | 226 +++++++++++----------
 .../table/distributed/index/IndexUtils.java        |  27 ++-
 .../table/distributed/TableManagerTest.java        |   6 +-
 20 files changed, 297 insertions(+), 300 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 00d99f8670..a790d9b3e5 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -25,8 +25,6 @@ import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParam
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -40,7 +38,6 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
-import java.util.function.Predicate;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
@@ -51,8 +48,6 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import org.apache.ignite.internal.catalog.events.DestroyIndexEvent;
-import org.apache.ignite.internal.catalog.events.DestroyTableEvent;
 import org.apache.ignite.internal.catalog.storage.Fireable;
 import org.apache.ignite.internal.catalog.storage.SnapshotEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateEntry;
@@ -370,6 +365,8 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
     private void truncateUpTo(Catalog catalog) {
         catalogByVer.headMap(catalog.version(), false).clear();
         catalogByTs.headMap(catalog.time(), false).clear();
+
+        LOG.info("Catalog history was truncated up to version=" + 
catalog.version());
     }
 
     private CompletableFuture<Void> 
saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
@@ -460,53 +457,12 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
 
         private CompletableFuture<Void> handle(SnapshotEntry event, long 
causalityToken) {
             Catalog catalog = event.snapshot();
-
-            // Use reverse order to find latest descriptors.
-            Collection<Catalog> droppedCatalogVersions = 
catalogByVer.headMap(catalog.version(), false).descendingMap().values();
-
-            List<Fireable> events = new ArrayList<>();
-            IntSet objectToSkip = new IntOpenHashSet();
-            Predicate<CatalogObjectDescriptor> filter = obj -> 
objectToSkip.add(obj.id());
-
-            // At first, add alive indexes to filter.
-            applyToAliveIndexesFrom(catalog.version(), filter::test);
-
-            // Create destroy events for dropped indexes.
-            droppedCatalogVersions.forEach(oldCatalog -> 
oldCatalog.indexes().stream()
-                    .filter(filter)
-                    .forEach(idx -> events.add(
-                            new DestroyIndexEvent(idx.id(), idx.tableId(), 
tableZoneDescriptor(oldCatalog, idx.tableId()).partitions()))
-                    ));
-
-            objectToSkip.clear();
-            // At last, create destroy events for dropped tables.
-            droppedCatalogVersions.forEach(oldCatalog -> 
oldCatalog.tables().stream()
-                    .filter(tbl -> catalog.table(tbl.id()) == null)
-                    .filter(filter)
-                    .forEach(tbl -> events.add(new DestroyTableEvent(tbl.id(), 
tableZoneDescriptor(oldCatalog, tbl.id()).partitions()))));
-
             // On recovery phase, we must register catalog from the snapshot.
             // In other cases, it is ok to rewrite an existed version, because 
it's exactly the same.
             registerCatalog(catalog);
+            truncateUpTo(catalog);
 
-            List<CompletableFuture<?>> eventFutures = new 
ArrayList<>(events.size());
-
-            for (Fireable fireEvent : events) {
-                eventFutures.add(fireEvent(
-                        fireEvent.eventType(),
-                        fireEvent.createEventParameters(causalityToken, 
catalog.version())
-                ));
-            }
-
-            return allOf(eventFutures.toArray(CompletableFuture[]::new))
-                    .whenComplete((ignore, err) -> {
-                        if (err != null) {
-                            LOG.warn("Failed to compact catalog.", err);
-                            // TODO: IGNITE-14611 Pass exception to an error 
handler?
-                        } else {
-                            truncateUpTo(catalog);
-                        }
-                    });
+            return nullCompletedFuture();
         }
 
         private CompletableFuture<Void> handle(VersionedUpdate update, 
HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 1477ee7a3a..a570e01cfc 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -101,6 +101,7 @@ public interface CatalogService extends 
EventProducer<CatalogEvent, CatalogEvent
     int earliestCatalogVersion();
 
     /** Returns the earliest registered version of the catalog, which is 
observable since given timestamp. */
+    // TODO IGNITE-21608 Use method without timestamp instead?
     int earliestCatalogVersion(long timestamp);
 
     /** Returns the latest registered version of the catalog. */
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
index ecc5e60c0a..e736f1c560 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
@@ -59,6 +59,7 @@ public enum CatalogEvent implements Event {
     INDEX_REMOVED,
 
     /** This event is fired when an index has been dropped from all catalog 
versions and can be destroyed. */
+    @Deprecated
     INDEX_DESTROY,
 
     /** This event is fired, when a distribution zone was created in Catalog. 
*/
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 5bc20d6216..4511ae4100 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -47,6 +47,7 @@ import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.
 import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -128,8 +129,6 @@ import 
org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
-import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
@@ -1182,17 +1181,6 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
 
         verifyNoMoreInteractions(eventListener);
         clearInvocations(eventListener);
-
-        // Got 'destroy' event only after Catalog compaction.
-        assertThat(CatalogTestUtils.waitCatalogCompaction(manager, 
clock.nowLong()), equalTo(true));
-        verify(eventListener, 
times(2)).notify(any(DestroyTableEventParameters.class));
-
-        verifyNoMoreInteractions(eventListener);
-        clearInvocations();
-
-        // Expect no events if Catalog wasn't compacted.
-        assertThat(CatalogTestUtils.waitCatalogCompaction(manager, 
clock.nowLong()), equalTo(false));
-        verifyNoMoreInteractions(eventListener);
     }
 
     @Test
@@ -1209,7 +1197,6 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         manager.listen(CatalogEvent.INDEX_AVAILABLE, eventListener);
         manager.listen(CatalogEvent.INDEX_STOPPING, eventListener);
         manager.listen(CatalogEvent.INDEX_REMOVED, eventListener);
-        manager.listen(CatalogEvent.INDEX_DESTROY, eventListener);
 
         // Try to create index without table.
         assertThat(manager.execute(createIndexCmd), 
willThrow(TableNotFoundValidationException.class));
@@ -1258,16 +1245,6 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         verify(eventListener).notify(any(RemoveIndexEventParameters.class));
         verifyNoMoreInteractions(eventListener);
         clearInvocations(eventListener);
-
-        // Got 'destroy' event only after Catalog compaction.
-        assertThat(CatalogTestUtils.waitCatalogCompaction(manager, 
clock.nowLong()), equalTo(true));
-        verify(eventListener, times(2 /* PK + secondary index 
*/)).notify(any(DestroyIndexEventParameters.class));
-
-        clearInvocations();
-
-        // Expect no events if Catalog wasn't compacted.
-        assertThat(CatalogTestUtils.waitCatalogCompaction(manager, 
clock.nowLong()), equalTo(false));
-        verifyNoMoreInteractions(eventListener);
     }
 
     @Test
@@ -2482,7 +2459,7 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
     }
 
     @Test
-    public void testCatalogCompaction() {
+    public void testCatalogCompaction() throws InterruptedException {
         assertThat(manager.execute(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
         assertThat(manager.execute(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
 
@@ -2493,9 +2470,8 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME)), 
willBe(nullValue()));
         assertThat(manager.execute(simpleIndex(TABLE_NAME, INDEX_NAME_2)), 
willBe(nullValue()));
 
-        assertThat(CatalogTestUtils.waitCatalogCompaction(manager, timestamp), 
equalTo(true));
-
-        assertEquals(catalog.version(), manager.earliestCatalogVersion());
+        assertThat(manager.compactCatalog(timestamp), willBe(Boolean.TRUE));
+        assertTrue(waitForCondition(() -> catalog.version() == 
manager.earliestCatalogVersion(), 3_000));
 
         assertNull(manager.catalog(0));
         assertNull(manager.catalog(catalog.version() - 1));
diff --git 
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
 
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index 10e6dd46eb..3f0c3596c8 100644
--- 
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++ 
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -19,12 +19,10 @@ package org.apache.ignite.internal.catalog;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.List;
 import java.util.Set;
@@ -343,31 +341,6 @@ public class CatalogTestUtils {
         return AlterZoneCommand.builder().zoneName(zoneName);
     }
 
-    /**
-     * Starts catalog compaction and waits it finished locally.
-     *
-     * @param catalogManager Catalog manager.
-     * @param timestamp Timestamp catalog should be compacted up to.
-     * @return {@code True} if a new snapshot has been successfully written, 
{@code false} otherwise.
-     */
-    public static boolean waitCatalogCompaction(CatalogManager catalogManager, 
long timestamp) {
-        int version = catalogManager.activeCatalogVersion(timestamp);
-
-        CompletableFuture<Boolean> operationFuture = ((CatalogManagerImpl) 
catalogManager).compactCatalog(timestamp);
-
-        try {
-            boolean result = operationFuture.get();
-
-            if (result) {
-                waitForCondition(() -> catalogManager.earliestCatalogVersion() 
== version, 3_000);
-            }
-        } catch (Exception e) {
-            fail(e);
-        }
-
-        return operationFuture.join();
-    }
-
     private static class TestUpdateLog implements UpdateLog {
         private final HybridClock clock;
 
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
index bd22a7e675..8ef91d7e8e 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
@@ -150,12 +150,12 @@ public class FakeCatalogService implements CatalogService 
{
     }
 
     @Override
-    public int latestCatalogVersion() {
+    public int earliestCatalogVersion(long timestamp) {
         return 0;
     }
 
     @Override
-    public int earliestCatalogVersion(long timestamp) {
+    public int latestCatalogVersion() {
         return 0;
     }
 
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index ca176498f9..fcb3b54e29 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -113,6 +113,7 @@ public class ItIndexManagerTest extends 
ClusterPerClassIntegrationTest {
         return future.join();
     }
 
+    // TODO: validate this.
     private static List<Integer> 
collectIndexIdsFromCatalogForRecovery(IgniteImpl ignite, TableImpl table) {
         CatalogManager catalogManager = ignite.catalogManager();
 
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 3bdbe766d5..537f8a3c71 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -17,14 +17,18 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
-import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DESTROY;
+import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
 import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexToTable;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,13 +36,15 @@ import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongFunction;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -47,7 +53,9 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.DeferredEventsQueue;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -84,6 +92,12 @@ public class IndexManager implements IgniteComponent {
     /** Versioned value to prevent races when registering/unregistering 
indexes when processing metastore or catalog events. */
     private final IncrementalVersionedValue<Void> handleMetastoreEventVv;
 
+    /** Low watermark. */
+    private final LowWatermark lowWatermark;
+
+    /** Deferred destruction queue. */
+    private final DeferredEventsQueue<DestroyIndexEvent> deferredQueue = new 
DeferredEventsQueue<>(DestroyIndexEvent::catalogVersion);
+
     /**
      * Constructor.
      *
@@ -97,12 +111,14 @@ public class IndexManager implements IgniteComponent {
             TableManager tableManager,
             CatalogService catalogService,
             ExecutorService ioExecutor,
-            Consumer<LongFunction<CompletableFuture<?>>> registry
+            Consumer<LongFunction<CompletableFuture<?>>> registry,
+            LowWatermark lowWatermark
     ) {
         this.schemaManager = schemaManager;
         this.tableManager = tableManager;
         this.catalogService = catalogService;
         this.ioExecutor = ioExecutor;
+        this.lowWatermark = lowWatermark;
 
         handleMetastoreEventVv = new IncrementalVersionedValue<>(registry);
     }
@@ -111,8 +127,11 @@ public class IndexManager implements IgniteComponent {
     public CompletableFuture<Void> start() {
         LOG.debug("Index manager is about to start");
 
+        recoverDeferredQueue();
+
         catalogService.listen(INDEX_CREATE, (CreateIndexEventParameters 
parameters) -> onIndexCreate(parameters));
-        catalogService.listen(INDEX_DESTROY, (DestroyIndexEventParameters 
parameters) -> onIndexDestroy(parameters));
+        catalogService.listen(INDEX_REMOVED, (RemoveIndexEventParameters 
parameters) -> onIndexRemoved(parameters));
+        lowWatermark.addUpdateListener(this::onLwmChanged);
 
         LOG.info("Index manager started");
 
@@ -152,27 +171,6 @@ public class IndexManager implements IgniteComponent {
         return tableManager.tableAsync(causalityToken, 
tableId).thenApply(table -> table.internalTable().storage());
     }
 
-    private CompletableFuture<Boolean> 
onIndexDestroy(DestroyIndexEventParameters parameters) {
-        int indexId = parameters.indexId();
-        int tableId = parameters.tableId();
-
-        long causalityToken = parameters.causalityToken();
-
-        CompletableFuture<TableViewInternal> tableFuture = 
tableManager.tableAsync(causalityToken, tableId);
-
-        return inBusyLockAsync(busyLock, () -> handleMetastoreEventVv.update(
-                causalityToken,
-                updater(unused -> tableFuture.thenApply(table -> 
inBusyLock(busyLock, () -> {
-                    if (table != null) {
-                        // In case of DROP TABLE the table will be removed 
first.
-                        table.unregisterIndex(indexId);
-                    }
-
-                    return null;
-                })))
-        )).thenApply(unused -> false);
-    }
-
     private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             CatalogIndexDescriptor index = parameters.indexDescriptor();
@@ -198,6 +196,51 @@ public class IndexManager implements IgniteComponent {
         });
     }
 
+    private CompletableFuture<Boolean> 
onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int version = parameters.catalogVersion();
+            int prevVersion = version - 1;
+
+            // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
+            CatalogIndexDescriptor indexDescriptor = 
catalogService.index(indexId, prevVersion);
+            assert indexDescriptor != null : "index";
+
+            deferredQueue.enqueue(new DestroyIndexEvent(version, indexId, 
indexDescriptor.tableId()));
+
+            return falseCompletedFuture();
+        });
+    }
+
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return inBusyLockAsync(busyLock, () -> {
+            int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
+
+            List<DestroyIndexEvent> events = 
deferredQueue.drainUpTo(earliestVersion);
+
+            if (events.isEmpty()) {
+                return nullCompletedFuture();
+            }
+
+            List<CompletableFuture<Void>> futures = 
deferredQueue.drainUpTo(earliestVersion).stream()
+                    .map(event -> destroyIndexAsync(event.indexId(), 
event.tableId()))
+                    .collect(Collectors.toList());
+
+            return allOf(futures.toArray(CompletableFuture[]::new));
+        });
+    }
+
+    private CompletableFuture<Void> destroyIndexAsync(int indexId, int 
tableId) {
+        return runAsync(() -> inBusyLock(busyLock, () -> {
+            TableViewInternal table = tableManager.cachedTable(tableId);
+
+            if (table != null) {
+                // In case of DROP TABLE the table will be removed with all 
it's indexes.
+                table.unregisterIndex(indexId);
+            }
+        }), ioExecutor);
+    }
+
     private CompletableFuture<?> startIndexAsync(
             CatalogTableDescriptor table,
             CatalogIndexDescriptor index,
@@ -249,4 +292,44 @@ public class IndexManager implements IgniteComponent {
             return updateFunction.apply(t);
         };
     }
+
+    /** Recover deferred destroy events. */
+    private void recoverDeferredQueue() {
+        int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark()));
+        int latestVersion = catalogService.latestCatalogVersion();
+
+        synchronized ((deferredQueue)) {
+            for (int version = latestVersion - 1; version >= earliestVersion; 
version--) {
+                int nextVersion = version + 1;
+                catalogService.indexes(version).stream()
+                        .filter(idx -> catalogService.index(idx.id(), 
nextVersion) == null)
+                        .forEach(idx -> deferredQueue.enqueue(new 
DestroyIndexEvent(nextVersion, idx.id(), idx.tableId())));
+            }
+        }
+    }
+
+    /** Internal event. */
+    private static class DestroyIndexEvent {
+        final int catalogVersion;
+        final int indexId;
+        final int tableId;
+
+        DestroyIndexEvent(int catalogVersion, int indexId, int tableId) {
+            this.catalogVersion = catalogVersion;
+            this.indexId = indexId;
+            this.tableId = tableId;
+        }
+
+        public int catalogVersion() {
+            return catalogVersion;
+        }
+
+        public int indexId() {
+            return indexId;
+        }
+
+        public int tableId() {
+            return tableId;
+        }
+    }
 }
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 72e809b7d0..1c7d7f9032 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -22,7 +22,6 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
-import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.waitCatalogCompaction;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
 import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
 import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
@@ -34,7 +33,6 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -67,6 +65,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TestLowWatermark;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
@@ -102,6 +101,8 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
 
     private final Map<Integer, TableViewInternal> tableViewInternalByTableId = 
new ConcurrentHashMap<>();
 
+    private TestLowWatermark lowWatermark = new TestLowWatermark();
+
     @BeforeEach
     public void setUp() {
         mockTableManager = mock(TableManager.class);
@@ -146,7 +147,7 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         int tableId = indexDescriptor.tableId();
 
         dropIndex(INDEX_NAME);
-        assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE));
+        assertThat(fireDestroyEvent(), willCompleteSuccessfully());
 
         long causalityToken = 0L; // Use last token.
         MvTableStorage mvTableStorage = 
indexManager.getMvTableStorage(causalityToken, tableId).get();
@@ -164,7 +165,7 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         int tableId = indexDescriptor.tableId();
 
         dropTable(TABLE_NAME);
-        assertTrue(waitCatalogCompaction(catalogManager, Long.MAX_VALUE));
+        assertThat(fireDestroyEvent(), willCompleteSuccessfully());
 
         long causalityToken = 0L; // Use last token.
         MvTableStorage mvTableStorage = 
indexManager.getMvTableStorage(causalityToken, tableId).get();
@@ -220,7 +221,8 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
                 mockTableManager,
                 catalogManager,
                 ForkJoinPool.commonPool(),
-                (LongFunction<CompletableFuture<?>> function) -> 
metaStorageManager.registerRevisionUpdateListener(function::apply)
+                (LongFunction<CompletableFuture<?>> function) -> 
metaStorageManager.registerRevisionUpdateListener(function::apply),
+                lowWatermark
         );
 
         assertThat(allOf(metaStorageManager.start(), catalogManager.start(), 
indexManager.start()), willCompleteSuccessfully());
@@ -245,4 +247,8 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
     private void dropIndex(String indexName) {
         TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, 
indexName);
     }
+
+    private CompletableFuture<Void> fireDestroyEvent() {
+        return lowWatermark.updateAndNotify(clock.now());
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index c22f720f00..ce4aee0028 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -525,7 +525,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         var sqlRef = new AtomicReference<IgniteSqlImpl>();
 
-        LowWatermarkImpl lowWatermark = new LowWatermarkImpl(name, 
gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor);
+        var lowWatermark = new LowWatermarkImpl(name, gcConfig.lowWatermark(), 
hybridClock, txManager, vault, failureProcessor);
 
         TableManager tableManager = new TableManager(
                 name,
@@ -566,7 +566,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 tableManager,
                 catalogManager,
                 threadPoolsManager.tableIoExecutor(),
-                registry
+                registry,
+                lowWatermark
         );
 
         var metricManager = new MetricManager();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 1580bde38b..14627286b1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -61,7 +61,6 @@ import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
-import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
@@ -568,8 +567,6 @@ public class PlatformTestNodeRunner {
                 session.execute(null, "DROP TABLE " + tableName + "");
             }
 
-            
IgniteIntegrationTest.forceCleanupAbandonedResources(context.ignite());
-
             return tableName;
         }
     }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 210c08fcdd..faa1e19995 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -727,7 +727,8 @@ public class IgniteImpl implements Ignite {
                 distributedTblMgr,
                 catalogManager,
                 threadPoolsManager.tableIoExecutor(),
-                registry
+                registry,
+                lowWatermark
         );
 
         indexBuildingManager = new IndexBuildingManager(
@@ -1450,7 +1451,7 @@ public class IgniteImpl implements Ignite {
         return resourcesRegistry;
     }
 
-    /** Returns low watermark */
+    /** Returns low watermark. */
     @TestOnly
     public LowWatermarkImpl lowWatermark() {
         return lowWatermark;
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
index 0ac6d4ce3e..9fa5710043 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/IgniteIntegrationTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal;
 
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.junit.StopAllIgnitesAfterTests;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -31,13 +28,4 @@ import org.junit.jupiter.api.extension.ExtendWith;
 // The order is important here.
 @ExtendWith({WorkDirectoryExtension.class, StopAllIgnitesAfterTests.class})
 public abstract class IgniteIntegrationTest extends BaseIgniteAbstractTest {
-    /**
-     * Forcibly destroys partitions for dropped tables and indexes via 
triggering catalog compaction to the latest catalog version.
-     */
-    public static void forceCleanupAbandonedResources(Ignite node) {
-        IgniteImpl node0 = (IgniteImpl) node;
-        CatalogManagerImpl catalogManager = (CatalogManagerImpl) 
node0.catalogManager();
-
-        catalogManager.compactCatalog(Long.MAX_VALUE);
-    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 3e9a6f7251..ccfb5a03e3 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.Collection;
@@ -38,7 +39,6 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
-import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters;
 import org.apache.ignite.internal.catalog.events.TableEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -79,7 +79,8 @@ public class SchemaManager implements IgniteComponent {
     public CompletableFuture<Void> start() {
         catalogService.listen(CatalogEvent.TABLE_CREATE, this::onTableCreated);
         catalogService.listen(CatalogEvent.TABLE_ALTER, this::onTableAltered);
-        catalogService.listen(CatalogEvent.TABLE_DESTROY, 
this::onTableDestroyed);
+
+        // TODO IGNITE-21585 subscribe to LWM updates.
 
         registerExistingTables();
 
@@ -173,12 +174,6 @@ public class SchemaManager implements IgniteComponent {
         }
     }
 
-    private CompletableFuture<Boolean> onTableDestroyed(CatalogEventParameters 
event) {
-        DestroyTableEventParameters creationEvent = 
(DestroyTableEventParameters) event;
-
-        return dropRegistry(creationEvent.causalityToken(), 
creationEvent.tableId()).thenApply(ignored -> false);
-    }
-
     private void setColumnMapping(SchemaDescriptor schema, int tableId) throws 
ExecutionException, InterruptedException {
         if (schema.version() == CatalogTableDescriptor.INITIAL_TABLE_VERSION) {
             return;
@@ -309,29 +304,16 @@ public class SchemaManager implements IgniteComponent {
     /**
      * Drops schema registry for the given table id (along with the 
corresponding schemas).
      *
-     * @param causalityToken Causality token.
      * @param tableId Table id.
      */
-    private CompletableFuture<?> dropRegistry(long causalityToken, int 
tableId) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
-        }
+    // TODO IGNITE-21585: subscribe to LWM updates.
+    public CompletableFuture<?> dropRegistryAsync(int tableId) {
+        return inBusyLockAsync(busyLock, () -> {
+            SchemaRegistryImpl removedRegistry = 
registriesById.remove(tableId);
+            removedRegistry.close();
 
-        try {
-            return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
-                if (e != null) {
-                    return failedFuture(new IgniteInternalException(
-                            format("Cannot remove a schema registry for the 
table [tblId={}]", tableId), e));
-                }
-
-                SchemaRegistryImpl removedRegistry = 
registriesById.remove(tableId);
-                removedRegistry.close();
-
-                return nullCompletedFuture();
-            }));
-        } finally {
-            busyLock.leaveBusy();
-        }
+            return falseCompletedFuture();
+        });
     }
 
     @Override
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
index 16a19ec9e9..2d772b693a 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
@@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.sql.ColumnType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
@@ -251,6 +252,8 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
         completeCausalityToken(CAUSALITY_TOKEN_2);
     }
 
+    // TODO IGNITE-21585: subscribe to LWM
+    @Disabled("IGNITE-21585")
     @Test
     void destroyTableMakesRegistryUnavailable() {
         createSomeTable();
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
index ad0123193d..581eba0f22 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
@@ -291,8 +291,6 @@ public class ItSqlLogicTest extends IgniteIntegrationTest {
                     }
                 }
             }
-
-            forceCleanupAbandonedResources(CLUSTER_NODES.get(0));
         }
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index aed0d5ce4c..ea44af1c06 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1247,7 +1247,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     tableManager,
                     catalogManager,
                     threadPoolsManager.tableIoExecutor(),
-                    registry
+                    registry,
+                    lowWatermark
             );
         }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 561ff10ddf..f1067d41d4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -105,7 +105,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
-import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
 import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
@@ -162,6 +162,7 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.table.DeferredEventsQueue;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
@@ -292,6 +293,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Started tables. */
     private final Map<Integer, TableImpl> startedTables = new 
ConcurrentHashMap<>();
 
+    /** Deferred destruction queue. */
+    private final DeferredEventsQueue<DestroyTableEvent> deferredQueue = new 
DeferredEventsQueue<>(DestroyTableEvent::catalogVersion);
+
     /** Local partitions. */
     private final Map<Integer, PartitionSet> localPartsByTableId = new 
ConcurrentHashMap<>();
 
@@ -561,7 +565,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             long recoveryRevision = recoveryFinishFuture.join();
 
-            startTables(recoveryRevision);
+            startTables(recoveryRevision, lowWatermark.getLowWatermark());
 
             processAssignmentsOnRecovery(recoveryRevision);
 
@@ -569,14 +573,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
 
-            catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> {
-                return onTableCreate((CreateTableEventParameters) 
parameters).thenApply(unused -> false);
-            });
-
-            catalogService.listen(CatalogEvent.TABLE_DESTROY, parameters -> {
-                return onTableDestroy(((DestroyTableEventParameters) 
parameters)).thenApply(unused -> false);
-            });
-
+            catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> 
onTableCreate((CreateTableEventParameters) parameters));
+            catalogService.listen(CatalogEvent.TABLE_DROP, parameters -> 
onTableDrop(((DropTableEventParameters) parameters)));
             catalogService.listen(CatalogEvent.TABLE_ALTER, parameters -> {
                 if (parameters instanceof RenameTableEventParameters) {
                     return onTableRename((RenameTableEventParameters) 
parameters).thenApply(unused -> false);
@@ -585,6 +583,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 }
             });
 
+            lowWatermark.addUpdateListener(this::onLwmChanged);
+
             partitionReplicatorNodeRecovery.start();
 
             return nullCompletedFuture();
@@ -644,8 +644,10 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         }
     }
 
-    private CompletableFuture<?> onTableCreate(CreateTableEventParameters 
parameters) {
-        return createTableLocally(parameters.causalityToken(), 
parameters.catalogVersion(), parameters.tableDescriptor(), false);
+    private CompletableFuture<Boolean> 
onTableCreate(CreateTableEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () ->
+                createTableLocally(parameters.causalityToken(), 
parameters.catalogVersion(), parameters.tableDescriptor(), false))
+                .thenApply(unused -> false);
     }
 
     /**
@@ -719,11 +721,29 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         });
     }
 
-    private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters 
parameters) {
+    private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters 
parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            dropTableLocally(parameters.causalityToken(), parameters);
+            deferredQueue.enqueue(new 
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
 
-            return nullCompletedFuture();
+            return falseCompletedFuture();
+        });
+    }
+
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return inBusyLockAsync(busyLock, () -> {
+            int earliestVersion = 
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
+
+            List<DestroyTableEvent> events = 
deferredQueue.drainUpTo(earliestVersion);
+
+            if (events.isEmpty()) {
+                return nullCompletedFuture();
+            }
+
+            List<CompletableFuture<Void>> futures = events.stream()
+                    .map(event -> destroyTableLocally(event.tableId()))
+                    .collect(Collectors.toList());
+
+            return allOf(futures.toArray(CompletableFuture[]::new));
         });
     }
 
@@ -1177,45 +1197,44 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             CatalogTableDescriptor tableDescriptor,
             boolean onNodeRecovery
     ) {
-        return inBusyLockAsync(busyLock, () -> {
-            int tableId = tableDescriptor.id();
-            int zoneId = tableDescriptor.zoneId();
+        int tableId = tableDescriptor.id();
+        int zoneId = tableDescriptor.zoneId();
 
-            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
+        // Retrieve descriptor during synchronous call, before the previous 
catalog version could be concurrently compacted.
+        CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
 
-            CompletableFuture<List<Assignments>> assignmentsFuture;
+        CompletableFuture<List<Assignments>> assignmentsFuture;
 
-            // Check if the table already has assignments in the meta storage 
locally.
-            // So, it means, that it is a recovery process and we should use 
the meta storage local assignments instead of calculation
-            // of the new ones.
-            if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, 
causalityToken) != null) {
-                assignmentsFuture = completedFuture(
-                        tableAssignmentsGetLocally(metaStorageMgr, tableId, 
zoneDescriptor.partitions(), causalityToken));
-            } else {
-                assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId)
-                        .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
-                                dataNodes,
-                                zoneDescriptor.partitions(),
-                                zoneDescriptor.replicas()
-                        
).stream().map(Assignments::of).collect(Collectors.toList()));
-
-                assignmentsFuture.thenAccept(assignmentsList -> {
-                    LOG.info(IgniteStringFormatter.format("Assignments 
calculated from data nodes [table={}, tableId={}, assignments={}, "
-                            + "revision={}]", tableDescriptor.name(), tableId, 
assignmentListToString(assignmentsList), causalityToken));
-                });
-            }
+        // Check if the table already has assignments in the meta storage 
locally.
+        // So, it means, that it is a recovery process and we should use the 
meta storage local assignments instead of calculation
+        // of the new ones.
+        if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, 
causalityToken) != null) {
+            assignmentsFuture = completedFuture(
+                    tableAssignmentsGetLocally(metaStorageMgr, tableId, 
zoneDescriptor.partitions(), causalityToken));
+        } else {
+            assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId)
+                    .thenApply(dataNodes -> AffinityUtils.calculateAssignments(
+                            dataNodes,
+                            zoneDescriptor.partitions(),
+                            zoneDescriptor.replicas()
+                    
).stream().map(Assignments::of).collect(Collectors.toList()));
+
+            assignmentsFuture.thenAccept(assignmentsList -> {
+                LOG.info(IgniteStringFormatter.format("Assignments calculated 
from data nodes [table={}, tableId={}, assignments={}, "
+                        + "revision={}]", tableDescriptor.name(), tableId, 
assignmentListToString(assignmentsList), causalityToken));
+            });
+        }
 
-            CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke =
-                    writeTableAssignmentsToMetastore(tableId, 
assignmentsFuture);
+        CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke =
+                writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
 
-            return createTableLocally(
-                    causalityToken,
-                    tableDescriptor,
-                    zoneDescriptor,
-                    assignmentsFutureAfterInvoke,
-                    onNodeRecovery
-            );
-        });
+        return createTableLocally(
+                causalityToken,
+                tableDescriptor,
+                zoneDescriptor,
+                assignmentsFutureAfterInvoke,
+                onNodeRecovery
+        );
     }
 
     /**
@@ -1300,15 +1319,17 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 return failedFuture(e);
             }
 
-            startedTables.put(tableId, table);
-
             return allOf(localPartsUpdateFuture, 
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
                         if (onNodeRecovery) {
                             SchemaRegistry schemaRegistry = table.schemaView();
                             PartitionSet partitionSet = 
localPartsByTableId.get(tableId);
+                            HybridTimestamp lwm = 
lowWatermark.getLowWatermark();
 
-                            registerIndexesToTableOnNodeRecovery(table, 
catalogService, partitionSet, schemaRegistry);
+                            registerIndexesToTableOnNodeRecovery(table, 
catalogService, partitionSet, schemaRegistry, lwm);
                         }
+
+                        startedTables.put(tableId, table);
+
                         return 
startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id());
                     }
             ), ioExecutor);
@@ -1377,63 +1398,43 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     /**
      * Drops local structures for a table.
      *
-     * @param causalityToken Causality token.
-     * @param parameters Destroy table event parameters.
+     * @param tableId Table id to destroy.
      */
-    private void dropTableLocally(long causalityToken, 
DestroyTableEventParameters parameters) {
-        int tableId = parameters.tableId();
-        // TODO Drop partitions from parameters and use from storage.
-        int partitions = parameters.partitions();
-
-        localPartitionsVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
-            if (e != null) {
-                return failedFuture(e);
-            }
-
-            localPartsByTableId.remove(tableId);
-
-            return nullCompletedFuture();
-        }));
-
-        tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
-            if (e != null) {
-                return failedFuture(e);
-            }
-
-            TableImpl table = tables.get(tableId);
-
-            assert table != null : tableId;
-
-            InternalTable internalTable = table.internalTable();
-
-            CompletableFuture<?>[] stopReplicaFutures = new 
CompletableFuture<?>[partitions];
-
-            // TODO https://issues.apache.org/jira/browse/IGNITE-19170 
Partitions should be stopped on the assignments change
-            //  event triggered by zone drop or alter. Stop replica 
asynchronously, out of metastorage event pipeline.
-            for (int partitionId = 0; partitionId < partitions; partitionId++) 
{
-                var replicationGroupId = new TablePartitionId(tableId, 
partitionId);
-
-                stopReplicaFutures[partitionId] = 
stopPartition(replicationGroupId, table);
-            }
+    private CompletableFuture<Void> destroyTableLocally(int tableId) {
+        TableImpl table = startedTables.remove(tableId);
+        localPartsByTableId.remove(tableId);
 
-            // TODO: IGNITE-18703 Destroy raft log and meta
-            return allOf(stopReplicaFutures)
-                    .thenComposeAsync(
-                            unused -> allOf(
-                                    internalTable.storage().destroy(),
-                                    runAsync(() -> 
internalTable.txStateStorage().destroy(), ioExecutor)
-                            ),
-                            ioExecutor
-                    ).thenAccept(ignore0 -> tables.remove(tableId));
-        }));
+        assert table != null;
 
-        startedTables.remove(tableId);
+        InternalTable internalTable = table.internalTable();
+        int partitions = internalTable.partitions();
 
+        // TODO https://issues.apache.org/jira/browse/IGNITE-18991 Move 
assigment manipulations to Distribution zones.
         Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
                 .mapToObj(p -> stablePartAssignmentsKey(new 
TablePartitionId(tableId, p)))
                 .collect(toSet());
-
         metaStorageMgr.removeAll(assignmentKeys);
+
+        CompletableFuture<?>[] stopReplicaFutures = new 
CompletableFuture<?>[partitions];
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions 
should be stopped on the assignments change
+        //  event triggered by zone drop or alter. Stop replica 
asynchronously, out of metastorage event pipeline.
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            var replicationGroupId = new TablePartitionId(tableId, 
partitionId);
+
+            stopReplicaFutures[partitionId] = 
stopPartition(replicationGroupId, table);
+        }
+
+        // TODO: IGNITE-18703 Destroy raft log and meta
+        return allOf(stopReplicaFutures)
+                .thenComposeAsync(
+                        unused -> inBusyLockAsync(busyLock, () -> allOf(
+                                internalTable.storage().destroy(),
+                                runAsync(() -> inBusyLock(busyLock, () -> 
internalTable.txStateStorage().destroy()), ioExecutor)
+                        )),
+                        ioExecutor)
+                .thenAccept(ignore0 -> tables.remove(tableId))
+                .thenAcceptAsync(ignore0 -> 
schemaManager.dropRegistryAsync(tableId), ioExecutor);
     }
 
     @Override
@@ -2398,10 +2399,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return tables.stream().filter(table -> 
table.name().equals(name)).findAny().orElse(null);
     }
 
-    private void startTables(long recoveryRevision) {
+    private void startTables(long recoveryRevision, @Nullable HybridTimestamp 
lwm) {
         sharedTxStateStorage.start();
 
-        int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+        int earliestCatalogVersion = 
catalogService.activeCatalogVersion(HybridTimestamp.hybridTimestampToLong(lwm));
         int latestCatalogVersion = catalogService.latestCatalogVersion();
 
         var startedTables = new IntOpenHashSet();
@@ -2440,4 +2441,23 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         return anyOf(future, stopManagerFuture).thenApply(o -> (T) o);
     }
+
+    /** Internal event. */
+    private static class DestroyTableEvent {
+        final int catalogVersion;
+        final int tableId;
+
+        DestroyTableEvent(int catalogVersion, int tableId) {
+            this.catalogVersion = catalogVersion;
+            this.tableId = tableId;
+        }
+
+        public int catalogVersion() {
+            return catalogVersion;
+        }
+
+        public int tableId() {
+            return tableId;
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java
index f9793904d8..eac28e464b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.table.distributed.index;
 
-import java.util.HashSet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
@@ -28,6 +30,7 @@ import 
org.apache.ignite.internal.storage.index.StorageIndexDescriptor.StorageCo
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
+import org.jetbrains.annotations.Nullable;
 
 /** Auxiliary class for working with indexes that can contain useful methods 
and constants. */
 public class IndexUtils {
@@ -71,38 +74,40 @@ public class IndexUtils {
     }
 
     /**
-     * Registers indexes to a table on node recovery..
+     * Registers indexes to a table on node recovery.
      *
      * @param table Table into which the index will be registered.
      * @param catalogService Catalog service.
      * @param partitionSet Partitions for which index storages will need to be 
created if they are missing.
      * @param schemaRegistry Table schema register.
+     * @param lwm Low watermark.
      */
     public static void registerIndexesToTableOnNodeRecovery(
             TableViewInternal table,
             CatalogService catalogService,
             PartitionSet partitionSet,
-            SchemaRegistry schemaRegistry
+            SchemaRegistry schemaRegistry,
+            @Nullable HybridTimestamp lwm
     ) {
-        int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+        int earliestCatalogVersion = 
catalogService.activeCatalogVersion(HybridTimestamp.hybridTimestampToLong(lwm));
         int latestCatalogVersion = catalogService.latestCatalogVersion();
 
         int tableId = table.tableId();
 
-        var indexIds = new HashSet<Integer>();
+        var indexIds = new IntOpenHashSet();
 
-        for (int catalogVersion = earliestCatalogVersion; catalogVersion <= 
latestCatalogVersion; catalogVersion++) {
+        for (int catalogVersion = latestCatalogVersion; catalogVersion >= 
earliestCatalogVersion; catalogVersion--) {
             CatalogTableDescriptor tableDescriptor = 
catalogService.table(tableId, catalogVersion);
 
             if (tableDescriptor == null) {
                 continue;
             }
 
-            for (CatalogIndexDescriptor indexDescriptor : 
catalogService.indexes(catalogVersion, tableId)) {
-                if (indexIds.add(indexDescriptor.id())) {
-                    registerIndexToTable(table, tableDescriptor, 
indexDescriptor, partitionSet, schemaRegistry);
-                }
-            }
+            int ver0 = catalogVersion;
+            catalogService.indexes(catalogVersion, tableId).stream()
+                    .filter(idx -> ver0 == latestCatalogVersion || 
idx.status() == CatalogIndexStatus.AVAILABLE) // Alive index
+                    .filter(idx -> indexIds.add(idx.id())) // Filter duplicates
+                    .forEach(idx -> registerIndexToTable(table, 
tableDescriptor, idx, partitionSet, schemaRegistry));
         }
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 90eb159423..3f277f204d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -359,7 +359,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         verify(txStateTableStorage, atMost(0)).destroy();
         verify(replicaMgr, atMost(0)).stopReplica(any());
 
-        assertTrue(CatalogTestUtils.waitCatalogCompaction(catalogManager, 
Long.MAX_VALUE));
+        assertThat(fireDestroyEvent(), willCompleteSuccessfully());
 
         verify(mvTableStorage, 
timeout(TimeUnit.SECONDS.toMillis(10))).destroy();
         verify(txStateTableStorage, 
timeout(TimeUnit.SECONDS.toMillis(10))).destroy();
@@ -867,4 +867,8 @@ public class TableManagerTest extends IgniteAbstractTest {
     private Collection<CatalogTableDescriptor> allTableDescriptors() {
         return catalogManager.tables(catalogManager.latestCatalogVersion());
     }
+
+    private CompletableFuture<Void> fireDestroyEvent() {
+        return lowWatermark.updateAndNotify(clock.now());
+    }
 }

Reply via email to