This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 34b536a6bd IGNITE-21671 Remove destroyed PageMemory indexes on recovery (#3399)
34b536a6bd is described below

commit 34b536a6bd53b173b95a166cc5391fad8c2b91e5
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Mar 14 12:45:04 2024 +0200

    IGNITE-21671 Remove destroyed PageMemory indexes on recovery (#3399)
---
 .../index/StorageIndexDescriptorSupplier.java      |  38 +---
 .../storage/AbstractMvTableStorageTest.java        | 215 +++++++++++++------
 .../index/hash/PageMemoryHashIndexStorage.java     |  12 +-
 .../storage/pagememory/index/meta/IndexMeta.java   |  38 +++-
 .../UpdateLastRowIdUuidToBuiltInvokeClosure.java   |   2 +-
 .../pagememory/index/meta/io/IndexMetaIo.java      |  21 +-
 .../index/sorted/PageMemorySortedIndexStorage.java |  13 +-
 .../storage/pagememory/mv/IndexStorageFactory.java |  50 ++++-
 .../storage/pagememory/mv/PageMemoryIndexes.java   |  42 +++-
 .../PersistentPageMemoryMvTableStorageTest.java    |   3 +-
 .../VolatilePageMemoryMvTableStorageTest.java      |   3 +-
 .../PersistentPageMemoryHashIndexStorageTest.java  |   2 +-
 ...PersistentPageMemorySortedIndexStorageTest.java |   2 +-
 .../VolatilePageMemoryHashIndexStorageTest.java    |   3 +-
 .../VolatilePageMemorySortedIndexStorageTest.java  |   3 +-
 ...ageMemoryMvPartitionStorageConcurrencyTest.java |   3 +-
 ...rsistentPageMemoryMvPartitionStorageGcTest.java |   3 +-
 ...PersistentPageMemoryMvPartitionStorageTest.java |   3 +-
 ...ageMemoryMvPartitionStorageConcurrencyTest.java |   3 +-
 ...VolatilePageMemoryMvPartitionStorageGcTest.java |   3 +-
 .../VolatilePageMemoryMvPartitionStorageTest.java  |   3 +-
 .../RocksDbMvPartitionStorageConcurrencyTest.java  |   3 +-
 .../rocksdb/RocksDbMvPartitionStorageGcTest.java   |   3 +-
 .../rocksdb/RocksDbMvPartitionStorageTest.java     |   3 +-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |  11 +-
 .../storage/rocksdb/RocksDbStorageEngineTest.java  |   5 +-
 .../rocksdb/index/RocksDbHashIndexStorageTest.java |   3 +-
 .../index/RocksDbSortedIndexStorageTest.java       |   3 +-
 .../CatalogStorageIndexDescriptorSupplier.java     |  72 +++++++
 .../internal/table/distributed/LowWatermark.java   |   6 +-
 .../internal/table/distributed/TableManager.java   |   3 +-
 .../CatalogStorageIndexDescriptorSupplierTest.java | 238 +++++++++++++++++++++
 .../PersistentPageMemoryGcUpdateHandlerTest.java   |   8 +-
 .../distributed/gc/RocksDbGcUpdateHandlerTest.java |   8 +-
 .../gc/VolatilePageMemoryGcUpdateHandlerTest.java  |   8 +-
 35 files changed, 678 insertions(+), 161 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageIndexDescriptorSupplier.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageIndexDescriptorSupplier.java
index 435298237f..1c2e27dc27 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageIndexDescriptorSupplier.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageIndexDescriptorSupplier.java
@@ -17,45 +17,15 @@
 
 package org.apache.ignite.internal.storage.index;
 
-import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * Index descriptor supplier from catalog.
- */
+/** Index descriptor supplier. */
 // TODO: IGNITE-19717 Get rid of it
-public class StorageIndexDescriptorSupplier {
-    private final CatalogService catalogService;
-
-    /**
-     * Constructor.
-     *
-     * @param catalogService Catalog service.
-     */
-    public StorageIndexDescriptorSupplier(CatalogService catalogService) {
-        this.catalogService = catalogService;
-    }
-
+public interface StorageIndexDescriptorSupplier {
     /**
      * Returns an index descriptor by its ID, {@code null} if absent.
      *
-     * @param id Index ID.
+     * @param indexId Index ID.
      */
-    public @Nullable StorageIndexDescriptor get(int id) {
-        int catalogVersion = catalogService.latestCatalogVersion();
-
-        CatalogIndexDescriptor index = catalogService.index(id, 
catalogVersion);
-
-        if (index == null) {
-            return null;
-        }
-
-        CatalogTableDescriptor table = catalogService.table(index.tableId(), 
catalogVersion);
-
-        assert table != null : "tableId=" + index.tableId() + ", indexId=" + 
index.id();
-
-        return StorageIndexDescriptor.create(table, index);
-    }
+    @Nullable StorageIndexDescriptor get(int indexId);
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 64eae7bdf5..e1a2369f4e 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -46,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -67,7 +68,6 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.IgniteTuple3;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.storage.index.PeekCursor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.Cursor;
@@ -115,7 +116,40 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
     protected StorageHashIndexDescriptor hashIdx;
 
-    protected final CatalogService catalogService = mock(CatalogService.class);
+    private final CatalogService catalogService = mock(CatalogService.class);
+
+    protected final StorageIndexDescriptorSupplier indexDescriptorSupplier = 
new StorageIndexDescriptorSupplier() {
+        @Override
+        public @Nullable StorageIndexDescriptor get(int indexId) {
+            int catalogVersion = catalogService.latestCatalogVersion();
+
+            CatalogIndexDescriptor indexDescriptor = 
catalogService.index(indexId, catalogVersion);
+
+            if (indexDescriptor == null) {
+                return null;
+            }
+
+            CatalogTableDescriptor tableDescriptor = 
catalogService.table(indexDescriptor.tableId(), catalogVersion);
+
+            assertThat(tableDescriptor, is(notNullValue()));
+
+            return StorageIndexDescriptor.create(tableDescriptor, 
indexDescriptor);
+        }
+    };
+
+    private class TestRow {
+        final RowId rowId;
+
+        final BinaryRow row;
+
+        final HybridTimestamp timestamp;
+
+        TestRow(RowId rowId, BinaryRow row) {
+            this.rowId = rowId;
+            this.row = row;
+            this.timestamp = clock.now();
+        }
+    }
 
     @AfterEach
     protected void tearDown() throws Exception {
@@ -396,15 +430,15 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         HashIndexStorage hashIndexStorage = 
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
         SortedIndexStorage sortedIndexStorage = 
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
 
-        // Error because reblance has not yet started for the partition.
+        // Error because rebalance has not yet started for the partition.
         assertThrows(
                 StorageRebalanceException.class,
                 () -> tableStorage.finishRebalancePartition(PARTITION_ID, 100, 
500, new byte[0])
         );
 
-        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> 
rowsBeforeRebalanceStart = List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        List<TestRow> rowsBeforeRebalanceStart = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, 
"0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, 
"1"), new TestValue(1, "1")))
         );
 
         startRebalanceWithChecks(
@@ -416,9 +450,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         );
 
         // Let's fill the storages with fresh data on rebalance.
-        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rowsOnRebalance 
= List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(2, "2"), new TestValue(2, "2")), clock.now()),
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(3, "3"), new TestValue(3, "3")), clock.now())
+        List<TestRow> rowsOnRebalance = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(2, 
"2"), new TestValue(2, "2"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(3, 
"3"), new TestValue(3, "3")))
         );
 
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsOnRebalance);
@@ -461,9 +495,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         // Nothing will happen because rebalancing has not started.
         tableStorage.abortRebalancePartition(PARTITION_ID).get(1, SECONDS);
 
