This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 399f0f15da IGNITE-21680 Remove destroyed RocksDB indexes on recovery (#3435)
399f0f15da is described below

commit 399f0f15daaebf1099589d734c85f2763f0344a2
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Wed Mar 20 09:44:54 2024 +0200

    IGNITE-21680 Remove destroyed RocksDB indexes on recovery (#3435)
---
 .../apache/ignite/internal/rocksdb/RocksUtils.java |   3 +-
 .../internal/rocksdb/flush/RocksDbFlusher.java     |   2 +-
 .../storage/AbstractMvTableStorageTest.java        | 126 +++++++++++-
 .../storage/rocksdb/ColumnFamilyUtils.java         |   2 +-
 .../ignite/internal/storage/rocksdb/HashIndex.java |  25 +--
 .../ignite/internal/storage/rocksdb/Index.java     |   8 +-
 .../internal/storage/rocksdb/RocksDbIndexes.java   | 229 +++++++++++++++++++++
 .../storage/rocksdb/RocksDbMetaStorage.java        |  17 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |   3 +-
 .../storage/rocksdb/RocksDbStorageEngine.java      |   6 +-
 .../storage/rocksdb/RocksDbStorageUtils.java       |  15 ++
 .../storage/rocksdb/RocksDbTableStorage.java       | 179 +++++-----------
 .../internal/storage/rocksdb/SortedIndex.java      |  53 +++--
 .../index/RocksDbBinaryTupleComparator.java        |  28 +--
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |   1 +
 .../storage/rocksdb/instance/IndexIdCursor.java    | 116 +++++++++++
 .../rocksdb/instance/SharedRocksDbInstance.java    | 202 ++++++++++++------
 .../instance/SharedRocksDbInstanceCreator.java     |   8 +-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   8 -
 .../rocksdb/instance/IndexIdCursorTest.java        |  93 +++++++++
 .../instance/SharedRocksDbInstanceTest.java        | 167 +++++++++++++++
 21 files changed, 1016 insertions(+), 275 deletions(-)

diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
index e95b8f8132..e06b5941e6 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -47,8 +47,7 @@ public class RocksUtils {
     /**
      * Checks the status of the iterator and throws an exception if it is not correct.
      *
-     * <p>Check the status first. This operation is guaranteed to throw if an internal error has occurred during the iteration. Otherwise,
-     * we've exhausted the data range.
+     * <p>This operation is guaranteed to throw if an internal error has occurred during the iteration.
      *
      * @param it RocksDB iterator.
      * @throws IgniteInternalException if the iterator has an incorrect status.
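
The usage pattern this javadoc implies, and which the new IndexIdCursor later in this commit follows, looks roughly like this (a sketch; process() stands in for caller logic):

    // Iterate until RocksDB reports the iterator invalid, then ask checkIterator()
    // whether the loop ended because the range was exhausted or because an
    // internal error occurred (in which case it throws IgniteInternalException).
    it.seekToFirst();
    while (it.isValid()) {
        process(it.key());
        it.next();
    }
    RocksUtils.checkIterator(it);
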
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 043fe36550..62528bb05d 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -123,7 +123,7 @@ public class RocksDbFlusher {
 
     /**
      * Returns an instance of {@link AbstractEventListener} to process actual RocksDB events. Returned listener must be set into
-     * {@link Options#setListeners(List)} before database is started. Otherwise, no events would occurre.
+     * {@link Options#setListeners(List)} before database is started. Otherwise, no events would occur.
      */
     public AbstractEventListener listener() {
         return new RocksDbFlushListener(this);
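
For context, the contract above only holds if the listener is registered before the database is opened. A minimal sketch of that wiring, assuming an already-constructed RocksDbFlusher named flusher (illustrative, not part of this commit):

    import java.util.List;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;

    // The listener must be passed to the options before RocksDB.open(), otherwise
    // flush events are never delivered and awaitFlush() futures never complete.
    Options options = new Options()
            .setCreateIfMissing(true)
            .setListeners(List.of(flusher.listener()));

    RocksDB db = RocksDB.open(options, "/path/to/db"); // may throw RocksDBException
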
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 21930352c7..24806134f0 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -314,6 +314,113 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
         assertThat(tableStorage.getIndex(PARTITION_ID, hashIdx.id()), is(nullValue()));
     }
 
+    /**
+     * Tests that removing one Sorted Index does not affect the data in the other.
+     */
+    @Test
+    public void testDestroySortedIndexIndependence() {
+        CatalogTableDescriptor catalogTableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());
+
+        var catalogSortedIndex1 = new CatalogSortedIndexDescriptor(
+                200,
+                "TEST_INDEX_1",
+                catalogTableDescriptor.id(),
+                false,
+                AVAILABLE,
+                catalogService.latestCatalogVersion(),
+                List.of(new CatalogIndexColumnDescriptor("STRKEY", ASC_NULLS_LAST))
+        );
+
+        var catalogSortedIndex2 = new CatalogSortedIndexDescriptor(
+                201,
+                "TEST_INDEX_2",
+                catalogTableDescriptor.id(),
+                false,
+                AVAILABLE,
+                catalogService.latestCatalogVersion(),
+                List.of(new CatalogIndexColumnDescriptor("STRKEY", ASC_NULLS_LAST))
+        );
+
+        var sortedIndexDescriptor1 = new StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex1);
+        var sortedIndexDescriptor2 = new StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex2);
+
+        MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);
+
+        SortedIndexStorage sortedIndexStorage1 = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor1);
+        SortedIndexStorage sortedIndexStorage2 = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor2);
+
+        List<TestRow> rows = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
+        );
+
+        fillStorages(partitionStorage, null, sortedIndexStorage1, rows);
+        fillStorages(partitionStorage, null, sortedIndexStorage2, rows);
+
+        checkForPresenceRows(null, null, sortedIndexStorage1, rows);
+        checkForPresenceRows(null, null, sortedIndexStorage2, rows);
+
+        assertThat(tableStorage.destroyIndex(sortedIndexDescriptor1.id()), willCompleteSuccessfully());
+
+        assertThat(tableStorage.getIndex(PARTITION_ID, sortedIndexDescriptor1.id()), is(nullValue()));
+
+        checkForPresenceRows(null, null, sortedIndexStorage2, rows);
+    }
+
+    /**
+     * Tests that removing one Hash Index does not affect the data in the other.
+     */
+    @Test
+    public void testDestroyHashIndexIndependence() {
+        CatalogTableDescriptor catalogTableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());
+
+        var catalogHashIndex1 = new CatalogHashIndexDescriptor(
+                200,
+                "TEST_INDEX_1",
+                catalogTableDescriptor.id(),
+                true,
+                AVAILABLE,
+                catalogService.latestCatalogVersion(),
+                List.of("STRKEY")
+        );
+
+        var catalogHashIndex2 = new CatalogHashIndexDescriptor(
+                201,
+                "TEST_INDEX_2",
+                catalogTableDescriptor.id(),
+                true,
+                AVAILABLE,
+                catalogService.latestCatalogVersion(),
+                List.of("STRKEY")
+        );
+
+        var hashIndexDescriptor1 = new StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex1);
+        var hashIndexDescriptor2 = new StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex2);
+
+        MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);
+
+        HashIndexStorage hashIndexStorage1 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor1);
+        HashIndexStorage hashIndexStorage2 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor2);
+
+        List<TestRow> rows = List.of(
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
+                new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
+        );
+
+        fillStorages(partitionStorage, hashIndexStorage1, null, rows);
+        fillStorages(partitionStorage, hashIndexStorage2, null, rows);
+
+        checkForPresenceRows(null, hashIndexStorage1, null, rows);
+        checkForPresenceRows(null, hashIndexStorage2, null, rows);
+
+        assertThat(tableStorage.destroyIndex(hashIndexDescriptor1.id()), willCompleteSuccessfully());
+
+        assertThat(tableStorage.getIndex(PARTITION_ID, hashIndexDescriptor1.id()), is(nullValue()));
+
+        checkForPresenceRows(null, hashIndexStorage2, null, rows);
+    }
+
+
     @Test
     public void testHashIndexIndependence() {
         MvPartitionStorage partitionStorage1 = getOrCreateMvPartition(PARTITION_ID);
@@ -859,8 +966,8 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
 
     private static void fillStorages(
             MvPartitionStorage mvPartitionStorage,
-            HashIndexStorage hashIndexStorage,
-            SortedIndexStorage sortedIndexStorage,
+            @Nullable HashIndexStorage hashIndexStorage,
+            @Nullable SortedIndexStorage sortedIndexStorage,
             List<TestRow> rows
     ) {
         assertThat(rows, hasSize(greaterThanOrEqualTo(2)));
@@ -878,9 +985,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
             assertNotNull(binaryRow);
             assertNotNull(timestamp);
 
-            IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
-            IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
-
             mvPartitionStorage.runConsistently(locker -> {
                 locker.lock(rowId);
 
@@ -892,9 +996,17 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
                     mvPartitionStorage.addWriteCommitted(rowId, binaryRow, timestamp);
                 }
 
-                hashIndexStorage.put(hashIndexRow);
+                if (hashIndexStorage != null) {
+                    IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
 
-                sortedIndexStorage.put(sortedIndexRow);
+                    hashIndexStorage.put(hashIndexRow);
+                }
+
+                if (sortedIndexStorage != null) {
+                    IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
+
+                    sortedIndexStorage.put(sortedIndexRow);
+                }
 
                 return null;
             });
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index 339e05bc86..1cd093415d 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -110,7 +110,7 @@ public class ColumnFamilyUtils {
      *
      * @see #comparatorFromCfName(byte[])
      */
-    static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {
+    public static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {
         ByteBuffer buf = ByteBuffer.allocate(SORTED_INDEX_CF_PREFIX.length() + columns.size() * 2);
 
         buf.put(SORTED_INDEX_CF_PREFIX.getBytes(UTF_8));
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 7f489d6e2f..a5f3bcce9e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
 import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -39,14 +40,23 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
 
     private final RocksDbMetaStorage indexMetaStorage;
 
-    HashIndex(ColumnFamily indexCf, StorageHashIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage) {
+    HashIndex(SharedRocksDbInstance rocksDb, StorageHashIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage) {
         super(descriptor.id());
 
-        this.indexCf = indexCf;
+        this.indexCf = rocksDb.hashIndexCf();
         this.descriptor = descriptor;
         this.indexMetaStorage = indexMetaStorage;
     }
 
+    /**
+     * Returns hash index storage for partition.
+     *
+     * @param partitionId Partition ID.
+     */
+    @Nullable RocksDbHashIndexStorage getStorage(int partitionId) {
+        return storageByPartitionId.get(partitionId);
+    }
+
     /**
      * Creates a new Hash Index storage or returns an existing one.
      */
@@ -66,13 +76,4 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
         // Every index storage uses an "index ID" + "partition ID" prefix. We can remove everything by just using the index ID prefix.
         deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, descriptor.id()));
     }
-
-    /**
-     * Returns hash index storage for partition.
-     *
-     * @param partitionId Partition ID.
-     */
-    @Nullable RocksDbHashIndexStorage get(int partitionId) {
-        return storageByPartitionId.get(partitionId);
-    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
index 2640359a91..30f49dbcb7 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
@@ -67,12 +67,12 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
      * @throws RocksDBException If failed to delete data.
      */
     void destroy(int partitionId, WriteBatch writeBatch) throws RocksDBException {
-        S hashIndex = storageByPartitionId.remove(partitionId);
+        S storage = storageByPartitionId.remove(partitionId);
 
-        if (hashIndex != null) {
-            hashIndex.transitionToDestroyedState();
+        if (storage != null) {
+            storage.transitionToDestroyedState();
 
-            hashIndex.destroyData(writeBatch);
+            storage.destroyData(writeBatch);
         }
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
new file mode 100644
index 0000000000..6fef05303a
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.IntFunction;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
+import org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+
+/** Manager for all RocksDB-based indexes. */
+class RocksDbIndexes {
+    /** Hash Index storages by Index IDs. */
+    private final ConcurrentMap<Integer, HashIndex> hashIndices = new ConcurrentHashMap<>();
+
+    /** Sorted Index storages by Index IDs. */
+    private final ConcurrentMap<Integer, SortedIndex> sortedIndices = new ConcurrentHashMap<>();
+
+    private final SharedRocksDbInstance rocksDb;
+
+    /** Callback for getting partition storages using partition IDs. */
+    private final IntFunction<RocksDbMvPartitionStorage> partitionStorageProvider;
+
+    RocksDbIndexes(SharedRocksDbInstance rocksDb, IntFunction<RocksDbMvPartitionStorage> partitionStorageProvider) {
+        this.rocksDb = rocksDb;
+        this.partitionStorageProvider = partitionStorageProvider;
+    }
+
+    void recoverIndexes(StorageIndexDescriptorSupplier indexDescriptorSupplier) throws RocksDBException {
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            for (int indexId : rocksDb.hashIndexIds()) {
+                var descriptor = (StorageHashIndexDescriptor) indexDescriptorSupplier.get(indexId);
+
+                if (descriptor == null) {
+                    deleteByPrefix(writeBatch, rocksDb.hashIndexCf(), createKey(BYTE_EMPTY_ARRAY, indexId));
+                } else {
+                    hashIndices.put(indexId, new HashIndex(rocksDb, descriptor, rocksDb.meta));
+                }
+            }
+
+            var indexCfsToDestroy = new ArrayList<Map.Entry<Integer, ColumnFamily>>();
+
+            for (Map.Entry<Integer, ColumnFamily> e : rocksDb.sortedIndexes().entrySet()) {
+                int indexId = e.getKey();
+
+                ColumnFamily indexCf = e.getValue();
+
+                var descriptor = (StorageSortedIndexDescriptor) indexDescriptorSupplier.get(indexId);
+
+                if (descriptor == null) {
+                    deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, indexId));
+
+                    indexCfsToDestroy.add(e);
+                } else {
+                    sortedIndices.put(indexId, SortedIndex.restoreExisting(rocksDb, indexCf, descriptor, rocksDb.meta));
+                }
+            }
+
+            rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+
+            if (!indexCfsToDestroy.isEmpty()) {
+                rocksDb.flusher.awaitFlush(false)
+                        .thenRunAsync(() -> {
+                            for (Map.Entry<Integer, ColumnFamily> e : indexCfsToDestroy) {
+                                int indexId = e.getKey();
+
+                                ColumnFamily indexCf = e.getValue();
+
+                                rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId);
+                            }
+                        }, rocksDb.engine.threadPool());
+            }
+        }
+    }
+
+    SortedIndexStorage getOrCreateSortedIndex(int partitionId, StorageSortedIndexDescriptor indexDescriptor) {
+        SortedIndex sortedIndex = sortedIndices.computeIfAbsent(
+                indexDescriptor.id(),
+                id -> SortedIndex.createNew(rocksDb, indexDescriptor, rocksDb.meta)
+        );
+
+        return sortedIndex.getOrCreateStorage(partitionStorageProvider.apply(partitionId));
+    }
+
+    HashIndexStorage getOrCreateHashIndex(int partitionId, StorageHashIndexDescriptor indexDescriptor) {
+        HashIndex hashIndex = hashIndices.computeIfAbsent(
+                indexDescriptor.id(),
+                id -> new HashIndex(rocksDb, indexDescriptor, rocksDb.meta)
+        );
+
+        return hashIndex.getOrCreateStorage(partitionStorageProvider.apply(partitionId));
+    }
+
+    @Nullable IndexStorage getIndex(int partitionId, int indexId) {
+        HashIndex hashIndex = hashIndices.get(indexId);
+
+        if (hashIndex != null) {
+            assert !sortedIndices.containsKey(indexId) : indexId;
+
+            return hashIndex.getStorage(partitionId);
+        }
+
+        SortedIndex sortedIndex = sortedIndices.get(indexId);
+
+        if (sortedIndex != null) {
+            return sortedIndex.getStorage(partitionId);
+        }
+
+        return null;
+    }
+
+    void startRebalance(int partitionId, WriteBatch writeBatch) {
+        getAllStorages(partitionId).forEach(indexStorage -> indexStorage.startRebalance(writeBatch));
+    }
+
+    void abortRebalance(int partitionId, WriteBatch writeBatch) {
+        getAllStorages(partitionId).forEach(indexStorage -> indexStorage.abortRebalance(writeBatch));
+    }
+
+    void finishRebalance(int partitionId) {
+        getAllStorages(partitionId).forEach(AbstractRocksDbIndexStorage::finishRebalance);
+    }
+
+    List<AutoCloseable> getResourcesForClose() {
+        return allIndexes().map(index -> (AutoCloseable) index::close).collect(toList());
+    }
+
+    List<AutoCloseable> getResourcesForDestroy() {
+        return allIndexes().map(index -> (AutoCloseable) index::transitionToDestroyedState).collect(toList());
+    }
+
+    private Stream<Index<?>> allIndexes() {
+        return Stream.concat(hashIndices.values().stream(), sortedIndices.values().stream());
+    }
+
+    void destroyIndex(int indexId) throws RocksDBException {
+        HashIndex hashIdx = hashIndices.remove(indexId);
+
+        SortedIndex sortedIdx = sortedIndices.remove(indexId);
+
+        if (hashIdx == null && sortedIdx == null) {
+            return;
+        }
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            if (hashIdx != null) {
+                hashIdx.destroy(writeBatch);
+            }
+
+            if (sortedIdx != null) {
+                sortedIdx.destroy(writeBatch);
+            }
+
+            rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+        }
+
+        if (sortedIdx != null) {
+            rocksDb.flusher.awaitFlush(false)
+                    .thenRunAsync(sortedIdx::destroySortedIndexCfIfNeeded, rocksDb.engine.threadPool());
+        }
+    }
+
+    void destroyAllIndexesForPartition(int partitionId, WriteBatch writeBatch) throws RocksDBException {
+        for (HashIndex hashIndex : hashIndices.values()) {
+            hashIndex.destroy(partitionId, writeBatch);
+        }
+
+        for (SortedIndex sortedIndex : sortedIndices.values()) {
+            sortedIndex.destroy(partitionId, writeBatch);
+        }
+    }
+
+    void destroyAllIndexes(WriteBatch writeBatch) throws RocksDBException {
+        for (HashIndex hashIndex : hashIndices.values()) {
+            hashIndex.destroy(writeBatch);
+        }
+
+        for (SortedIndex sortedIndex : sortedIndices.values()) {
+            sortedIndex.destroy(writeBatch);
+        }
+    }
+
+    void scheduleAllIndexCfDestroy() {
+        rocksDb.flusher.awaitFlush(false)
+                .thenRunAsync(() -> sortedIndices.values().forEach(SortedIndex::destroySortedIndexCfIfNeeded), rocksDb.engine.threadPool());
+    }
+
+    Stream<AbstractRocksDbIndexStorage> getAllStorages(int partitionId) {
+        return Stream.concat(
+                hashIndices.values().stream().map(index -> index.getStorage(partitionId)),
+                sortedIndices.values().stream().map(index -> index.getStorage(partitionId))
+        );
+    }
+}
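
The recovery logic in recoverIndexes() above reduces to one decision per index ID found on disk: the catalog-backed descriptor supplier is the source of truth, and an ID without a descriptor belongs to an index that was destroyed while the node was down. A condensed sketch of that rule (purgeIndexData() and registerLiveIndex() are hypothetical stand-ins for the deleteByPrefix/CF-destroy and HashIndex/SortedIndex wiring above):

    for (int indexId : persistedIndexIds) {      // IDs read back from RocksDB on start
        if (indexDescriptorSupplier.get(indexId) == null) {
            purgeIndexData(indexId);             // dropped while offline: remove leftovers
        } else {
            registerLiveIndex(indexId);          // still in the catalog: restore its wrapper
        }
    }
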
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index edfb1d4e62..6511fd0275 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static java.nio.ByteOrder.BIG_ENDIAN;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.getRowIdUuid;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.putRowIdUuid;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
@@ -59,21 +59,6 @@ public class RocksDbMetaStorage {
         this.metaColumnFamily = metaColumnFamily;
     }
 
-    /**
-     * Creates a byte array, that uses the {@code prefix} as a prefix, and every other {@code int} values as a 4-bytes chunk in Big Endian.
-     */
-    public static byte[] createKey(byte[] prefix, int... values) {
-        ByteBuffer buf = ByteBuffer.allocate(prefix.length + Integer.BYTES * values.length).order(BIG_ENDIAN);
-
-        buf.put(prefix);
-
-        for (int value : values) {
-            buf.putInt(value);
-        }
-
-        return buf.array();
-    }
-
     /**
      * Returns a column family instance, associated with the meta storage.
      */
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 734ea4b9a0..162046ebbd 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -34,8 +34,8 @@ import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.put
 import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.normalize;
 import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
@@ -877,7 +877,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         });
     }
 
-    // TODO: IGNITE-16914 Play with prefix settings and benchmark results.
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
         Objects.requireNonNull(timestamp, "timestamp is null");
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index db16777194..6e4dd1110b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -195,6 +195,10 @@ public class RocksDbStorageEngine implements StorageEngine {
             }
         });
 
-        return new RocksDbTableStorage(sharedInstance, tableDescriptor);
+        var storage = new RocksDbTableStorage(sharedInstance, tableDescriptor, indexDescriptorSupplier);
+
+        storage.start();
+
+        return storage;
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java
index 2476459c96..44030ceab2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java
@@ -68,4 +68,19 @@ public class RocksDbStorageUtils {
     static long normalize(long value) {
         return value ^ (1L << 63);
     }
+
+    /**
+     * Creates a byte array that uses the given {@code prefix} as a prefix and appends every {@code int} value as a 4-byte Big Endian chunk.
+     */
+    public static byte[] createKey(byte[] prefix, int... values) {
+        ByteBuffer buf = ByteBuffer.allocate(prefix.length + Integer.BYTES * values.length).order(KEY_BYTE_ORDER);
+
+        buf.put(prefix);
+
+        for (int value : values) {
+            buf.putInt(value);
+        }
+
+        return buf.array();
+    }
 }
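
As a worked example of the layout this relocated helper produces (assuming KEY_BYTE_ORDER is Big Endian, matching the BIG_ENDIAN order used by the removed RocksDbMetaStorage copy):

    // A prefix covering every key of index 42: just the 4-byte Big Endian index ID.
    // This is exactly the prefix handed to deleteByPrefix() when an index is destroyed.
    byte[] prefix = createKey(BYTE_EMPTY_ARRAY, 42); // -> [0x00, 0x00, 0x00, 0x2A]
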
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 6ae2185420..30c5ac411a 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.storage.rocksdb;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
 import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
 import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
 import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
@@ -30,15 +30,11 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -48,9 +44,9 @@ import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
-import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
 import org.apache.ignite.internal.storage.util.MvPartitionStorages;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -71,11 +67,7 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** Partition storages. */
     private final MvPartitionStorages<RocksDbMvPartitionStorage> mvPartitionStorages;
 
-    /** Hash Index storages by Index IDs. */
-    private final ConcurrentMap<Integer, HashIndex> hashIndices = new ConcurrentHashMap<>();
-
-    /** Sorted Index storages by Index IDs. */
-    private final ConcurrentMap<Integer, SortedIndex> sortedIndices = new ConcurrentHashMap<>();
+    private final RocksDbIndexes indexes;
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -86,6 +78,8 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** Table descriptor. */
     private final StorageTableDescriptor tableDescriptor;
 
+    private final StorageIndexDescriptorSupplier indexDescriptorSupplier;
+
     /**
      * Constructor.
      *
@@ -94,11 +88,22 @@ public class RocksDbTableStorage implements MvTableStorage {
      */
     RocksDbTableStorage(
             SharedRocksDbInstance rocksDb,
-            StorageTableDescriptor tableDescriptor
+            StorageTableDescriptor tableDescriptor,
+            StorageIndexDescriptorSupplier indexDescriptorSupplier
     ) {
         this.rocksDb = rocksDb;
         this.tableDescriptor = tableDescriptor;
         this.mvPartitionStorages = new MvPartitionStorages<>(tableDescriptor.getId(), tableDescriptor.getPartitions());
+        this.indexes = new RocksDbIndexes(rocksDb, this::getMvPartitionChecked);
+        this.indexDescriptorSupplier = indexDescriptorSupplier;
+    }
+
+    void start() {
+        try {
+            indexes.recoverIndexes(indexDescriptorSupplier);
+        } catch (RocksDBException e) {
+            throw new StorageException("Unable to recover indexes", e);
+        }
     }
 
     /**
@@ -140,7 +145,7 @@ public class RocksDbTableStorage implements MvTableStorage {
      * Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to
      * achieve this.
      *
-     * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggerred in the near future.
+     * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggered in the near future.
      */
     public CompletableFuture<Void> awaitFlush(boolean schedule) {
         return inBusyLock(busyLock, () -> rocksDb.flusher.awaitFlush(schedule));
@@ -157,13 +162,15 @@ public class RocksDbTableStorage implements MvTableStorage {
                 .thenAccept(partitionStorages -> {
                     var resources = new ArrayList<AutoCloseable>();
 
-                    Stream.concat(hashIndices.values().stream(), sortedIndices.values().stream()).forEach(index -> resources.add(
-                            destroy ? index::transitionToDestroyedState : index::close
-                    ));
+                    if (destroy) {
+                        resources.addAll(indexes.getResourcesForDestroy());
 
-                    partitionStorages.forEach(mvPartitionStorage -> resources.add(
-                            destroy ? mvPartitionStorage::transitionToDestroyedState : mvPartitionStorage::close
-                    ));
+                        partitionStorages.forEach(mvPartitionStorage -> resources.add(mvPartitionStorage::transitionToDestroyedState));
+                    } else {
+                        resources.addAll(indexes.getResourcesForClose());
+
+                        partitionStorages.forEach(mvPartitionStorage -> resources.add(mvPartitionStorage::close));
+                    }
 
                     try {
                         IgniteUtils.closeAll(resources);
@@ -201,18 +208,14 @@ public class RocksDbTableStorage implements MvTableStorage {
             deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix);
             deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix);
 
-            for (HashIndex hashIndex : hashIndices.values()) {
-                hashIndex.destroy(writeBatch);
-            }
-
-            for (SortedIndex sortedIndex : sortedIndices.values()) {
-                sortedIndex.destroy(writeBatch);
-            }
+            indexes.destroyAllIndexes(writeBatch);
 
             deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_META_PREFIX, tableId));
             deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_CONF_PREFIX, tableId));
 
             rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+
+            indexes.scheduleAllIndexCfDestroy();
         } catch (RocksDBException e) {
             throw new StorageException("Failed to destroy table data. [tableId={}]", e, getTableId());
         }
@@ -241,6 +244,16 @@ public class RocksDbTableStorage implements MvTableStorage {
         return inBusyLock(busyLock, () -> mvPartitionStorages.get(partitionId));
     }
 
+    private RocksDbMvPartitionStorage getMvPartitionChecked(int partitionId) {
+        RocksDbMvPartitionStorage partitionStorage = mvPartitionStorages.get(partitionId);
+
+        if (partitionStorage == null) {
+            throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
+        }
+
+        return partitionStorage;
+    }
+
     @Override
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         return inBusyLock(busyLock, () -> mvPartitionStorages.destroy(partitionId, mvPartitionStorage -> {
@@ -251,13 +264,7 @@ public class RocksDbTableStorage implements MvTableStorage {
                 // RocksDB itself will then destroy the data on flush.
                 mvPartitionStorage.destroyData(writeBatch);
 
-                for (HashIndex hashIndex : hashIndices.values()) {
-                    hashIndex.destroy(partitionId, writeBatch);
-                }
-
-                for (SortedIndex sortedIndex : sortedIndices.values()) {
-                    sortedIndex.destroy(partitionId, writeBatch);
-                }
+                indexes.destroyAllIndexesForPartition(partitionId, writeBatch);
 
                 rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
@@ -270,65 +277,19 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public SortedIndexStorage getOrCreateSortedIndex(int partitionId, StorageSortedIndexDescriptor indexDescriptor) {
-        return inBusyLock(busyLock, () -> {
-            SortedIndex storages = sortedIndices.computeIfAbsent(
-                    indexDescriptor.id(),
-                    id -> createSortedIndex(indexDescriptor)
-            );
-
-            RocksDbMvPartitionStorage partitionStorage = mvPartitionStorages.get(partitionId);
-
-            if (partitionStorage == null) {
-                throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
-            }
-
-            return storages.getOrCreateStorage(partitionStorage);
-        });
-    }
-
-    private SortedIndex createSortedIndex(StorageSortedIndexDescriptor indexDescriptor) {
-        return new SortedIndex(rocksDb, indexDescriptor, rocksDb.meta);
+        return inBusyLock(busyLock, () -> indexes.getOrCreateSortedIndex(partitionId, indexDescriptor));
     }
 
     @Override
     public HashIndexStorage getOrCreateHashIndex(int partitionId, StorageHashIndexDescriptor indexDescriptor) {
-        return inBusyLock(busyLock, () -> {
-            HashIndex storages = hashIndices.computeIfAbsent(
-                    indexDescriptor.id(),
-                    id -> new HashIndex(rocksDb.hashIndexCf, indexDescriptor, rocksDb.meta)
-            );
-
-            RocksDbMvPartitionStorage partitionStorage = mvPartitionStorages.get(partitionId);
-
-            if (partitionStorage == null) {
-                throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
-            }
-
-            return storages.getOrCreateStorage(partitionStorage);
-        });
+        return inBusyLock(busyLock, () -> indexes.getOrCreateHashIndex(partitionId, indexDescriptor));
     }
 
     @Override
     public CompletableFuture<Void> destroyIndex(int indexId) {
         return inBusyLock(busyLock, () -> {
-            HashIndex hashIdx = hashIndices.remove(indexId);
-
-            SortedIndex sortedIdx = sortedIndices.remove(indexId);
-
-            if (hashIdx == null && sortedIdx == null) {
-                return nullCompletedFuture();
-            }
-
-            try (WriteBatch writeBatch = new WriteBatch()) {
-                if (hashIdx != null) {
-                    hashIdx.destroy(writeBatch);
-                }
-
-                if (sortedIdx != null) {
-                    sortedIdx.destroy(writeBatch);
-                }
-
-                rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+            try {
+                indexes.destroyIndex(indexId);
 
                 return nullCompletedFuture();
             } catch (RocksDBException e) {
@@ -348,8 +309,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             try (WriteBatch writeBatch = new WriteBatch()) {
                 mvPartitionStorage.startRebalance(writeBatch);
 
-                getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
-                getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+                indexes.startRebalance(partitionId, writeBatch);
 
                 rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
@@ -370,8 +330,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             try (WriteBatch writeBatch = new WriteBatch()) {
                 mvPartitionStorage.abortRebalance(writeBatch);
 
-                getHashIndexStorages(partitionId).forEach(index -> index.abortRebalance(writeBatch));
-                getSortedIndexStorages(partitionId).forEach(index -> index.abortRebalance(writeBatch));
+                indexes.abortRebalance(partitionId, writeBatch);
 
                 rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
@@ -396,8 +355,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             try (WriteBatch writeBatch = new WriteBatch()) {
                 mvPartitionStorage.finishRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm, groupConfig);
 
-                getHashIndexStorages(partitionId).forEach(RocksDbHashIndexStorage::finishRebalance);
-                getSortedIndexStorages(partitionId).forEach(RocksDbSortedIndexStorage::finishRebalance);
+                indexes.finishRebalance(partitionId);
 
                 rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
@@ -414,18 +372,13 @@ public class RocksDbTableStorage implements MvTableStorage {
     @Override
     public CompletableFuture<Void> clearPartition(int partitionId) {
         return inBusyLock(busyLock, () -> mvPartitionStorages.clear(partitionId, mvPartitionStorage -> {
-            List<RocksDbHashIndexStorage> hashIndexStorages = getHashIndexStorages(partitionId);
-            List<RocksDbSortedIndexStorage> sortedIndexStorages = getSortedIndexStorages(partitionId);
+            List<AbstractRocksDbIndexStorage> indexStorages = indexes.getAllStorages(partitionId).collect(toList());
 
             try (WriteBatch writeBatch = new WriteBatch()) {
                 mvPartitionStorage.startCleanup(writeBatch);
 
-                for (RocksDbHashIndexStorage hashIndexStorage : hashIndexStorages) {
-                    hashIndexStorage.startCleanup(writeBatch);
-                }
-
-                for (RocksDbSortedIndexStorage sortedIndexStorage : sortedIndexStorages) {
-                    sortedIndexStorage.startCleanup(writeBatch);
+                for (AbstractRocksDbIndexStorage storage : indexStorages) {
+                    storage.startCleanup(writeBatch);
                 }
 
                 rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
@@ -436,8 +389,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             } finally {
                 mvPartitionStorage.finishCleanup();
 
-                hashIndexStorages.forEach(RocksDbHashIndexStorage::finishCleanup);
-                sortedIndexStorages.forEach(RocksDbSortedIndexStorage::finishCleanup);
+                indexStorages.forEach(AbstractRocksDbIndexStorage::finishCleanup);
             }
         }));
     }
@@ -449,34 +401,13 @@ public class RocksDbTableStorage implements MvTableStorage {
         return tableDescriptor.getId();
     }
 
-    private List<RocksDbHashIndexStorage> getHashIndexStorages(int partitionId) {
-        return hashIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
-    }
-
-    private List<RocksDbSortedIndexStorage> getSortedIndexStorages(int partitionId) {
-        return sortedIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
-    }
-
     @Override
     public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
         return inBusyLock(busyLock, () -> {
-            if (mvPartitionStorages.get(partitionId) == null) {
-                throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
-            }
-
-            HashIndex hashIndex = hashIndices.get(indexId);
-
-            if (hashIndex != null) {
-                return hashIndex.get(partitionId);
-            }
-
-            SortedIndex sortedIndex = sortedIndices.get(indexId);
-
-            if (sortedIndex != null) {
-                return sortedIndex.get(partitionId);
-            }
+            // Check for partition existence.
+            getMvPartitionChecked(partitionId);
 
-            return (IndexStorage) null;
+            return indexes.getIndex(partitionId, indexId);
         });
     }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index fd16d093a6..a0cf97bf1a 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
+import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -25,6 +28,7 @@ import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
 import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 
 /**
@@ -39,8 +43,9 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
 
     private final RocksDbMetaStorage indexMetaStorage;
 
-    SortedIndex(
+    private SortedIndex(
             SharedRocksDbInstance rocksDb,
+            ColumnFamily indexCf,
             StorageSortedIndexDescriptor descriptor,
             RocksDbMetaStorage indexMetaStorage
     ) {
@@ -48,13 +53,38 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
 
         this.rocksDb = rocksDb;
         this.descriptor = descriptor;
-        this.indexCf = rocksDb.getSortedIndexCfOnIndexCreate(
-                sortedIndexCfName(descriptor.columns()),
-                descriptor.id()
-        );
+        this.indexCf = indexCf;
         this.indexMetaStorage = indexMetaStorage;
     }
 
+    static SortedIndex createNew(
+            SharedRocksDbInstance rocksDb,
+            StorageSortedIndexDescriptor descriptor,
+            RocksDbMetaStorage indexMetaStorage
+    ) {
+        ColumnFamily indexCf = rocksDb.getOrCreateSortedIndexCf(sortedIndexCfName(descriptor.columns()), descriptor.id());
+
+        return new SortedIndex(rocksDb, indexCf, descriptor, indexMetaStorage);
+    }
+
+    static SortedIndex restoreExisting(
+            SharedRocksDbInstance rocksDb,
+            ColumnFamily indexCf,
+            StorageSortedIndexDescriptor descriptor,
+            RocksDbMetaStorage indexMetaStorage
+    ) {
+        return new SortedIndex(rocksDb, indexCf, descriptor, indexMetaStorage);
+    }
+
+    /**
+     * Returns sorted index storage for partition.
+     *
+     * @param partitionId Partition ID.
+     */
+    @Nullable RocksDbSortedIndexStorage getStorage(int partitionId) {
+        return storageByPartitionId.get(partitionId);
+    }
+
     /**
      * Creates a new Sorted Index storage or returns an existing one.
      */
@@ -68,18 +98,17 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
     /**
      * Removes all data associated with the index.
      */
-    void destroy(WriteBatch writeBatch) {
+    void destroy(WriteBatch writeBatch) throws RocksDBException {
         transitionToDestroyedState();
 
-        rocksDb.dropCfOnIndexDestroy(indexCf.nameBytes(), descriptor.id());
+        deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, descriptor.id()));
     }
 
     /**
-     * Returns sorted index storage for partition.
-     *
-     * @param partitionId Partition ID.
+     * Signals the shared RocksDB instance that this index has been destroyed and all shared resources (like the Column Family) can
+     * be de-allocated.
      */
-    @Nullable RocksDbSortedIndexStorage get(int partitionId) {
-        return storageByPartitionId.get(partitionId);
+    void destroySortedIndexCfIfNeeded() {
+        rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), descriptor.id());
     }
 }
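
Note the ordering this split enables, as driven by RocksDbIndexes.destroyIndex(int) earlier in the commit: the data removal rides in the caller's WriteBatch, while dropping the Column Family is deferred until after the next flush, because several sorted indexes with the same column signature share one CF. Condensed from destroyIndex() above (sortedIdx and writeBatch are locals there):

    sortedIdx.destroy(writeBatch);                  // phase 1: deleteByPrefix inside the batch
    rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
    rocksDb.flusher.awaitFlush(false)               // phase 2: after flush, drop the CF
            .thenRunAsync(sortedIdx::destroySortedIndexCfIfNeeded, rocksDb.engine.threadPool());
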
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
index fa70fe3672..575a6d12c9 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
@@ -57,36 +57,40 @@ public class RocksDbBinaryTupleComparator extends AbstractComparator {
 
     @Override
     public int compare(ByteBuffer a, ByteBuffer b) {
-        int compareTableId = Integer.compare(a.getInt(), b.getInt());
+        int compareIndexId = Integer.compare(a.getInt(), b.getInt());
 
-        if (compareTableId != 0) {
-            return compareTableId;
+        if (compareIndexId != 0) {
+            return compareIndexId;
+        }
+
+        // Handle index ID-only buffers, probably coming from the range tombstones.
+        if (!bothHasRemaining(a, b)) {
+            return Integer.compare(a.remaining(), b.remaining());
         }
 
-        // Compare table ids.
         int comparePartitionId = Short.compareUnsigned(a.getShort(), b.getShort());
 
         if (comparePartitionId != 0) {
             return comparePartitionId;
         }
 
-        ByteBuffer firstBinaryTupleBuffer = a.slice().order(ByteOrder.LITTLE_ENDIAN);
-        ByteBuffer secondBinaryTupleBuffer = b.slice().order(ByteOrder.LITTLE_ENDIAN);
-
         // Handle partition bounds.
-        if (!firstBinaryTupleBuffer.hasRemaining()) {
-            return -1;
+        if (!bothHasRemaining(a, b)) {
+            return Integer.compare(a.remaining(), b.remaining());
         }
 
-        if (!secondBinaryTupleBuffer.hasRemaining()) {
-            return 1;
-        }
+        ByteBuffer firstBinaryTupleBuffer = a.slice().order(ByteOrder.LITTLE_ENDIAN);
+        ByteBuffer secondBinaryTupleBuffer = b.slice().order(ByteOrder.LITTLE_ENDIAN);
 
         int compareTuples = comparator.compare(firstBinaryTupleBuffer, secondBinaryTupleBuffer);
 
         return compareTuples == 0 ? compareRowIds(a, b) : compareTuples;
     }
 
+    private static boolean bothHasRemaining(ByteBuffer a, ByteBuffer b) {
+        return a.hasRemaining() && b.hasRemaining();
+    }
+
     private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
         long firstMostSignBits = a.getLong(a.limit() - Long.BYTES * 2);
         long secondMostSignBits = b.getLong(b.limit() - Long.BYTES * 2);
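
The net effect of the new bothHasRemaining() guard is that a buffer holding only the 4-byte index ID sorts before every full key of that index, which is what makes such prefixes usable as range bounds (e.g. for the range tombstones mentioned above). A minimal sketch of the rule in isolation (illustrative, not part of the commit):

    import java.nio.ByteBuffer;

    final class PrefixOrderSketch {
        // Mirrors the comparator's tie-breaker: once the index IDs match, an
        // exhausted buffer sorts before one that still has bytes remaining.
        static int compareIdPrefix(ByteBuffer a, ByteBuffer b) {
            int cmp = Integer.compare(a.getInt(), b.getInt()); // index ID, 4 bytes
            if (cmp != 0) {
                return cmp;
            }
            if (!(a.hasRemaining() && b.hasRemaining())) {
                return Integer.compare(a.remaining(), b.remaining());
            }
            return 0; // a real key would continue with partition ID, tuple and row ID
        }
    }
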
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index c71799b3fc..bd23e53012 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -53,6 +53,7 @@ import org.rocksdb.WriteBatchWithIndex;
  *
  * <p>This storage uses the following format for keys:
  * <pre>
+ * Index ID - 4 bytes
  * Partition ID - 2 bytes
  * Tuple value - variable length
  * Row ID (UUID) - 16 bytes
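
To make the documented format concrete, a hedged sketch of assembling such a key (field widths from the javadoc above; the helper name is illustrative, and the Big Endian key order is an assumption consistent with createKey()):

    import java.nio.ByteBuffer;

    // Illustrative only: composes a sorted-index key per the documented layout.
    static byte[] sortedIndexKey(int indexId, short partitionId, byte[] tuple, long rowIdMsb, long rowIdLsb) {
        return ByteBuffer.allocate(Integer.BYTES + Short.BYTES + tuple.length + 2 * Long.BYTES)
                .putInt(indexId)        // Index ID - 4 bytes
                .putShort(partitionId)  // Partition ID - 2 bytes
                .put(tuple)             // Tuple value - variable length
                .putLong(rowIdMsb)      // Row ID (UUID) - 16 bytes total
                .putLong(rowIdLsb)
                .array();
    }
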
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java
new file mode 100644
index 0000000000..0417cc7585
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Cursor that iterates over distinct index IDs in a Sorted Index column family.
+ *
+ * <p>Sorted index column family can contain multiple indexes that have the same order and type of indexed columns. Index ID is stored
+ * as the first 4 bytes of a RocksDB key.
+ */
+class IndexIdCursor implements Cursor<Integer> {
+    private final RocksIterator it;
+
+    private @Nullable ByteBuffer curIndexId = null;
+
+    private boolean hasNext = true;
+
+    IndexIdCursor(RocksIterator it) {
+        this.it = it;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!hasNext) {
+            return false;
+        }
+
+        if (curIndexId == null) {
+            curIndexId = ByteBuffer.allocate(Integer.BYTES).order(KEY_BYTE_ORDER);
+
+            it.seekToFirst();
+        } else if (setNextIndexId()) {
+            it.seek(curIndexId);
+
+            curIndexId.rewind();
+        } else {
+            hasNext = false;
+
+            return false;
+        }
+
+        hasNext = it.isValid();
+
+        if (!hasNext) {
+            RocksUtils.checkIterator(it);
+        }
+
+        return hasNext;
+    }
+
+    @Override
+    public Integer next() {
+        if (!hasNext) {
+            throw new NoSuchElementException();
+        }
+
+        assert curIndexId != null;
+
+        it.key(curIndexId);
+
+        curIndexId.rewind();
+
+        return curIndexId.getInt(0);
+    }
+
+    @Override
+    public void close() {
+        it.close();
+    }
+
+    /**
+     * Sets the next hypothetical index ID by incrementing the current index ID by one.
+     *
+     * @return {@code false} if the current index ID already has the maximum possible value and the ID range has been exhausted.
+     *         {@code true} otherwise.
+     */
+    private boolean setNextIndexId() {
+        assert curIndexId != null;
+
+        for (int i = curIndexId.remaining() - 1; i >= 0; i--) {
+            byte b = (byte) (curIndexId.get(i) + 1);
+
+            curIndexId.put(i, b);
+
+            if (b != 0) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
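
The cursor skips all remaining entries of the current index by incrementing the 4-byte ID prefix as an unsigned big-endian number (e.g. 00 00 00 FF becomes 00 00 01 00) and seeking to the result, so each index ID is visited exactly once. A hypothetical usage sketch, assuming an open sorted-index ColumnFamily named cf from the surrounding storage code:

    // Collect the distinct index IDs present in the column family.
    try (Cursor<Integer> ids = new IndexIdCursor(cf.newIterator())) {
        for (int indexId : ids) {
            System.out.println("sorted index present: " + indexId);
        }
    }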
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 69bb41b49d..6fe23169ab 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -24,9 +24,12 @@ import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -38,6 +41,7 @@ import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -54,6 +58,35 @@ public final class SharedRocksDbInstance {
     /** Write options. */
     public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);
 
+    /**
+     * Class that represents a Column Family for sorted indexes and all index IDs that map to this Column Family.
+     *
+     * <p>Sorted indexes that have the same order and type of indexed columns get mapped to the same Column Family in order to share
+     * the same comparator.
+     */
+    private static class SortedIndexColumnFamily implements AutoCloseable {
+        final ColumnFamily columnFamily;
+
+        final Set<Integer> indexIds;
+
+        SortedIndexColumnFamily(ColumnFamily columnFamily, int indexId) {
+            this.columnFamily = columnFamily;
+            this.indexIds = new HashSet<>();
+
+            indexIds.add(indexId);
+        }
+
+        SortedIndexColumnFamily(ColumnFamily columnFamily, Set<Integer> indexIds) {
+            this.columnFamily = columnFamily;
+            this.indexIds = indexIds;
+        }
+
+        @Override
+        public void close() {
+            columnFamily.handle().close();
+        }
+    }
+
     /** RocksDB storage engine instance. */
     public final RocksDbStorageEngine engine;
 
@@ -76,13 +109,10 @@ public final class SharedRocksDbInstance {
     public final ColumnFamily gcQueueCf;
 
     /** Column Family for Hash Index data. */
-    public final ColumnFamily hashIndexCf;
+    private final ColumnFamily hashIndexCf;
 
     /** Column Family instances for different types of sorted indexes, identified by the column family name. */
-    private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
-
-    /** Column family names mapped to sets of index IDs, that use that CF. */
-    private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ByteArray, SortedIndexColumnFamily> sortedIndexCfsByName = new ConcurrentHashMap<>();
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock;
@@ -100,7 +130,7 @@ public final class SharedRocksDbInstance {
             ColumnFamily partitionCf,
             ColumnFamily gcQueueCf,
             ColumnFamily hashIndexCf,
-            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+            List<ColumnFamily> sortedIndexCfs
     ) {
         this.engine = engine;
         this.path = path;
@@ -113,7 +143,29 @@ public final class SharedRocksDbInstance {
         this.partitionCf = partitionCf;
         this.gcQueueCf = gcQueueCf;
         this.hashIndexCf = hashIndexCf;
-        this.sortedIndexCfs = sortedIndexCfs;
+
+        recoverExistingSortedIndexes(sortedIndexCfs);
+    }
+
+    private void recoverExistingSortedIndexes(List<ColumnFamily> sortedIndexCfs) {
+        for (ColumnFamily sortedIndexCf : sortedIndexCfs) {
+            var indexIds = new HashSet<Integer>();
+
+            try (Cursor<Integer> sortedIndexIdCursor = indexIdsCursor(sortedIndexCf)) {
+                for (Integer indexId : sortedIndexIdCursor) {
+                    indexIds.add(indexId);
+                }
+            }
+
+            if (indexIds.isEmpty()) {
+                destroyColumnFamily(sortedIndexCf);
+            } else {
+                this.sortedIndexCfsByName.put(
+                        new ByteArray(sortedIndexCf.nameBytes()),
+                        new SortedIndexColumnFamily(sortedIndexCf, indexIds)
+                );
+            }
+        }
     }
 
     /**
@@ -141,10 +193,7 @@ public final class SharedRocksDbInstance {
         resources.add(partitionCf.handle());
         resources.add(gcQueueCf.handle());
         resources.add(hashIndexCf.handle());
-        resources.addAll(sortedIndexCfs.values().stream()
-                .map(ColumnFamily::handle)
-                .collect(toList())
-        );
+        resources.addAll(sortedIndexCfsByName.values());
 
         resources.add(db);
         resources.add(flusher::stop);
@@ -159,32 +208,57 @@ public final class SharedRocksDbInstance {
     }
 
     /**
-     * Returns Column Family instance with the desired name. Creates it it it doesn't exist.
-     * Tracks every created index by its {@code indexId}.
+     * Returns the Column Family containing all hash indexes.
      */
-    public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
+    public ColumnFamily hashIndexCf() {
+        return hashIndexCf;
+    }
+
+    /**
+     * Returns a collection of all hash index IDs that currently exist in the storage.
+     */
+    public Collection<Integer> hashIndexIds() {
+        try (Cursor<Integer> hashIndexIdCursor = indexIdsCursor(hashIndexCf)) {
+            return hashIndexIdCursor.stream().collect(toList());
         }
+    }
 
-        try {
-            ColumnFamily[] result = {null};
+    /**
+     * Returns an "index ID - Column Family" mapping for all sorted indexes 
that currently exist in the storage.
+     */
+    public Map<Integer, ColumnFamily> sortedIndexes() {
+        var result = new HashMap<Integer, ColumnFamily>();
 
-            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
-                ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name);
+        for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) {
+            for (Integer indexId : indexCf.indexIds) {
+                result.put(indexId, indexCf.columnFamily);
+            }
+        }
 
-                result[0] = columnFamily;
+        return result;
+    }
 
-                if (indexIds == null) {
-                    indexIds = new HashSet<>();
-                }
+    /**
+     * Returns Column Family instance with the desired name. Creates it if it doesn't exist. Tracks every created index by its
+     * {@code indexId}.
+     */
+    public ColumnFamily getOrCreateSortedIndexCf(byte[] cfName, int indexId) {
+        if (!busyLock.enterBusy()) {
+            throw new StorageClosedException();
+        }
 
-                indexIds.add(indexId);
+        try {
+            SortedIndexColumnFamily result = sortedIndexCfsByName.compute(new ByteArray(cfName), (unused, sortedIndexCf) -> {
+                if (sortedIndexCf == null) {
+                    return new SortedIndexColumnFamily(createColumnFamily(cfName), indexId);
+                } else {
+                    sortedIndexCf.indexIds.add(indexId);
 
-                return indexIds;
+                    return sortedIndexCf;
+                }
             });
 
-            return result[0];
+            return result.columnFamily;
         } finally {
             busyLock.leaveBusy();
         }
@@ -193,67 +267,59 @@ public final class SharedRocksDbInstance {
     /**
      * Possibly drops the column family after destroying the index.
      */
-    public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+    public void destroySortedIndexCfIfNeeded(byte[] cfName, int indexId) {
         if (!busyLock.enterBusy()) {
             throw new StorageClosedException();
         }
 
         try {
-            sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> {
-                if (indexIds == null) {
-                    return null;
-                }
+            sortedIndexCfsByName.computeIfPresent(new ByteArray(cfName), (unused, indexCf) -> {
+                indexCf.indexIds.remove(indexId);
 
-                indexIds.remove(indexId);
-
-                if (indexIds.isEmpty()) {
-                    indexIds = null;
-
-                    destroyColumnFamily(name);
+                if (!indexCf.indexIds.isEmpty()) {
+                    return indexCf;
                 }
 
-                return indexIds;
+                destroyColumnFamily(indexCf.columnFamily);
+
+                return null;
             });
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private ColumnFamily getOrCreateColumnFamily(byte[] cfName, ByteArray name) {
-        return sortedIndexCfs.computeIfAbsent(name, unused -> {
-            ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
-
-            ColumnFamily columnFamily;
-            try {
-                columnFamily = ColumnFamily.create(db, cfDescriptor);
-            } catch (RocksDBException e) {
-                throw new StorageException("Failed to create new RocksDB 
column family: " + toStringName(cfDescriptor.getName()), e);
-            }
+    private static Cursor<Integer> indexIdsCursor(ColumnFamily cf) {
+        return new IndexIdCursor(cf.newIterator());
+    }
 
-            flusher.addColumnFamily(columnFamily.handle());
+    private ColumnFamily createColumnFamily(byte[] cfName) {
+        ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
 
-            return columnFamily;
-        });
-    }
+        ColumnFamily columnFamily;
+        try {
+            columnFamily = ColumnFamily.create(db, cfDescriptor);
+        } catch (RocksDBException e) {
+            throw new StorageException("Failed to create new RocksDB column 
family: " + toStringName(cfDescriptor.getName()), e);
+        }
 
-    private void destroyColumnFamily(ByteArray cfName) {
-        sortedIndexCfs.computeIfPresent(cfName, (unused, columnFamily) -> {
-            ColumnFamilyHandle columnFamilyHandle = columnFamily.handle();
+        flusher.addColumnFamily(columnFamily.handle());
 
-            flusher.removeColumnFamily(columnFamilyHandle);
+        return columnFamily;
+    }
 
-            try {
-                db.dropColumnFamily(columnFamilyHandle);
+    private void destroyColumnFamily(ColumnFamily columnFamily) {
+        ColumnFamilyHandle columnFamilyHandle = columnFamily.handle();
 
-                db.destroyColumnFamilyHandle(columnFamilyHandle);
-            } catch (RocksDBException e) {
-                throw new StorageException(
-                        "Failed to destroy RocksDB Column Family. [cfName={}, 
path={}]",
-                        e, toStringName(cfName.bytes()), path
-                );
-            }
+        flusher.removeColumnFamily(columnFamilyHandle);
 
-            return null;
-        });
+        try {
+            columnFamily.destroy();
+        } catch (RocksDBException e) {
+            throw new StorageException(
+                    "Failed to destroy RocksDB Column Family. [cfName={}, 
path={}]",
+                    e, columnFamily.name(), path
+            );
+        }
     }
 }
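
Taken together, the methods above give sorted-index Column Families reference-counting semantics: a CF is created for the first index that needs it and dropped together with the last one. An illustrative lifecycle sketch (setup of sharedDb, cfName and the index IDs is assumed; the method names are the ones introduced by this patch):

    ColumnFamily cf = sharedDb.getOrCreateSortedIndexCf(cfName, 1); // CF created, index 1 registered
    sharedDb.getOrCreateSortedIndexCf(cfName, 2);                   // same CF reused, index 2 registered
    sharedDb.destroySortedIndexCfIfNeeded(cfName, 1);               // CF survives: index 2 still uses it
    sharedDb.destroySortedIndexCfIfNeeded(cfName, 2);               // last user gone: the CF is dropped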
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
index ddd11d7dac..c97fb21033 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
@@ -27,9 +27,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.storage.StorageException;
@@ -100,7 +97,7 @@ public class SharedRocksDbInstanceCreator {
             ColumnFamily partitionCf = null;
             ColumnFamily gcQueueCf = null;
             ColumnFamily hashIndexCf = null;
-            ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs = new ConcurrentHashMap<>();
+            var sortedIndexCfs = new ArrayList<ColumnFamily>();
 
             // Read all existing Column Families from the db and parse them according to type: meta, partition data or index.
             for (ColumnFamilyHandle cfHandle : cfHandles) {
@@ -128,7 +125,7 @@ public class SharedRocksDbInstanceCreator {
                         break;
 
                     case SORTED_INDEX:
-                        sortedIndexCfs.put(new ByteArray(cf.handle().getName()), cf);
+                        sortedIndexCfs.add(cf);
 
                         break;
 
@@ -211,6 +208,7 @@ public class SharedRocksDbInstanceCreator {
         }
     }
 
+    @SuppressWarnings("resource")
     private static ColumnFamilyOptions defaultCfOptions() {
         return new ColumnFamilyOptions()
                 .setMemtablePrefixBloomSizeRatio(0.125)
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 07d62ed440..994cba7fe0 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -159,11 +158,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
     void storageAdvertisesItIsPersistent() {
         assertThat(tableStorage.isVolatile(), is(false));
     }
-
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21680";)
-    @Test
-    @Override
-    public void testIndexDestructionOnRecovery() throws Exception {
-        super.testIndexDestructionOnRecovery();
-    }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java
new file mode 100644
index 0000000000..315acfc112
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+/** Class that contains tests for {@link IndexIdCursor}. */
+class IndexIdCursorTest extends IgniteAbstractTest {
+    private RocksDB rocksDb;
+
+    @BeforeEach
+    void setUp() throws RocksDBException {
+        rocksDb = RocksDB.open(workDir.toString());
+    }
+
+    @AfterEach
+    void tearDown() {
+        rocksDb.close();
+    }
+
+    @Test
+    void testEmptyCursor() {
+        assertThat(getAll(), is(empty()));
+    }
+
+    @Test
+    void testSingleElement() throws RocksDBException {
+        insertData(0, 0);
+
+        assertThat(getAll(), contains(0));
+    }
+
+    @Test
+    void testBorders() throws RocksDBException {
+        insertData(Integer.MIN_VALUE, 0);
+        insertData(Integer.MAX_VALUE, 0);
+        insertData(-1, 0);
+        insertData(0, 0);
+
+        // RocksDB uses unsigned comparison.
+        assertThat(getAll(), contains(0, Integer.MAX_VALUE, Integer.MIN_VALUE, -1));
+    }
+
+    @Test
+    void testRemovesDuplicatesAndSorts() throws RocksDBException {
+        for (int i = 5; i >= 0; i--) {
+            for (int j = 0; j < 3; j++) {
+                insertData(i, j);
+            }
+        }
+
+        assertThat(getAll(), contains(0, 1, 2, 3, 4, 5));
+    }
+
+    private List<Integer> getAll() {
+        try (var cursor = new IndexIdCursor(rocksDb.newIterator())) {
+            return cursor.stream().collect(toList());
+        }
+    }
+
+    private void insertData(int indexId, int extra) throws RocksDBException {
+        rocksDb.put(createKey(BYTE_EMPTY_ARRAY, indexId, extra), BYTE_EMPTY_ARRAY);
+    }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
new file mode 100644
index 0000000000..7ef2efc91d
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbDataRegion;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/** Contains tests for {@link SharedRocksDbInstance}. */
+@ExtendWith(ConfigurationExtension.class)
+class SharedRocksDbInstanceTest extends IgniteAbstractTest {
+    private RocksDbStorageEngine engine;
+
+    private RocksDbDataRegion dataRegion;
+
+    private SharedRocksDbInstance rocksDb;
+
+    @BeforeEach
+    void setUp(@InjectConfiguration RocksDbStorageEngineConfiguration engineConfig) throws Exception {
+        engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+
+        engine.start();
+
+        dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion());
+
+        dataRegion.start();
+
+        rocksDb = createDb();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAllManually(
+                rocksDb == null ? null : rocksDb::stop,
+                dataRegion == null ? null : dataRegion::stop,
+                engine == null ? null : engine::stop
+        );
+    }
+
+    private SharedRocksDbInstance createDb() throws Exception {
+        return new SharedRocksDbInstanceCreator().create(engine, dataRegion, workDir);
+    }
+
+    @Test
+    void testSortedIndexCfCaching() {
+        byte[] fooName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("x", NativeTypes.INT64, 
true, true)
+        ));
+
+        byte[] barName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("y", NativeTypes.UUID, 
true, true)
+        ));
+
+        byte[] bazName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("z", NativeTypes.INT64, 
true, true)
+        ));
+
+        ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1);
+        ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2);
+        ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3);
+
+        assertThat(foo, is(sameInstance(baz)));
+        assertThat(foo, is(not(sameInstance(bar))));
+
+        rocksDb.destroySortedIndexCfIfNeeded(fooName, 1);
+
+        assertTrue(cfExists(fooName));
+
+        rocksDb.destroySortedIndexCfIfNeeded(barName, 2);
+
+        assertFalse(cfExists(barName));
+
+        rocksDb.destroySortedIndexCfIfNeeded(bazName, 3);
+
+        assertFalse(cfExists(bazName));
+    }
+
+    @Test
+    void testSortedIndexRecovery() throws Exception {
+        byte[] fooName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("x", NativeTypes.INT64, 
true, true)
+        ));
+
+        byte[] barName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("y", NativeTypes.UUID, 
true, true)
+        ));
+
+        byte[] bazName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("z", NativeTypes.INT64, 
true, true)
+        ));
+
+        ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1);
+        ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2);
+        ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3);
+
+        assertThat(rocksDb.sortedIndexes(), is(Map.of(1, foo, 2, bar, 3, baz)));
+
+        // Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped.
+        foo.put(ByteUtils.intToBytes(1), BYTE_EMPTY_ARRAY);
+
+        rocksDb.stop();
+
+        rocksDb = createDb();
+
+        assertThat(rocksDb.sortedIndexes(), hasKey(1));
+        assertThat(rocksDb.sortedIndexes(), not(hasKey(2)));
+        assertThat(rocksDb.sortedIndexes(), not(hasKey(3)));
+
+        assertTrue(cfExists(fooName));
+        assertFalse(cfExists(barName));
+    }
+
+    private boolean cfExists(byte[] cfName) {
+        try {
+            // Check Column Family existence by trying to create a new one with the same name.
+            ColumnFamilyHandle handle = rocksDb.db.createColumnFamily(new ColumnFamilyDescriptor(cfName));
+
+            rocksDb.db.destroyColumnFamilyHandle(handle);
+
+            return false;
+        } catch (RocksDBException e) {
+            return true;
+        }
+    }
+}
