This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ed64c5c3c4 IGNITE-21754 Remove destroyed tables on recovery - Page 
Memory (#3446)
ed64c5c3c4 is described below

commit ed64c5c3c4e265a04a5e63e01f951f976c1817f1
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Mar 21 15:25:52 2024 +0300

    IGNITE-21754 Remove destroyed tables on recovery - Page Memory (#3446)
---
 .../distributionzones/DistributionZoneManager.java |   2 +-
 .../persistence/store/FilePageStoreManager.java    |  31 ++-
 .../store/FilePageStoreManagerTest.java            |  32 ++-
 .../internal/storage/engine/StorageEngine.java     |   8 +
 .../engine/ThreadAssertingStorageEngine.java       |   5 +
 .../storage/engine/AbstractStorageEngineTest.java  |  60 +++++-
 .../internal/storage/impl/TestStorageEngine.java   |   5 +
 .../PersistentPageMemoryStorageEngine.java         |  14 ++
 .../VolatilePageMemoryStorageEngine.java           |   5 +
 .../storage/rocksdb/RocksDbStorageEngine.java      |   6 +
 .../rocksdb/engine/RocksDbStorageEngineTest.java   |   7 +
 .../table/distributed/DroppedTableInfo.java        |  67 +++++++
 .../internal/table/distributed/TableManager.java   |  31 ++-
 .../internal/table/distributed/TableUtils.java     |  39 ++++
 .../distributed/TableManagerRecoveryTest.java      |  35 +++-
 .../internal/table/distributed/TableUtilsTest.java | 215 ++++++++++++++-------
 .../ignite/internal/table/TableTestUtils.java      |  48 +++++
 17 files changed, 523 insertions(+), 87 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index ce7eeee373..8f8326c6ff 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -1425,7 +1425,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        // TODO: IGNITE-20287 Clean up abandoned resources for dropped zones 
from volt and metastore
+        // TODO: IGNITE-20287 Clean up abandoned resources for dropped tables 
from vault and metastore
         for (CatalogZoneDescriptor zone : 
catalogManager.zones(catalogVersion)) {
             futures.add(restoreZoneStateBusy(zone, recoveryRevision));
         }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
index b599d338d6..318a52e887 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
@@ -20,11 +20,13 @@ package 
org.apache.ignite.internal.pagememory.persistence.store;
 import static java.nio.file.Files.createDirectories;
 import static java.nio.file.Files.createFile;
 import static java.nio.file.Files.delete;
+import static java.nio.file.Files.exists;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static 
org.apache.ignite.internal.util.IgniteUtils.deleteIfExistsThrowable;
 
 import java.io.File;
 import java.io.IOException;
@@ -352,7 +354,7 @@ public class FilePageStoreManager implements 
PageReadWriteManager {
     }
 
     private Path ensureGroupWorkDir(int groupId) throws 
IgniteInternalCheckedException {
-        Path groupWorkDir = dbDir.resolve(GROUP_DIR_PREFIX + groupId);
+        Path groupWorkDir = groupDir(groupId);
 
         try {
             createDirectories(groupWorkDir);
@@ -393,7 +395,7 @@ public class FilePageStoreManager implements 
PageReadWriteManager {
      * @param index Index of the delta file page store.
      */
     public Path tmpDeltaFilePageStorePath(int groupId, int partitionId, int 
index) {
-        return dbDir.resolve(GROUP_DIR_PREFIX + 
groupId).resolve(String.format(TMP_PART_DELTA_FILE_TEMPLATE, partitionId, 
index));
+        return 
groupDir(groupId).resolve(String.format(TMP_PART_DELTA_FILE_TEMPLATE, 
partitionId, index));
     }
 
     /**
@@ -404,7 +406,7 @@ public class FilePageStoreManager implements 
PageReadWriteManager {
      * @param index Index of the delta file page store.
      */
     public Path deltaFilePageStorePath(int groupId, int partitionId, int 
index) {
-        return dbDir.resolve(GROUP_DIR_PREFIX + 
groupId).resolve(String.format(PART_DELTA_FILE_TEMPLATE, partitionId, index));
+        return 
groupDir(groupId).resolve(String.format(PART_DELTA_FILE_TEMPLATE, partitionId, 
index));
     }
 
     /**
@@ -434,7 +436,7 @@ public class FilePageStoreManager implements 
PageReadWriteManager {
 
         return cleanupAsyncExecutor.async((RunnableX) () -> {
             Path partitionDeleteFilePath = createFile(
-                    dbDir.resolve(GROUP_DIR_PREFIX + 
groupPartitionId.getGroupId())
+                    groupDir(groupPartitionId.getGroupId())
                             .resolve(String.format(DEL_PART_FILE_TEMPLATE, 
groupPartitionId.getPartitionId()))
             );
 
@@ -482,4 +484,25 @@ public class FilePageStoreManager implements 
PageReadWriteManager {
             return filePageStore;
         });
     }
+
+    /**
+     * Destroys the group directory with all sub-directories and files if it 
exists.
+     *
+     * @throws IOException If there was an I/O error when deleting a directory.
+     */
+    public void destroyGroupIfExists(int groupId) throws IOException {
+        Path groupDir = groupDir(groupId);
+
+        try {
+            if (exists(groupDir)) {
+                deleteIfExistsThrowable(groupDir);
+            }
+        } catch (IOException e) {
+            throw new IOException("Failed to delete group directory: " + 
groupDir, e);
+        }
+    }
+
+    private Path groupDir(int groupId) {
+        return dbDir.resolve(GROUP_DIR_PREFIX + groupId);
+    }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
index d38bf4645b..6e8393898c 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.pagememory.persistence.store;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.function.Predicate.not;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager.DEL_PART_FILE_TEMPLATE;
@@ -40,6 +41,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -525,13 +527,35 @@ public class FilePageStoreManagerTest extends 
BaseIgniteAbstractTest {
         );
     }
 
+    @Test
+    void testDestroyGroupIfExists() throws Exception {
+        FilePageStoreManager manager = createManager();
+
+        manager.start();
+
+        int groupId = 1;
+
+        Path dbDir = workDir.resolve("db");
+
+        // Directory does not exist.
+        assertDoesNotThrow(() -> manager.destroyGroupIfExists(groupId));
+        assertThat(collectAllSubFileAndDirs(dbDir), empty());
+
+        createAndAddFilePageStore(manager, new GroupPartitionId(groupId, 0));
+        assertThat(collectAllSubFileAndDirs(dbDir), not(empty()));
+
+        manager.destroyGroupIfExists(groupId);
+        assertThat(collectAllSubFileAndDirs(dbDir), empty());
+    }
+
     private FilePageStoreManager createManager() throws Exception {
         FilePageStoreManager manager = new FilePageStoreManager(
                 "test",
                 workDir,
                 new RandomAccessFileIoFactory(),
                 PAGE_SIZE,
-                mock(FailureProcessor.class));
+                mock(FailureProcessor.class)
+        );
 
         managers.add(manager);
 
@@ -544,6 +568,12 @@ public class FilePageStoreManagerTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    private static List<Path> collectAllSubFileAndDirs(Path start) throws 
Exception {
+        try (Stream<Path> fileStream = Files.walk(start)) {
+            return fileStream.filter(not(start::equals)).collect(toList());
+        }
+    }
+
     private static Path createGroupWorkDir(Path workDir, int tableId) throws 
Exception {
         Path groupWorkDir = workDir.resolve("db/table-" + tableId);
 
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index 908d62a9c1..332ac08d40 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -57,4 +57,12 @@ public interface StorageEngine {
      */
     // TODO: IGNITE-19717 Get rid of indexDescriptorSupplier
     MvTableStorage createMvTable(StorageTableDescriptor tableDescriptor, 
StorageIndexDescriptorSupplier indexDescriptorSupplier);
+
+    /**
+     * Destroys the table if it exists.
+     *
+     * @param tableId Table ID.
+     * @throws StorageException If an error has occurs while dropping the 
table.
+     */
+    void dropMvTable(int tableId);
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
index d9f02b1773..3d699b7049 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
@@ -59,4 +59,9 @@ public class ThreadAssertingStorageEngine implements 
StorageEngine {
         MvTableStorage tableStorage = 
storageEngine.createMvTable(tableDescriptor, indexDescriptorSupplier);
         return new ThreadAssertingMvTableStorage(tableStorage);
     }
+
+    @Override
+    public void dropMvTable(int tableId) {
+        storageEngine.dropMvTable(tableId);
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
index 5909c6238a..fbabfc2546 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.engine;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assumptions.assumeFalse;
 import static org.mockito.Mockito.mock;
@@ -68,7 +69,41 @@ public abstract class AbstractStorageEngineTest extends 
BaseMvStoragesTest {
     void testRestartAfterFlush() throws Exception {
         assumeFalse(storageEngine.isVolatile());
 
-        StorageTableDescriptor tableDescriptor = new StorageTableDescriptor(1, 
1, DEFAULT_DATA_REGION);
+        int tableId = 1;
+        int lastAppliedIndex = 10;
+        int lastAppliedTerm = 20;
+
+        createMvTableWithPartitionAndFill(tableId, lastAppliedIndex, 
lastAppliedTerm);
+
+        // Restart.
+        stopEngineAfterTest();
+        createEngineBeforeTest();
+
+        checkMvTableStorageWithPartitionAfterRestart(tableId, 
lastAppliedIndex, lastAppliedTerm);
+    }
+
+    @Test
+    protected void testDropMvTableOnRecovery() throws Exception {
+        assumeFalse(storageEngine.isVolatile());
+
+        int tableId = 1;
+
+        // Table does not exist.
+        assertDoesNotThrow(() -> storageEngine.dropMvTable(tableId));
+
+        createMvTableWithPartitionAndFill(tableId, 10, 20);
+
+        // Restart.
+        stopEngineAfterTest();
+        createEngineBeforeTest();
+
+        assertDoesNotThrow(() -> storageEngine.dropMvTable(tableId));
+
+        checkMvTableStorageWithPartitionAfterRestart(tableId, 0, 0);
+    }
+
+    private void createMvTableWithPartitionAndFill(int tableId, int 
lastAppliedIndex, int lastAppliedTerm) throws Exception {
+        StorageTableDescriptor tableDescriptor = new 
StorageTableDescriptor(tableId, 1, DEFAULT_DATA_REGION);
         StorageIndexDescriptorSupplier indexSupplier = 
mock(StorageIndexDescriptorSupplier.class);
 
         MvTableStorage mvTableStorage = 
storageEngine.createMvTable(tableDescriptor, indexSupplier);
@@ -79,13 +114,13 @@ public abstract class AbstractStorageEngineTest extends 
BaseMvStoragesTest {
             assertThat(mvPartitionStorageFuture, willCompleteSuccessfully());
             MvPartitionStorage mvPartitionStorage = 
mvPartitionStorageFuture.join();
 
-            try (AutoCloseable ignored1 = mvTableStorage::close) {
+            try (AutoCloseable ignored1 = mvPartitionStorage::close) {
                 // Flush. Persist the table itself, not the update.
                 assertThat(mvPartitionStorage.flush(), 
willCompleteSuccessfully());
 
                 mvPartitionStorage.runConsistently(locker -> {
                     // Update of basic storage data.
-                    mvPartitionStorage.lastApplied(10, 20);
+                    mvPartitionStorage.lastApplied(lastAppliedIndex, 
lastAppliedTerm);
 
                     return null;
                 });
@@ -94,12 +129,17 @@ public abstract class AbstractStorageEngineTest extends 
BaseMvStoragesTest {
                 assertThat(mvPartitionStorage.flush(), 
willCompleteSuccessfully());
             }
         }
+    }
 
-        // Restart.
-        stopEngineAfterTest();
-        createEngineBeforeTest();
+    private void checkMvTableStorageWithPartitionAfterRestart(
+            int tableId,
+            int expLastAppliedIndex,
+            int expLastAppliedTerm
+    ) throws Exception {
+        StorageTableDescriptor tableDescriptor = new 
StorageTableDescriptor(tableId, 1, DEFAULT_DATA_REGION);
+        StorageIndexDescriptorSupplier indexSupplier = 
mock(StorageIndexDescriptorSupplier.class);
 
-        mvTableStorage = storageEngine.createMvTable(tableDescriptor, 
indexSupplier);
+        MvTableStorage mvTableStorage = 
storageEngine.createMvTable(tableDescriptor, indexSupplier);
 
         try (AutoCloseable ignored0 = mvTableStorage::close) {
             CompletableFuture<MvPartitionStorage> mvPartitionStorageFuture = 
mvTableStorage.createMvPartition(0);
@@ -107,10 +147,10 @@ public abstract class AbstractStorageEngineTest extends 
BaseMvStoragesTest {
             assertThat(mvPartitionStorageFuture, willCompleteSuccessfully());
             MvPartitionStorage mvPartitionStorage = 
mvPartitionStorageFuture.join();
 
-            try (AutoCloseable ignored1 = mvTableStorage::close) {
+            try (AutoCloseable ignored1 = mvPartitionStorage::close) {
                 // Check that data has been persisted.
-                assertEquals(10, mvPartitionStorage.lastAppliedIndex());
-                assertEquals(20, mvPartitionStorage.lastAppliedTerm());
+                assertEquals(expLastAppliedIndex, 
mvPartitionStorage.lastAppliedIndex());
+                assertEquals(expLastAppliedTerm, 
mvPartitionStorage.lastAppliedTerm());
             }
         }
     }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
index c983c8bbf3..7b2443e256 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
@@ -59,4 +59,9 @@ public class TestStorageEngine implements StorageEngine {
     ) throws StorageException {
         return spy(new TestMvTableStorage(tableDescriptor));
     }
+
+    @Override
+    public void dropMvTable(int tableId) {
+        // No-op.
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index edfb1ab175..6a494cca73 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -243,6 +244,19 @@ public class PersistentPageMemoryStorageEngine implements 
StorageEngine {
         return new PersistentPageMemoryTableStorage(tableDescriptor, 
indexDescriptorSupplier, this, dataRegion, destructionExecutor);
     }
 
+    @Override
+    public void dropMvTable(int tableId) {
+        FilePageStoreManager filePageStoreManager = this.filePageStoreManager;
+
+        assert filePageStoreManager != null : "Component has not started";
+
+        try {
+            filePageStoreManager.destroyGroupIfExists(tableId);
+        } catch (IOException e) {
+            throw new StorageException("Failed to destroy table directory: 
{}", e, tableId);
+        }
+    }
+
     /**
      * Returns checkpoint manager, {@code null} if engine not started.
      */
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index c7719a910c..d59b01dd62 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -160,6 +160,11 @@ public class VolatilePageMemoryStorageEngine implements 
StorageEngine {
         return new VolatilePageMemoryTableStorage(tableDescriptor, 
indexDescriptorSupplier, dataRegion, destructionExecutor);
     }
 
+    @Override
+    public void dropMvTable(int tableId) {
+        // No-op.
+    }
+
     /**
      * Creates, starts and adds a new data region to the engine.
      *
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 6e4dd1110b..b2eb6c9e36 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -201,4 +201,10 @@ public class RocksDbStorageEngine implements StorageEngine 
{
 
         return storage;
     }
+
+    @Override
+    // TODO: IGNITE-21760 Implement
+    public void dropMvTable(int tableId) {
+        throw new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-21760";);
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
index f908c5d121..c376e6d326 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
@@ -46,4 +47,10 @@ public class RocksDbStorageEngineTest extends 
AbstractStorageEngineTest {
                 workDir
         );
     }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21760";)
+    protected void testDropMvTableOnRecovery() throws Exception {
+        super.testDropMvTableOnRecovery();
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DroppedTableInfo.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DroppedTableInfo.java
new file mode 100644
index 0000000000..0077497928
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DroppedTableInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.tostring.S;
+
+/** Information about the dropped table. */
+final class DroppedTableInfo {
+    private final int tableId;
+
+    /** Catalog version in which the table was removed from the catalog. */
+    private final int tableRemovalCatalogVersion;
+
+    DroppedTableInfo(int tableId, int tableRemovalCatalogVersion) {
+        this.tableId = tableId;
+        this.tableRemovalCatalogVersion = tableRemovalCatalogVersion;
+    }
+
+    public int tableId() {
+        return tableId;
+    }
+
+    public int tableRemovalCatalogVersion() {
+        return tableRemovalCatalogVersion;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DroppedTableInfo other = (DroppedTableInfo) o;
+
+        return tableId == other.tableId && tableRemovalCatalogVersion == 
other.tableRemovalCatalogVersion;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = tableId;
+        result = 31 * result + tableRemovalCatalogVersion;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4a3fbb2957..0785fa5ce1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -52,6 +52,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.value;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static 
org.apache.ignite.internal.table.distributed.TableUtils.droppedTables;
 import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
@@ -578,6 +579,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             long recoveryRevision = recoveryFinishFuture.join();
 
+            cleanUpResourcesForDroppedTablesOnRecoveryBusy();
+
             startTables(recoveryRevision, lowWatermark.getLowWatermark());
 
             processAssignmentsOnRecovery(recoveryRevision);
@@ -2441,9 +2444,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         int latestCatalogVersion = catalogService.latestCatalogVersion();
 
         var startedTables = new IntOpenHashSet();
-        List<CompletableFuture<?>> startTableFutures = new ArrayList<>();
+        var startTableFutures = new ArrayList<CompletableFuture<?>>();
 
-        // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones 
from volt and metastore
         for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; 
ver--) {
             int ver0 = ver;
             catalogService.tables(ver).stream()
@@ -2495,4 +2497,29 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             return tableId;
         }
     }
+
+    private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
+        // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones 
from vault and metastore
+        for (DroppedTableInfo droppedTableInfo : droppedTables(catalogService, 
lowWatermark.getLowWatermark())) {
+            int catalogVersion = droppedTableInfo.tableRemovalCatalogVersion() 
- 1;
+
+            CatalogTableDescriptor tableDescriptor = 
catalogService.table(droppedTableInfo.tableId(), catalogVersion);
+
+            assert tableDescriptor != null : "tableId=" + 
droppedTableInfo.tableId() + ", catalogVersion=" + catalogVersion;
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(tableDescriptor.zoneId(), catalogVersion);
+
+            assert zoneDescriptor != null : "zoneId=" + 
tableDescriptor.zoneId() + ", catalogVersion=" + catalogVersion;
+
+            destroyTableStorageOnRecoveryBusy(tableDescriptor, zoneDescriptor);
+        }
+    }
+
+    private void destroyTableStorageOnRecoveryBusy(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
+        StorageEngine engine = 
dataStorageMgr.engine(zoneDescriptor.dataStorage().engine());
+
+        assert engine != null : "tableId=" + tableDescriptor.id() + ", 
engineName=" + zoneDescriptor.dataStorage().engine();
+
+        engine.dropMvTable(tableDescriptor.id());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
index 3f1b8ca61e..7d5baf06a1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
@@ -17,17 +17,23 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import static java.util.stream.Collectors.toCollection;
 import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
 import static org.apache.ignite.internal.util.CollectionUtils.view;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
 
 /** Contains common helper methods and fields for use within a module. */
 public class TableUtils {
@@ -81,4 +87,37 @@ public class TableUtils {
                 BUILDING, indexId, fromCatalogVersionIncluded, 
latestCatalogVersion
         ));
     }
+
+    /**
+     * Collects a list of tables that were removed from the catalog and should 
have been dropped due to a low watermark (if the catalog
+     * version in which the table was removed is less than or equal to the 
active catalog version of the low watermark).
+     *
+     * @param catalogService Catalog service.
+     * @param lowWatermark Low watermark, {@code null} if it has never been 
updated.
+     */
+    // TODO: IGNITE-21771 Process or check catalog compaction
+    static List<DroppedTableInfo> droppedTables(CatalogService catalogService, 
@Nullable HybridTimestamp lowWatermark) {
+        if (lowWatermark == null) {
+            return List.of();
+        }
+
+        int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+        int lwmCatalogVersion = 
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+        Set<Integer> tableIds = 
catalogService.tables(lwmCatalogVersion).stream()
+                .map(CatalogObjectDescriptor::id)
+                .collect(toCollection(HashSet::new));
+
+        var res = new ArrayList<DroppedTableInfo>();
+
+        for (int catalogVersion = lwmCatalogVersion - 1; catalogVersion >= 
earliestCatalogVersion; catalogVersion--) {
+            for (CatalogTableDescriptor table : 
catalogService.tables(catalogVersion)) {
+                if (tableIds.add(table.id())) {
+                    res.add(new DroppedTableInfo(table.id(), catalogVersion + 
1));
+                }
+            }
+        }
+
+        return res;
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index d0dd06bb44..2cc7e95082 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -22,6 +22,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
 import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
@@ -34,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -58,6 +60,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -89,8 +92,11 @@ import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
 import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.storage.DataStorageModule;
 import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
 import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryDataStorageModule;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.table.TableTestUtils;
@@ -112,6 +118,7 @@ import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.sql.IgniteSql;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -164,6 +171,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
     private volatile HybridTimestamp savedWatermark;
 
+    private final DataStorageModule dataStorageModule = 
createDataStorageModule();
+
     @AfterEach
     void after() throws Exception {
         stopComponents();
@@ -182,6 +191,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         clearInvocations(mvTableStorage);
         clearInvocations(txStateTableStorage);
 
+        int tableId = getTableIdStrict(catalogManager, TABLE_NAME, 
clock.nowLong());
+
         // Drop table and save watermark without triggering LWM events.
         dropTable(TABLE_NAME);
 
@@ -195,6 +206,9 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
         verify(mvTableStorage, never()).createMvPartition(anyInt());
         verify(txStateTableStorage, 
never()).getOrCreateTxStateStorage(anyInt());
+
+        // Let's check that the table was deleted.
+        verify(dsm.engine(dataStorageModule.name())).dropMvTable(eq(tableId));
     }
 
     @Test
@@ -294,7 +308,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 null,
                 null,
                 tm,
-                dsm = 
createDataStorageManager(mock(ConfigurationRegistry.class), workDir, 
storageEngineConfig),
+                dsm = 
createDataStorageManager(mock(ConfigurationRegistry.class), workDir, 
storageEngineConfig, dataStorageModule),
                 workDir,
                 metaStorageManager,
                 sm = new SchemaManager(revisionUpdater, catalogManager),
@@ -363,11 +377,12 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
     private static DataStorageManager createDataStorageManager(
             ConfigurationRegistry mockedRegistry,
             Path storagePath,
-            PersistentPageMemoryStorageEngineConfiguration config
+            PersistentPageMemoryStorageEngineConfiguration config,
+            DataStorageModule dataStorageModule
     ) {
         
when(mockedRegistry.getConfiguration(PersistentPageMemoryStorageEngineConfiguration.KEY)).thenReturn(config);
 
-        DataStorageModules dataStorageModules = new 
DataStorageModules(List.of(new PersistentPageMemoryDataStorageModule()));
+        DataStorageModules dataStorageModules = new 
DataStorageModules(List.of(dataStorageModule));
 
         DataStorageManager manager = new DataStorageManager(
                 dataStorageModules.createStorageEngines(NODE_NAME, 
mockedRegistry, storagePath, null, mock(FailureProcessor.class))
@@ -408,4 +423,18 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, 
indexName);
     }
 
+    private static PersistentPageMemoryDataStorageModule 
createDataStorageModule() {
+        return new PersistentPageMemoryDataStorageModule() {
+            @Override
+            public StorageEngine createEngine(
+                    String igniteInstanceName,
+                    ConfigurationRegistry configRegistry,
+                    Path storagePath,
+                    @Nullable LongJvmPauseDetector longJvmPauseDetector,
+                    FailureProcessor failureProcessor
+            ) throws StorageException {
+                return spy(super.createEngine(igniteInstanceName, 
configRegistry, storagePath, longJvmPauseDetector, failureProcessor));
+            }
+        };
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java
index f4075a89d4..d2ede8ed76 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java
@@ -19,11 +19,15 @@ package org.apache.ignite.internal.table.distributed;
 
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.table.TableTestUtils.COLUMN_NAME;
 import static org.apache.ignite.internal.table.TableTestUtils.INDEX_NAME;
 import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
+import static 
org.apache.ignite.internal.table.TableTestUtils.addColumnToSimpleTable;
 import static 
org.apache.ignite.internal.table.TableTestUtils.createSimpleHashIndex;
 import static 
org.apache.ignite.internal.table.TableTestUtils.createSimpleTable;
 import static org.apache.ignite.internal.table.TableTestUtils.dropIndex;
+import static org.apache.ignite.internal.table.TableTestUtils.dropSimpleTable;
 import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static 
org.apache.ignite.internal.table.TableTestUtils.makeIndexAvailable;
@@ -33,97 +37,176 @@ import static 
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAt
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.tx.TransactionIds.transactionId;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.util.List;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /** For {@link TableUtils} testing. */
 public class TableUtilsTest extends IgniteAbstractTest {
     private final HybridClock clock = new HybridClockImpl();
 
-    @Test
-    void testIndexIdsAtRwTxBeginTs() throws Exception {
-        withCatalogManager(catalogManager -> {
-            createSimpleTable(catalogManager, TABLE_NAME);
-
-            String indexName0 = INDEX_NAME + 0;
-            String indexName1 = INDEX_NAME + 1;
-            String indexName2 = INDEX_NAME + 2;
-            String indexName3 = INDEX_NAME + 3;
-            String indexName4 = INDEX_NAME + 4;
-
-            for (String indexName : List.of(indexName0, indexName1, 
indexName2, indexName3, indexName4)) {
-                createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
-            }
-
-            int indexId0 = indexId(catalogManager, indexName0);
-            int indexId1 = indexId(catalogManager, indexName1);
-            int indexId2 = indexId(catalogManager, indexName2);
-            int indexId3 = indexId(catalogManager, indexName3);
-            int indexId4 = indexId(catalogManager, indexName4);
-
-            for (String indexName : List.of(indexName1, indexName2, 
indexName3, indexName4)) {
-                startBuildingIndex(catalogManager, indexId(catalogManager, 
indexName));
-            }
-
-            for (String indexName : List.of(indexName2, indexName3, 
indexName4)) {
-                makeIndexAvailable(catalogManager, indexId(catalogManager, 
indexName));
-            }
-
-            for (String indexName : List.of(indexName3, indexName4)) {
-                dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
-            }
-
-            removeIndex(catalogManager, indexId4);
-
-            CatalogManager spy = spy(catalogManager);
-
-            HybridTimestamp beginTs = clock.now();
-
-            int tableId = getTableIdStrict(catalogManager, TABLE_NAME, 
clock.nowLong());
-
-            assertThat(
-                    indexIdsAtRwTxBeginTs(spy, transactionId(beginTs, 1), 
tableId),
-                    contains(
-                            indexId(catalogManager, pkIndexName(TABLE_NAME)),
-                            indexId0,
-                            indexId1,
-                            indexId2,
-                            indexId3
-                    )
-            );
-
-            verify(spy).activeCatalogVersion(eq(beginTs.longValue()));
-            
verify(spy).indexes(eq(catalogManager.activeCatalogVersion(beginTs.longValue())),
 eq(tableId));
-        });
+    private final CatalogManager catalogManager = 
CatalogTestUtils.createTestCatalogManager("test-node", clock);
+
+    @BeforeEach
+    void setUp() {
+        assertThat(catalogManager.start(), willCompleteSuccessfully());
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        closeAll(
+                catalogManager::beforeNodeStop,
+                catalogManager::stop
+        );
     }
 
-    private void withCatalogManager(Consumer<CatalogManager> consumer) throws 
Exception {
-        CatalogManager catalogManager = 
CatalogTestUtils.createTestCatalogManager("test-node", clock);
+    @Test
+    void testIndexIdsAtRwTxBeginTs() {
+        createSimpleTable(catalogManager, TABLE_NAME);
 
-        assertThat(catalogManager.start(), willCompleteSuccessfully());
+        String indexName0 = INDEX_NAME + 0;
+        String indexName1 = INDEX_NAME + 1;
+        String indexName2 = INDEX_NAME + 2;
+        String indexName3 = INDEX_NAME + 3;
+        String indexName4 = INDEX_NAME + 4;
 
-        try {
-            consumer.accept(catalogManager);
-        } finally {
-            closeAll(catalogManager::beforeNodeStop, catalogManager::stop);
+        for (String indexName : List.of(indexName0, indexName1, indexName2, 
indexName3, indexName4)) {
+            createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
         }
+
+        int indexId0 = indexId(indexName0);
+        int indexId1 = indexId(indexName1);
+        int indexId2 = indexId(indexName2);
+        int indexId3 = indexId(indexName3);
+        int indexId4 = indexId(indexName4);
+
+        for (String indexName : List.of(indexName1, indexName2, indexName3, 
indexName4)) {
+            startBuildingIndex(catalogManager, indexId(indexName));
+        }
+
+        for (String indexName : List.of(indexName2, indexName3, indexName4)) {
+            makeIndexAvailable(catalogManager, indexId(indexName));
+        }
+
+        for (String indexName : List.of(indexName3, indexName4)) {
+            dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
+        }
+
+        removeIndex(catalogManager, indexId4);
+
+        CatalogManager spy = spy(catalogManager);
+
+        HybridTimestamp beginTs = clock.now();
+
+        int tableId = getTableIdStrict(catalogManager, TABLE_NAME, 
clock.nowLong());
+
+        assertThat(
+                indexIdsAtRwTxBeginTs(spy, transactionId(beginTs, 1), tableId),
+                contains(
+                        indexId(pkIndexName(TABLE_NAME)),
+                        indexId0,
+                        indexId1,
+                        indexId2,
+                        indexId3
+                )
+        );
+
+        verify(spy).activeCatalogVersion(eq(beginTs.longValue()));
+        
verify(spy).indexes(eq(catalogManager.activeCatalogVersion(beginTs.longValue())),
 eq(tableId));
+    }
+
+    @Test
+    void testDroppedTables() {
+        String tableName0 = TABLE_NAME + 0;
+        String tableName1 = TABLE_NAME + 1;
+        String tableName2 = TABLE_NAME + 2;
+
+        createSimpleTable(catalogManager, tableName0);
+        addColumnToSimpleTable(catalogManager, tableName0, COLUMN_NAME + 0, 
INT32);
+        addColumnToSimpleTable(catalogManager, tableName0, COLUMN_NAME + 1, 
INT32);
+
+        createSimpleTable(catalogManager, tableName1);
+        addColumnToSimpleTable(catalogManager, tableName1, COLUMN_NAME + 2, 
INT32);
+
+        createSimpleTable(catalogManager, tableName2);
+        addColumnToSimpleTable(catalogManager, tableName1, COLUMN_NAME + 3, 
INT32);
+        addColumnToSimpleTable(catalogManager, tableName1, COLUMN_NAME + 4, 
INT32);
+        addColumnToSimpleTable(catalogManager, tableName1, COLUMN_NAME + 5, 
INT32);
+
+        int tableId0 = tableId(tableName0);
+        int tableId1 = tableId(tableName1);
+
+        int catalogVersionBeforeRemoveTable1 = 
catalogManager.latestCatalogVersion();
+        dropSimpleTable(catalogManager, tableName1);
+
+        int catalogVersionBeforeRemoveTable0 = 
catalogManager.latestCatalogVersion();
+        dropSimpleTable(catalogManager, tableName0);
+
+        assertThat(droppedTables(null), empty());
+        
assertThat(droppedTables(catalogTime(catalogVersionBeforeRemoveTable1)), 
empty());
+        // Let's check that if the time is slightly different from the 
activation time, the result will be the same.
+        
assertThat(droppedTables(catalogTime(catalogVersionBeforeRemoveTable1).addPhysicalTime(1)),
 empty());
+        
assertThat(droppedTables(catalogTime(catalogVersionBeforeRemoveTable0).addPhysicalTime(-1)),
 empty());
+
+        assertThat(
+                droppedTables(catalogTime(catalogVersionBeforeRemoveTable0)),
+                containsInAnyOrder(new DroppedTableInfo(tableId1, 
catalogVersionBeforeRemoveTable0))
+        );
+
+        assertThat(
+                
droppedTables(catalogTime(catalogVersionBeforeRemoveTable0).addPhysicalTime(1)),
+                containsInAnyOrder(new DroppedTableInfo(tableId1, 
catalogVersionBeforeRemoveTable0))
+        );
+
+        int latestCatalogVersion = catalogManager.latestCatalogVersion();
+
+        assertThat(
+                droppedTables(catalogTime(latestCatalogVersion)),
+                containsInAnyOrder(
+                        new DroppedTableInfo(tableId1, 
catalogVersionBeforeRemoveTable0),
+                        new DroppedTableInfo(tableId0, latestCatalogVersion)
+                )
+        );
+
+        assertThat(
+                
droppedTables(catalogTime(latestCatalogVersion).addPhysicalTime(1)),
+                containsInAnyOrder(
+                        new DroppedTableInfo(tableId1, 
catalogVersionBeforeRemoveTable0),
+                        new DroppedTableInfo(tableId0, latestCatalogVersion)
+                )
+        );
+    }
+
+    private int indexId(String indexName) {
+        return getIndexIdStrict(catalogManager, indexName, clock.nowLong());
+    }
+
+    private int tableId(String tableName) {
+        return getTableIdStrict(catalogManager, tableName, clock.nowLong());
+    }
+
+    private HybridTimestamp catalogTime(int catalogVersion) {
+        return hybridTimestamp(catalogManager.catalog(catalogVersion).time());
     }
 
-    private int indexId(CatalogService catalogService, String indexName) {
-        return getIndexIdStrict(catalogService, indexName, clock.nowLong());
+    private List<DroppedTableInfo> droppedTables(@Nullable HybridTimestamp 
lwm) {
+        return TableUtils.droppedTables(catalogManager, lwm);
     }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index 85a59bdb41..b6ea6b4497 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnCommand;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
@@ -105,6 +106,16 @@ public class TableTestUtils {
         );
     }
 
+    /**
+     * Drops table from {@link #createSimpleTable(CatalogManager, String)} in 
the catalog.
+     *
+     * @param catalogManager Catalog manager.
+     * @param tableName Table name.
+     */
+    public static void dropSimpleTable(CatalogManager catalogManager, String 
tableName) {
+        dropTable(catalogManager, DEFAULT_SCHEMA_NAME, tableName);
+    }
+
     /**
      * Drops index in the catalog.
      *
@@ -329,4 +340,41 @@ public class TableTestUtils {
     public static void makeIndexAvailable(CatalogManager catalogManager, int 
indexId) {
         
assertThat(catalogManager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
 willCompleteSuccessfully());
     }
+
+    /**
+     * Adds a column to the table in the catalog.
+     *
+     * @param catalogManager Catalog manager.
+     * @param schemaName Schema name.
+     * @param tableName Table name.
+     * @param columnName Column name.
+     * @param columnType Column type.
+     */
+    public static void addColumnToTable(
+            CatalogManager catalogManager,
+            String schemaName,
+            String tableName,
+            String columnName,
+            ColumnType columnType
+    ) {
+        CatalogCommand command = AlterTableAddColumnCommand.builder()
+                .schemaName(schemaName)
+                .tableName(tableName)
+                
.columns(List.of(ColumnParams.builder().name(columnName).type(columnType).build()))
+                .build();
+
+        assertThat(catalogManager.execute(command), 
willCompleteSuccessfully());
+    }
+
+    /**
+     * Adds a column to the table from {@link 
#createSimpleTable(CatalogManager, String)} in the catalog.
+     *
+     * @param catalogManager Catalog manager.
+     * @param tableName Table name.
+     * @param columnName Column name.
+     * @param columnType Column type.
+     */
+    public static void addColumnToSimpleTable(CatalogManager catalogManager, 
String tableName, String columnName, ColumnType columnType) {
+        addColumnToTable(catalogManager, DEFAULT_SCHEMA_NAME, tableName, 
columnName, columnType);
+    }
 }

Reply via email to