-        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> 
rowsBeforeRebalanceStart = List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        List<TestRow> rowsBeforeRebalanceStart = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, 
"0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, 
"1"), new TestValue(1, "1")))
         );
 
         startRebalanceWithChecks(
@@ -475,9 +509,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         );
 
         // Let's fill the storages with fresh data on rebalance.
-        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rowsOnRebalance 
= List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(2, "2"), new TestValue(2, "2")), clock.now()),
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(3, "3"), new TestValue(3, "3")), clock.now())
+        List<TestRow> rowsOnRebalance = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(2, 
"2"), new TestValue(2, "2"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(3, 
"3"), new TestValue(3, "3")))
         );
 
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsOnRebalance);
@@ -523,9 +557,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         HashIndexStorage hashIndexStorage = 
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
         SortedIndexStorage sortedIndexStorage = 
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
 
-        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows = List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        List<TestRow> rows = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, 
"0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, 
"1"), new TestValue(1, "1")))
         );
 
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rows);
@@ -580,9 +614,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         assertNull(mvPartitionStorage.committedGroupConfiguration());
 
         // Let's fill the storages and clean them.
-        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows = List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        List<TestRow> rows = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, 
"0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, 
"1"), new TestValue(1, "1")))
         );
 
         byte[] raftGroupConfig = createRandomRaftGroupConfiguration();
@@ -702,6 +736,41 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         }
     }
 
+    @Test
+    public void testIndexDestructionOnRecovery() throws Exception {
+        assumeFalse(tableStorage.isVolatile(), "Volatile storages do not 
support index recovery");
+
+        MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = 
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
+        SortedIndexStorage sortedIndexStorage = 
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+
+        List<TestRow> rows = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, 
"0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, 
"1"), new TestValue(1, "1")))
+        );
+
+        fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rows);
+
+        assertThat(mvPartitionStorage.flush(), willCompleteSuccessfully());
+
+        // Restart storages.
+        tableStorage.close();
+
+        // Emulate a situation when indexes have been removed from the 
catalog. We then expect them to be removed upon startup.
+        when(catalogService.index(eq(hashIdx.id()), 
anyInt())).thenReturn(null);
+        when(catalogService.index(eq(sortedIdx.id()), 
anyInt())).thenReturn(null);
+
+        tableStorage = createMvTableStorage();
+
+        mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
+        hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, 
hashIdx);
+        sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, 
sortedIdx);
+
+        // Data should remain in the partition storages, but the indexes must 
be cleaned up.
+        checkForPresenceRows(mvPartitionStorage, null, null, rows);
+        checkForMissingRows(null, hashIndexStorage, sortedIndexStorage, rows);
+    }
+
     private static void createTestTableAndIndexes(CatalogService 
catalogService) {
         int id = 0;
 
@@ -762,7 +831,6 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         }
     }
 
-    @SuppressWarnings("ResultOfMethodCallIgnored")
     private static void checkCursorAfterStartRebalance(Cursor<?> cursor) {
         assertDoesNotThrow(cursor::close);
 
@@ -778,18 +846,18 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
             MvPartitionStorage mvPartitionStorage,
             HashIndexStorage hashIndexStorage,
             SortedIndexStorage sortedIndexStorage,
-            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+            List<TestRow> rows
     ) {
         assertThat(rows, hasSize(greaterThanOrEqualTo(2)));
 
         for (int i = 0; i < rows.size(); i++) {
             int finalI = i;
 
-            IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row = rows.get(i);
+            TestRow row = rows.get(i);
 
-            RowId rowId = row.get1();
-            BinaryRow binaryRow = row.get2();
-            HybridTimestamp timestamp = row.get3();
+            RowId rowId = row.rowId;
+            BinaryRow binaryRow = row.row;
+            HybridTimestamp timestamp = row.timestamp;
 
             assertNotNull(rowId);
             assertNotNull(binaryRow);
@@ -819,24 +887,33 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     }
 
     private static void checkForMissingRows(
-            MvPartitionStorage mvPartitionStorage,
-            HashIndexStorage hashIndexStorage,
-            SortedIndexStorage sortedIndexStorage,
-            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+            @Nullable MvPartitionStorage mvPartitionStorage,
+            @Nullable HashIndexStorage hashIndexStorage,
+            @Nullable SortedIndexStorage sortedIndexStorage,
+            List<TestRow> rows
     ) {
-        for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
-            List<ReadResult> allVersions = 
mvPartitionStorage.runConsistently(locker -> {
-                locker.lock(row.get1());
+        for (TestRow row : rows) {
+            if (mvPartitionStorage != null) {
+                List<ReadResult> allVersions = 
mvPartitionStorage.runConsistently(locker -> {
+                    locker.lock(row.rowId);
 
-                return getAll(mvPartitionStorage.scanVersions(row.get1()));
-            });
-            assertThat(allVersions, is(empty()));
+                    return getAll(mvPartitionStorage.scanVersions(row.rowId));
+                });
+
+                assertThat(allVersions, is(empty()));
+            }
+
+            if (hashIndexStorage != null) {
+                IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), row.row, row.rowId);
+
+                
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())), 
is(empty()));
+            }
 
-            IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1());
-            IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1());
+            if (sortedIndexStorage != null) {
+                IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), row.row, row.rowId);
 
-            
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())), 
is(empty()));
-            
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())), 
is(empty()));
+                
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())), 
is(empty()));
+            }
         }
     }
 
@@ -845,25 +922,33 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     }
 
     private static void checkForPresenceRows(
-            MvPartitionStorage mvPartitionStorage,
-            HashIndexStorage hashIndexStorage,
-            SortedIndexStorage sortedIndexStorage,
-            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+            @Nullable MvPartitionStorage mvPartitionStorage,
+            @Nullable HashIndexStorage hashIndexStorage,
+            @Nullable SortedIndexStorage sortedIndexStorage,
+            List<TestRow> rows
     ) {
-        for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
-            List<BinaryRow> allVersions = 
mvPartitionStorage.runConsistently(locker -> {
-                locker.lock(row.get1());
+        for (TestRow row : rows) {
+            if (mvPartitionStorage != null) {
+                List<BinaryRow> allVersions = 
mvPartitionStorage.runConsistently(locker -> {
+                    locker.lock(row.rowId);
 
-                return 
toListOfBinaryRows(mvPartitionStorage.scanVersions(row.get1()));
-            });
+                    return 
toListOfBinaryRows(mvPartitionStorage.scanVersions(row.rowId));
+                });
+
+                assertThat(allVersions, contains(equalToRow(row.row)));
+            }
+
+            if (hashIndexStorage != null) {
+                IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), row.row, row.rowId);
 
-            assertThat(allVersions, contains(equalToRow(row.get2())));
+                
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())), 
contains(row.rowId));
+            }
 
-            IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1());
-            IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1());
+            if (sortedIndexStorage != null) {
+                IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), row.row, row.rowId);
 
-            
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())), 
contains(row.get1()));
-            
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())), 
contains(row.get1()));
+                
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())), 
contains(row.rowId));
+            }
         }
     }
 
@@ -971,17 +1056,17 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         HashIndexStorage hashIndexStorage = 
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
         SortedIndexStorage sortedIndexStorage = 
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
 
-        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows = List.of(
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
-                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        List<TestRow> rows = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, 
"0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, 
"1"), new TestValue(1, "1")))
         );
 
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rows);
 
         PartitionTimestampCursor scanTimestampCursor = 
mvPartitionStorage.scan(clock.now());
 
-        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), 
rows.get(0).get2(), rows.get(0).get1());
-        IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), rows.get(0).get2(), 
rows.get(0).get1());
+        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), 
rows.get(0).row, rows.get(0).rowId);
+        IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), rows.get(0).row, 
rows.get(0).rowId);
 
         Cursor<RowId> getFromHashIndexCursor = 
hashIndexStorage.get(hashIndexRow.indexColumns());
 
@@ -1019,17 +1104,17 @@ public abstract class AbstractMvTableStorageTest 
extends BaseMvStoragesTest {
             MvPartitionStorage mvPartitionStorage,
             HashIndexStorage hashIndexStorage,
             SortedIndexStorage sortedIndexStorage,
-            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> 
rowsBeforeRebalanceStart
+            List<TestRow> rowsBeforeRebalanceStart
     ) {
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsBeforeRebalanceStart);
 
         // Let's open the cursors before start rebalance.
-        IgniteTuple3<RowId, BinaryRow, HybridTimestamp> rowForCursors = 
rowsBeforeRebalanceStart.get(0);
+        TestRow rowForCursors = rowsBeforeRebalanceStart.get(0);
 
-        Cursor<?> mvPartitionStorageScanCursor = 
mvPartitionStorage.scan(rowForCursors.get3());
+        Cursor<?> mvPartitionStorageScanCursor = 
mvPartitionStorage.scan(rowForCursors.timestamp);
 
-        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), 
rowForCursors.get2(), rowForCursors.get1());
-        IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), rowForCursors.get2(), 
rowForCursors.get1());
+        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), 
rowForCursors.row, rowForCursors.rowId);
+        IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), rowForCursors.row, 
rowForCursors.rowId);
 
         Cursor<?> hashIndexStorageGetCursor = 
hashIndexStorage.get(hashIndexRow.indexColumns());
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 8bf96f14b2..172ab60cf9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -36,12 +36,18 @@ import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
 import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Implementation of Hash index storage using Page Memory.
  */
 public class PageMemoryHashIndexStorage extends 
AbstractPageMemoryIndexStorage<HashIndexRowKey, HashIndexRow> implements 
HashIndexStorage {
-    /** Index descriptor. */
+    /**
+     * Index descriptor.
+     *
+     * <p>Can be {@code null} only during recovery.
+     */
+    @Nullable
     private final StorageHashIndexDescriptor descriptor;
 
     /** Hash index tree instance. */
@@ -58,7 +64,7 @@ public class PageMemoryHashIndexStorage extends 
AbstractPageMemoryIndexStorage<H
      */
     public PageMemoryHashIndexStorage(
             IndexMeta indexMeta,
-            StorageHashIndexDescriptor descriptor,
+            @Nullable StorageHashIndexDescriptor descriptor,
             IndexColumnsFreeList freeList,
             HashIndexTree hashIndexTree,
             IndexMetaTree indexMetaTree,
@@ -72,6 +78,8 @@ public class PageMemoryHashIndexStorage extends 
AbstractPageMemoryIndexStorage<H
 
     @Override
     public StorageHashIndexDescriptor indexDescriptor() {
+        assert descriptor != null : "This tree must only be used during 
recovery";
+
         return descriptor;
     }
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/IndexMeta.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/IndexMeta.java
index 87760a30af..7286a99dbf 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/IndexMeta.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/IndexMeta.java
@@ -27,6 +27,37 @@ import org.jetbrains.annotations.Nullable;
  * Index meta information.
  */
 public class IndexMeta extends IndexMetaKey {
+    /** Index type. */
+    public enum IndexType {
+        HASH((byte) 0),
+
+        SORTED((byte) 1);
+
+        private final byte serializationValue;
+
+        IndexType(byte serializationValue) {
+            this.serializationValue = serializationValue;
+        }
+
+        /** Converts an index type to its serialized representation. */
+        public byte serialize() {
+            return serializationValue;
+        }
+
+        /** Restores an index type from its serialized representation. */
+        public static IndexType deserialize(byte serializationValue) {
+            if (HASH.serializationValue == serializationValue) {
+                return HASH;
+            } else if (SORTED.serializationValue == serializationValue) {
+                return SORTED;
+            } else {
+                throw new AssertionError("Unknown serialization value: " + 
serializationValue);
+            }
+        }
+    }
+
+    private final IndexType indexType;
+
     @IgniteToStringExclude
     private final long metaPageId;
 
@@ -40,13 +71,18 @@ public class IndexMeta extends IndexMetaKey {
      * @param nextRowIdUuidToBuild Row ID uuid for which the index needs to be 
built, {@code null} means that the index building has
      *      completed.
      */
-    public IndexMeta(int id, long metaPageId, @Nullable UUID 
nextRowIdUuidToBuild) {
+    public IndexMeta(int id, IndexType indexType, long metaPageId, @Nullable 
UUID nextRowIdUuidToBuild) {
         super(id);
 
+        this.indexType = indexType;
         this.metaPageId = metaPageId;
         this.nextRowIdUuidToBuild = nextRowIdUuidToBuild;
     }
 
+    public IndexType indexType() {
+        return indexType;
+    }
+
     /**
      * Returns page ID of the index tree meta page.
      */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/UpdateLastRowIdUuidToBuiltInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/UpdateLastRowIdUuidToBuiltInvokeClosure.java
index b1ed55fb61..8b6326a493 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/UpdateLastRowIdUuidToBuiltInvokeClosure.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/UpdateLastRowIdUuidToBuiltInvokeClosure.java
@@ -45,7 +45,7 @@ public class UpdateLastRowIdUuidToBuiltInvokeClosure 
implements InvokeClosure<In
     public void call(@Nullable IndexMeta oldRow) throws 
IgniteInternalCheckedException {
         assert oldRow != null;
 
-        newRow = new IndexMeta(oldRow.indexId(), oldRow.metaPageId(), 
newLastRowIdUuidToBuilt);
+        newRow = new IndexMeta(oldRow.indexId(), oldRow.indexType(), 
oldRow.metaPageId(), newLastRowIdUuidToBuilt);
     }
 
     @Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/io/IndexMetaIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/io/IndexMetaIo.java
index c299e089a6..fef17dd3c3 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/io/IndexMetaIo.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/meta/io/IndexMetaIo.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.storage.pagememory.index.meta.io;
 
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getByte;
 import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
 import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putByte;
 import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
 import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
 
@@ -26,6 +28,7 @@ import java.util.UUID;
 import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
 import org.apache.ignite.internal.pagememory.util.PageUtils;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
+import 
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta.IndexType;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaKey;
 
 /**
@@ -42,8 +45,11 @@ public interface IndexMetaIo {
     /** Offset of the index ID (4 bytes). */
     int INDEX_ID_OFFSET = 0;
 
+    /** Offset of the Index Type (1 byte). */
+    int INDEX_TYPE_OFFSET = INDEX_ID_OFFSET + Integer.BYTES;
+
     /** Index tree meta page id offset - long (8 bytes). */
-    int INDEX_TREE_META_PAGE_ID_OFFSET = INDEX_ID_OFFSET + Integer.BYTES;
+    int INDEX_TREE_META_PAGE_ID_OFFSET = INDEX_TYPE_OFFSET + Byte.BYTES;
 
     /**
      * Offset of the {@link UUID#getMostSignificantBits() most significant 
bits} of Row ID uuid for which the index needs to be built (8
@@ -58,9 +64,10 @@ public interface IndexMetaIo {
     int NEXT_ROW_ID_TO_BUILT_LSB_OFFSET = NEXT_ROW_ID_TO_BUILT_MSB_OFFSET + 
Long.BYTES;
 
     /** Payload size in bytes. */
-    int SIZE_IN_BYTES = Integer.BYTES /* Index ID - int (4 bytes) */
-            + Long.BYTES /* Index root page ID - long (8 bytes) */
-            + 2 * Long.BYTES /* Row ID uuid for which the index needs to be 
built - {@link UUID} (16 bytes) */;
+    int SIZE_IN_BYTES = Integer.BYTES // Index ID - int (4 bytes)
+            + Byte.BYTES // Index type - byte (1 byte)
+            + Long.BYTES // Index root page ID - long (8 bytes)
+            + 2 * Long.BYTES; // Row ID uuid for which the index needs to be 
built - {@link UUID} (16 bytes)
 
     /**
      * Returns an offset of the element inside the page.
@@ -97,6 +104,8 @@ public interface IndexMetaIo {
 
         int indexId = getInt(pageAddr, elementOffset + INDEX_ID_OFFSET);
 
+        IndexType indexType = IndexType.deserialize(getByte(pageAddr, 
elementOffset + INDEX_TYPE_OFFSET));
+
         long indexTreeMetaPageId = getLong(pageAddr, elementOffset + 
INDEX_TREE_META_PAGE_ID_OFFSET);
 
         long nextRowIdUuidToBuiltMsb = getLong(pageAddr, elementOffset + 
NEXT_ROW_ID_TO_BUILT_MSB_OFFSET);
@@ -106,7 +115,7 @@ public interface IndexMetaIo {
                 ? null
                 : new UUID(nextRowIdUuidToBuiltMsb, nextRowIdUuidToBuiltLsb);
 
-        return new IndexMeta(indexId, indexTreeMetaPageId, nextRowIdUuid);
+        return new IndexMeta(indexId, indexType, indexTreeMetaPageId, 
nextRowIdUuid);
     }
 
     /**
@@ -133,6 +142,8 @@ public interface IndexMetaIo {
 
         putInt(pageAddr, off + INDEX_ID_OFFSET, row.indexId());
 
+        putByte(pageAddr, off + INDEX_TYPE_OFFSET, 
row.indexType().serialize());
+
         putLong(pageAddr, off + INDEX_TREE_META_PAGE_ID_OFFSET, 
row.metaPageId());
 
         UUID nextRowIdUuidToBuild = row.nextRowIdUuidToBuild();
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index d962e273d8..77b9ba4567 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -48,7 +48,12 @@ import org.jetbrains.annotations.Nullable;
  */
 public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage<SortedIndexRowKey, SortedIndexRow>
         implements SortedIndexStorage {
-    /** Index descriptor. */
+    /**
+     * Index descriptor.
+     *
+     * <p>Can be {@code null} only during recovery.
+     */
+    @Nullable
     private final StorageSortedIndexDescriptor descriptor;
 
     /** Sorted index tree instance. */
@@ -65,7 +70,7 @@ public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage
      */
     public PageMemorySortedIndexStorage(
             IndexMeta indexMeta,
-            StorageSortedIndexDescriptor descriptor,
+            @Nullable StorageSortedIndexDescriptor descriptor,
             IndexColumnsFreeList freeList,
             SortedIndexTree sortedIndexTree,
             IndexMetaTree indexMetaTree,
@@ -79,6 +84,8 @@ public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage
 
     @Override
     public StorageSortedIndexDescriptor indexDescriptor() {
+        assert descriptor != null : "This tree must only be used during 
recovery";
+
         return descriptor;
     }
 
@@ -193,7 +200,7 @@ public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage
 
     private @Nullable IndexRowImpl toIndexRowImpl(@Nullable SortedIndexRow 
sortedIndexRow) {
         return sortedIndexRow == null ? null : new IndexRowImpl(
-                new BinaryTuple(descriptor.binaryTupleSchema().elementCount(), 
sortedIndexRow.indexColumns().valueBuffer()),
+                new 
BinaryTuple(indexDescriptor().binaryTupleSchema().elementCount(), 
sortedIndexRow.indexColumns().valueBuffer()),
                 sortedIndexRow.rowId()
         );
     }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/IndexStorageFactory.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/IndexStorageFactory.java
index 4d8e9fed1a..dc2181ef8a 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/IndexStorageFactory.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/IndexStorageFactory.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns
 import org.apache.ignite.internal.storage.pagememory.index.hash.HashIndexTree;
 import 
org.apache.ignite.internal.storage.pagememory.index.hash.PageMemoryHashIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
+import 
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta.IndexType;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
 import 
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
 import 
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTree;
@@ -111,6 +112,20 @@ class IndexStorageFactory {
         );
     }
 
+    /**
+     * Restores an existing Page Memory-based Hash Index storage that will be 
immediately destroyed during recovery.
+     */
+    PageMemoryHashIndexStorage restoreHashIndexStorageForDestroy(IndexMeta 
indexMeta) {
+        return new PageMemoryHashIndexStorage(
+                indexMeta,
+                null,
+                indexFreeList,
+                restoreHashIndexTree(indexMeta),
+                indexMetaTree,
+                tableStorage.isVolatile()
+        );
+    }
+
     private IndexTreeAndMeta<HashIndexTree> 
createHashIndexTreeAndMeta(StorageHashIndexDescriptor indexDescriptor) {
         return createIndexTree(
                 indexDescriptor,
@@ -174,6 +189,20 @@ class IndexStorageFactory {
         );
     }
 
+    /**
+     * Restores an existing Page Memory-based Sorted Index storage that will 
be immediately destroyed during recovery.
+     */
+    PageMemorySortedIndexStorage restoreSortedIndexStorageForDestroy(IndexMeta 
indexMeta) {
+        return new PageMemorySortedIndexStorage(
+                indexMeta,
+                null,
+                indexFreeList,
+                restoreSortedIndexTreeForDestroy(indexMeta),
+                indexMetaTree,
+                tableStorage.isVolatile()
+        );
+    }
+
     private IndexTreeAndMeta<SortedIndexTree> 
createSortedIndexTreeAndMeta(StorageSortedIndexDescriptor indexDescriptor) {
         return createIndexTree(
                 indexDescriptor,
@@ -209,6 +238,23 @@ class IndexStorageFactory {
         }
     }
 
+    private SortedIndexTree restoreSortedIndexTreeForDestroy(IndexMeta 
indexMeta) {
+        try {
+            return SortedIndexTree.restoreForDestroy(
+                    tableStorage.getTableId(),
+                    Integer.toString(tableStorage.getTableId()),
+                    partitionId,
+                    tableStorage.dataRegion().pageMemory(),
+                    PageLockListenerNoOp.INSTANCE,
+                    new AtomicLong(),
+                    indexMeta.metaPageId(),
+                    indexReuseList
+            );
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(e);
+        }
+    }
+
     /**
      * Creates a new B-Tree for the given {@code indexStorage}.
      */
@@ -235,7 +281,9 @@ class IndexStorageFactory {
 
             T tree = treeConstructor.createTree(metaPageId);
 
-            var indexMeta = new IndexMeta(descriptor.id(), metaPageId, 
RowId.lowestRowId(partitionId).uuid());
+            IndexType indexType = descriptor instanceof 
StorageHashIndexDescriptor ? IndexType.HASH : IndexType.SORTED;
+
+            var indexMeta = new IndexMeta(descriptor.id(), indexType, 
metaPageId, RowId.lowestRowId(partitionId).uuid());
 
             boolean replaced = indexMetaTree.putx(indexMeta);
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryIndexes.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryIndexes.java
index e17d25f02f..869d06e8aa 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryIndexes.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryIndexes.java
@@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.StorageException;
@@ -48,6 +50,8 @@ import org.jetbrains.annotations.Nullable;
  * Class that manages indexes of a single {@link 
AbstractPageMemoryMvPartitionStorage}.
  */
 class PageMemoryIndexes {
+    private static final IgniteLogger LOG = 
Loggers.forClass(PageMemoryIndexes.class);
+
     private final Consumer<WriteClosure<Void>> runConsistently;
 
     private final GradualTaskExecutor destructionExecutor;
@@ -105,8 +109,19 @@ class PageMemoryIndexes {
                 StorageIndexDescriptor indexDescriptor = 
indexDescriptorSupplier.get(indexId);
 
                 if (indexDescriptor == null) {
-                    // TODO: IGNITE-21671 destroy the index if it can't be 
found in the Catalog.
-                    continue;
+                    // Index has not been found in the Catalog, we can treat 
it as destroyed and remove the storage.
+                    runConsistently.accept(locker -> {
+                        destroyIndexOnRecovery(indexMeta, indexStorageFactory, 
indexMetaTree)
+                                .whenComplete((v, e) -> {
+                                    if (e != null) {
+                                        LOG.error(
+                                                "Unable to destroy existing 
index {}, that has been removed from the Catalog", e, indexId
+                                        );
+                                    }
+                                });
+
+                        return null;
+                    });
                 } else if (indexDescriptor instanceof 
StorageHashIndexDescriptor) {
                     PageMemoryHashIndexStorage indexStorage = 
indexStorageFactory
                             
.restoreHashIndexStorage((StorageHashIndexDescriptor) indexDescriptor, 
indexMeta);
@@ -124,6 +139,29 @@ class PageMemoryIndexes {
         }
     }
 
+    private CompletableFuture<Void> destroyIndexOnRecovery(
+            IndexMeta indexMeta,
+            IndexStorageFactory indexStorageFactory,
+            IndexMetaTree indexMetaTree
+    ) {
+        switch (indexMeta.indexType()) {
+            case HASH:
+                PageMemoryHashIndexStorage hashIndexStorage = 
indexStorageFactory.restoreHashIndexStorageForDestroy(indexMeta);
+
+                return destroyStorage(indexMeta.indexId(), hashIndexStorage, 
indexMetaTree);
+
+            case SORTED:
+                PageMemorySortedIndexStorage sortedIndexStorage = 
indexStorageFactory.restoreSortedIndexStorageForDestroy(indexMeta);
+
+                return destroyStorage(indexMeta.indexId(), sortedIndexStorage, 
indexMetaTree);
+
+            default:
+                throw new AssertionError(String.format(
+                        "Unexpected index type %s for index %d", 
indexMeta.indexType(), indexMeta.indexId()
+                ));
+        }
+    }
+
     CompletableFuture<Void> destroyIndex(int indexId, IndexMetaTree 
indexMetaTree) {
         PageMemoryHashIndexStorage hashIndexStorage = 
hashIndexes.remove(indexId);
 
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 6e7d3a6e16..a3b8b7682e 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -31,7 +31,6 @@ import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointSt
 import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
-import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -76,7 +75,7 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
     protected MvTableStorage createMvTableStorage() {
         return engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                indexDescriptorSupplier
         );
     }
 
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
index 370fccc77a..536e2f1c91 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -47,7 +47,6 @@ import 
org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
-import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -90,7 +89,7 @@ public class VolatilePageMemoryMvTableStorageTest extends 
AbstractMvTableStorage
     protected MvTableStorage createMvTableStorage() {
         return engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                indexDescriptorSupplier
         );
     }
 
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
index d690684b0e..6a0c0cd025 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
@@ -61,7 +61,7 @@ class PersistentPageMemoryHashIndexStorageTest extends 
AbstractPageMemoryHashInd
 
         tableStorage = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(tableStorage, engineConfig);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
index c1ab65a0e8..0dadd5bdbf 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
@@ -66,7 +66,7 @@ class PersistentPageMemorySortedIndexStorageTest extends 
AbstractPageMemorySorte
 
         tableStorage = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(tableStorage, engineConfig);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
index 9b6669415d..7eb64237c6 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.index;
 
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
+import static org.mockito.Mockito.mock;
 
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -55,7 +56,7 @@ class VolatilePageMemoryHashIndexStorageTest extends 
AbstractPageMemoryHashIndex
 
         tableStorage = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(tableStorage, engineConfig);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java
index c8891363ae..0a719264d0 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemorySortedIndexStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory.index;
 
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
+import static org.mockito.Mockito.mock;
 
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -55,7 +56,7 @@ class VolatilePageMemorySortedIndexStorageTest extends 
AbstractPageMemorySortedI
 
         tableStorage = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(tableStorage, engineConfig);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
index ed3b8bd475..75b96cc3cc 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -61,7 +60,7 @@ class PersistentPageMemoryMvPartitionStorageConcurrencyTest 
extends AbstractMvPa
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
index 5025203771..074343fc63 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -61,7 +60,7 @@ class PersistentPageMemoryMvPartitionStorageGcTest extends 
AbstractMvPartitionSt
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 4832c4f8bf..0dd66ecb77 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -72,7 +71,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends 
AbstractPageMemoryMvPar
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
index 5e4a7826c2..0c8e9c5c41 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_P
 import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
 import static org.mockito.Mockito.mock;
 
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -54,7 +53,7 @@ class VolatilePageMemoryMvPartitionStorageConcurrencyTest 
extends AbstractMvPart
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java
index 75481fa552..ac3d8d9595 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_P
 import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
 import static org.mockito.Mockito.mock;
 
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -54,7 +53,7 @@ class VolatilePageMemoryMvPartitionStorageGcTest extends 
AbstractMvPartitionStor
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
index 1a92b4c6c0..d3dc387061 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_P
 import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
 import static org.mockito.Mockito.mock;
 
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -54,7 +53,7 @@ class VolatilePageMemoryMvPartitionStorageTest extends 
AbstractPageMemoryMvParti
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
index 2db6b43e13..857afd7b93 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -56,7 +55,7 @@ public class RocksDbMvPartitionStorageConcurrencyTest extends 
AbstractMvPartitio
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
index 1d511a3eb3..aa26cc7381 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -56,7 +55,7 @@ public class RocksDbMvPartitionStorageGcTest extends 
AbstractMvPartitionStorageG
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 0ca7d8e2bf..0cd1b61c79 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -56,7 +55,7 @@ public class RocksDbMvPartitionStorageTest extends 
AbstractMvPartitionStorageTes
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 9c5515265a..07d62ed440 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -35,13 +35,13 @@ import 
org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
-import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -77,7 +77,7 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
     protected MvTableStorage createMvTableStorage() {
         return engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                indexDescriptorSupplier
         );
     }
 
@@ -159,4 +159,11 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
     void storageAdvertisesItIsPersistent() {
         assertThat(tableStorage.isVolatile(), is(false));
     }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21680")
+    @Test
+    @Override
+    public void testIndexDestructionOnRecovery() throws Exception {
+        super.testIndexDestructionOnRecovery();
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
index 99b87f3cc5..88680beb1b 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -73,7 +72,7 @@ public class RocksDbStorageEngineTest extends 
BaseIgniteAbstractTest {
     void testCreateTableWithDefaultDataRegion() {
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         getOrCreateMvPartition(table, 1);
@@ -90,7 +89,7 @@ public class RocksDbStorageEngineTest extends 
BaseIgniteAbstractTest {
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
customRegionName),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         getOrCreateMvPartition(table, 1);
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
index b6cadb23cc..40dd9027b0 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.rocksdb.index;
 
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
+import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -55,7 +56,7 @@ public class RocksDbHashIndexStorageTest extends 
AbstractHashIndexStorageTest {
 
         tableStorage = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(tableStorage);
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index aa8612d737..270bc8e33b 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.rocksdb.index;
 
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
+import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -55,7 +56,7 @@ public class RocksDbSortedIndexStorageTest extends 
AbstractSortedIndexStorageTes
 
         tableStorage = engine.createMvTable(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(catalogService)
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(tableStorage);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
new file mode 100644
index 0000000000..645297db48
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Index descriptor supplier from catalog.
+ */
+class CatalogStorageIndexDescriptorSupplier implements 
StorageIndexDescriptorSupplier {
+    private final CatalogService catalogService;
+
+    private final LowWatermark lowWatermark;
+
+    CatalogStorageIndexDescriptorSupplier(CatalogService catalogService, 
LowWatermark lowWatermark) {
+        this.catalogService = catalogService;
+        this.lowWatermark = lowWatermark;
+    }
+
+    @Override
+    public @Nullable StorageIndexDescriptor get(int indexId) {
+        // Search for the index in the catalog history, whose versions 
correspond to the (lowWatermark, now] timestamp range.
+        int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+        // Get the current Low Watermark value. Since this class is used only 
on recovery, we expect that this value will not change
+        // concurrently.
+        HybridTimestamp lowWatermarkTimestamp = lowWatermark.getLowWatermark();
+
+        int earliestCatalogVersion = lowWatermarkTimestamp == null
+                ? catalogService.earliestCatalogVersion()
+                : 
catalogService.activeCatalogVersion(lowWatermarkTimestamp.longValue());
+
+        for (int catalogVersion = latestCatalogVersion; catalogVersion >= 
earliestCatalogVersion; catalogVersion--) {
+            CatalogIndexDescriptor index = catalogService.index(indexId, 
catalogVersion);
+
+            if (index != null) {
+                return createStorageIndexDescriptor(index, catalogVersion);
+            }
+        }
+
+        return null;
+    }
+
+    private StorageIndexDescriptor 
createStorageIndexDescriptor(CatalogIndexDescriptor indexDescriptor, int 
catalogVersion) {
+        CatalogTableDescriptor table = 
catalogService.table(indexDescriptor.tableId(), catalogVersion);
+
+        assert table != null : "tableId=" + indexDescriptor.tableId() + ", 
indexId=" + indexDescriptor.id();
+
+        return StorageIndexDescriptor.create(table, indexDescriptor);
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index 8e5cc3cf20..5e47e7ba9f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -190,14 +190,14 @@ public class LowWatermark implements IgniteComponent {
         return lowWatermark;
     }
 
-    void updateLowWatermark() {
-        inBusyLock(busyLock, () -> {
+    CompletableFuture<Void> updateLowWatermark() {
+        return inBusyLock(busyLock, () -> {
             HybridTimestamp lowWatermarkCandidate = 
createNewLowWatermarkCandidate();
 
             // Wait until all the read-only transactions are finished before 
the new candidate, since no new RO transactions could be
             // created, then we can safely promote the candidate as a new low 
watermark, store it in vault, and we can safely start cleaning
             // up the stale/junk data in the tables.
-            txManager.updateLowWatermark(lowWatermarkCandidate)
+            return txManager.updateLowWatermark(lowWatermarkCandidate)
                     .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
                         vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(lowWatermarkCandidate));
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 9f9a49b6f0..90fd8c21a2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -161,7 +161,6 @@ import 
org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
-import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
@@ -1361,7 +1360,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         return engine.createMvTable(
                 new StorageTableDescriptor(tableDescriptor.id(), 
zoneDescriptor.partitions(), dataStorage.dataRegion()),
-                new StorageIndexDescriptorSupplier(catalogService)
+                new CatalogStorageIndexDescriptorSupplier(catalogService, 
lowWatermark)
         );
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplierTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplierTest.java
new file mode 100644
index 0000000000..5e04da9afd
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplierTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
+import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+class CatalogStorageIndexDescriptorSupplierTest extends BaseIgniteAbstractTest 
{
+    // TODO: remove this, see 
https://issues.apache.org/jira/browse/IGNITE-18977
+    private static final long MIN_DATA_AVAILABILITY_TIME = 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW;
+
+    private static final String TABLE_NAME = "TEST";
+
+    private static final String INDEX_NAME = "TEST_IDX";
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private CatalogManager catalogManager;
+
+    private LowWatermark lowWatermark;
+
+    private StorageIndexDescriptorSupplier indexDescriptorSupplier;
+
+    @BeforeEach
+    void setUp(
+            TestInfo testInfo,
+            @InjectConfiguration("mock.dataAvailabilityTime = " + 
MIN_DATA_AVAILABILITY_TIME)
+            LowWatermarkConfiguration lowWatermarkConfiguration,
+            @Mock TxManager txManager,
+            @Mock VaultManager vaultManager,
+            @Mock FailureProcessor failureProcessor
+    ) {
+        String nodeName = testNodeName(testInfo, 0);
+
+        catalogManager = createTestCatalogManager(nodeName, clock);
+
+        
lenient().when(txManager.updateLowWatermark(any())).thenReturn(nullCompletedFuture());
+
+        lowWatermark = new LowWatermark(
+                nodeName,
+                lowWatermarkConfiguration,
+                clock,
+                txManager,
+                vaultManager,
+                failureProcessor
+        );
+
+        indexDescriptorSupplier = new 
CatalogStorageIndexDescriptorSupplier(catalogManager, lowWatermark);
+
+        catalogManager.start();
+
+        lowWatermark.start();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.stopAll(lowWatermark, catalogManager);
+    }
+
+    @Test
+    void testGetMissingIndex() {
+        assertThat(indexDescriptorSupplier.get(0), is(nullValue()));
+    }
+
+    @Test
+    void testGetLatestAliveIndex() {
+        int indexId = createIndex();
+
+        StorageIndexDescriptor indexDescriptor = 
indexDescriptorSupplier.get(indexId);
+
+        assertThat(indexDescriptor, is(notNullValue()));
+        assertThat(indexDescriptor.id(), is(indexId));
+    }
+
+    @Test
+    void testGetDroppedIndex() {
+        int indexId = createIndex();
+
+        CatalogCommand dropIndexCommand = DropIndexCommand.builder()
+                .schemaName(DEFAULT_SCHEMA_NAME)
+                .indexName(INDEX_NAME)
+                .build();
+
+        assertThat(catalogManager.execute(dropIndexCommand), 
willCompleteSuccessfully());
+
+        assertThat(catalogManager.index(indexId, 
catalogManager.latestCatalogVersion()), is(nullValue()));
+
+        StorageIndexDescriptor indexDescriptor = 
indexDescriptorSupplier.get(indexId);
+
+        assertThat(indexDescriptor, is(notNullValue()));
+        assertThat(indexDescriptor.id(), is(indexId));
+    }
+
+    @Test
+    void testGetAliveIndexAfterWatermark() throws InterruptedException {
+        int indexId = createIndex();
+
+        HybridTimestamp timestampAfterCreate = clock.now();
+
+        StorageIndexDescriptor indexDescriptor = 
indexDescriptorSupplier.get(indexId);
+
+        assertThat(indexDescriptor, is(notNullValue()));
+        assertThat(indexDescriptor.id(), is(indexId));
+
+        raiseWatermarkUpTo(timestampAfterCreate);
+
+        indexDescriptor = indexDescriptorSupplier.get(indexId);
+
+        assertThat(indexDescriptor, is(notNullValue()));
+        assertThat(indexDescriptor.id(), is(indexId));
+    }
+
+    @Test
+    void testGetDroppedIndexAfterWatermark() throws InterruptedException {
+        int indexId = createIndex();
+
+        CatalogCommand dropIndexCommand = DropIndexCommand.builder()
+                .schemaName(DEFAULT_SCHEMA_NAME)
+                .indexName(INDEX_NAME)
+                .build();
+
+        assertThat(catalogManager.execute(dropIndexCommand), 
willCompleteSuccessfully());
+
+        HybridTimestamp timestampAfterDrop = clock.now();
+
+        StorageIndexDescriptor indexDescriptor = 
indexDescriptorSupplier.get(indexId);
+
+        assertThat(indexDescriptor, is(notNullValue()));
+        assertThat(indexDescriptor.id(), is(indexId));
+
+        raiseWatermarkUpTo(timestampAfterDrop);
+
+        indexDescriptor = indexDescriptorSupplier.get(indexId);
+
+        assertThat(indexDescriptor, is(nullValue()));
+    }
+
+    private int createIndex() {
+        List<CatalogCommand> commands = List.of(
+                CreateTableCommand.builder()
+                        .schemaName(DEFAULT_SCHEMA_NAME)
+                        .tableName(TABLE_NAME)
+                        
.columns(List.of(ColumnParams.builder().name("foo").type(ColumnType.INT32).build()))
+                        .primaryKeyColumns(List.of("foo"))
+                        .build(),
+                CreateHashIndexCommand.builder()
+                        .schemaName(DEFAULT_SCHEMA_NAME)
+                        .tableName(TABLE_NAME)
+                        .indexName(INDEX_NAME)
+                        .columns(List.of("foo"))
+                        .build()
+        );
+
+        assertThat(catalogManager.execute(commands), 
willCompleteSuccessfully());
+
+        CatalogIndexDescriptor index = catalogManager.aliveIndex(INDEX_NAME, 
clock.nowLong());
+
+        assertThat(index, is(notNullValue()));
+
+        return index.id();
+    }
+
+    private void raiseWatermarkUpTo(HybridTimestamp now) throws 
InterruptedException {
+        // Wait for the low watermark to cover the previous Catalog version.
+        assertTrue(waitForCondition(() -> {
+            HybridTimestamp candidate = 
lowWatermark.createNewLowWatermarkCandidate();
+
+            return candidate.compareTo(now) >= 0;
+        }, 10_000));
+
+        assertThat(lowWatermark.updateLowWatermark(), 
willCompleteSuccessfully());
+
+        HybridTimestamp lowWatermarkTimestamp = lowWatermark.getLowWatermark();
+
+        assertThat(lowWatermarkTimestamp, is(notNullValue()));
+        
assertThat(catalogManager.activeCatalogVersion(lowWatermarkTimestamp.longValue()),
 is(catalogManager.latestCatalogVersion()));
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
index 6c011b6cd4..ec6034ddf0 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.table.distributed.gc;
 
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureProcessor;
@@ -37,6 +37,7 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(WorkDirectoryExtension.class)
@@ -50,13 +51,14 @@ class PersistentPageMemoryGcUpdateHandlerTest extends 
AbstractGcUpdateHandlerTes
 
     @BeforeEach
     void setUp(
+            TestInfo testInfo,
             @InjectConfiguration 
PersistentPageMemoryStorageEngineConfiguration engineConfig
     ) {
         PageIoRegistry ioRegistry = new PageIoRegistry();
 
         ioRegistry.loadFromServiceLoader();
 
-        String nodeName = "test";
+        String nodeName = testNodeName(testInfo, 0);
 
         engine = new PersistentPageMemoryStorageEngine(
                 nodeName,
@@ -71,7 +73,7 @@ class PersistentPageMemoryGcUpdateHandlerTest extends 
AbstractGcUpdateHandlerTes
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
index 01d251a0ac..16b820dda5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.table.distributed.gc;
 
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(WorkDirectoryExtension.class)
@@ -47,15 +48,16 @@ class RocksDbGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest {
 
     @BeforeEach
     void setUp(
+            TestInfo testInfo,
             @InjectConfiguration RocksDbStorageEngineConfiguration engineConfig
     ) {
-        engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+        engine = new RocksDbStorageEngine(testNodeName(testInfo, 0), engineConfig, workDir);
 
         engine.start();
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java
index 4e240e1bd2..300283ba52 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java
@@ -19,9 +19,9 @@ package org.apache.ignite.internal.table.distributed.gc;
 
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.mockito.Mockito.mock;
 
-import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.storage.pagememory.configuration.schema.Volati
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 
 class VolatilePageMemoryGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest {
     private VolatilePageMemoryStorageEngine engine;
@@ -41,19 +42,20 @@ class VolatilePageMemoryGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest
 
     @BeforeEach
     void setUp(
+            TestInfo testInfo,
             @InjectConfiguration VolatilePageMemoryStorageEngineConfiguration engineConfig
     ) {
         PageIoRegistry ioRegistry = new PageIoRegistry();
 
         ioRegistry.loadFromServiceLoader();
 
-        engine = new VolatilePageMemoryStorageEngine("test", engineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
+        engine = new VolatilePageMemoryStorageEngine(testNodeName(testInfo, 0), engineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
 
         engine.start();
 
         table = engine.createMvTable(
                 new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME),
-                new StorageIndexDescriptorSupplier(mock(CatalogService.class))
+                mock(StorageIndexDescriptorSupplier.class)
         );
 
         initialize(table);

Reply via email to