This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bbdb28a993 IGNITE-21760 Remove destroyed RocksDB tables on recovery (#3488)
bbdb28a993 is described below

commit bbdb28a993556c42c201f7f899fa3d525e16f398
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Wed Mar 27 10:47:31 2024 +0200

    IGNITE-21760 Remove destroyed RocksDB tables on recovery (#3488)
---
 check-rules/spotbugs-excludes.xml                  | 10 +++
 .../storage/engine/AbstractStorageEngineTest.java  |  2 +-
 .../ignite/internal/storage/rocksdb/HashIndex.java | 13 ++-
 .../ignite/internal/storage/rocksdb/Index.java     | 20 ++---
 .../storage/rocksdb/RocksDbDataRegion.java         | 16 ++--
 .../internal/storage/rocksdb/RocksDbIndexes.java   | 49 +++---------
 .../storage/rocksdb/RocksDbMetaStorage.java        | 22 ++---
 .../storage/rocksdb/RocksDbStorageEngine.java      | 93 ++++++++++++----------
 .../storage/rocksdb/RocksDbTableStorage.java       | 29 +------
 .../internal/storage/rocksdb/SortedIndex.java      | 19 +----
 .../rocksdb/index/AbstractRocksDbIndexStorage.java | 11 ++-
 .../rocksdb/index/RocksDbHashIndexStorage.java     |  2 +-
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  2 +-
 .../rocksdb/instance/IndexColumnFamily.java        | 40 ++++++++++
 .../rocksdb/instance/SharedRocksDbInstance.java    | 78 +++++++++++++++---
 .../rocksdb/engine/RocksDbStorageEngineTest.java   |  7 --
 .../instance/SharedRocksDbInstanceTest.java        | 41 +++++++---
 17 files changed, 262 insertions(+), 192 deletions(-)

diff --git a/check-rules/spotbugs-excludes.xml 
b/check-rules/spotbugs-excludes.xml
index 3d4554aca2..a25c91874a 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -206,6 +206,16 @@
     <Class 
name="org.apache.ignite.internal.tx.impl.TransactionInflights$TxContext"/>
     <Field name="inflights"/>
   </Match>
+  <Match>
+    <!-- Public byte array constants, not expected to be modified. -->
+    <Bug pattern="MS_MUTABLE_ARRAY"/>
+    <Class 
name="org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage"/>
+    <Or>
+      <Field name="PARTITION_META_PREFIX"/>
+      <Field name="PARTITION_CONF_PREFIX"/>
+      <Field name="INDEX_ROW_ID_PREFIX"/>
+    </Or>
+  </Match>
   <!-- end of false-positive exclusions -->
 
 
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
index d7752d3fe6..f8eab538bb 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
@@ -112,7 +112,7 @@ public abstract class AbstractStorageEngineTest extends 
BaseMvStoragesTest {
     }
 
     @Test
-    protected void testDropMvTableOnRecovery() throws Exception {
+    void testDropMvTableOnRecovery() throws Exception {
         assumeFalse(storageEngine.isVolatile());
 
         int tableId = 1;
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 571e0f5853..bd45c252f3 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import 
org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-import 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
 
 /**
  * Class that represents a Hash Index defined for all partitions of a Table.
@@ -29,8 +29,13 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
 
     private final RocksDbMetaStorage indexMetaStorage;
 
-    HashIndex(SharedRocksDbInstance rocksDb, int tableId, 
StorageHashIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage) {
-        super(tableId, descriptor.id(), rocksDb.hashIndexCf());
+    HashIndex(
+            int tableId,
+            ColumnFamily indexCf,
+            StorageHashIndexDescriptor descriptor,
+            RocksDbMetaStorage indexMetaStorage
+    ) {
+        super(tableId, descriptor.id(), indexCf);
 
         this.descriptor = descriptor;
         this.indexMetaStorage = indexMetaStorage;
@@ -38,6 +43,6 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
 
     @Override
     RocksDbHashIndexStorage createStorage(int partitionId) {
-        return new RocksDbHashIndexStorage(descriptor, tableId, partitionId, 
indexCf, indexMetaStorage);
+        return new RocksDbHashIndexStorage(descriptor, tableId, partitionId, 
indexColumnFamily().columnFamily(), indexMetaStorage);
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
index c184cf9a70..58c23aefaa 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.StorageException;
 import 
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
 import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
@@ -37,16 +38,17 @@ import org.rocksdb.WriteBatch;
 abstract class Index<S extends AbstractRocksDbIndexStorage> {
     final int tableId;
 
-    final int indexId;
-
-    final ColumnFamily indexCf;
+    private final IndexColumnFamily indexColumnFamily;
 
     private final ConcurrentMap<Integer, S> storageByPartitionId = new 
ConcurrentHashMap<>();
 
-    Index(int tableId, int indexId, ColumnFamily indexCf) {
+    Index(int tableId, int indexId, ColumnFamily cf) {
         this.tableId = tableId;
-        this.indexId = indexId;
-        this.indexCf = indexCf;
+        this.indexColumnFamily = new IndexColumnFamily(indexId, cf);
+    }
+
+    IndexColumnFamily indexColumnFamily() {
+        return indexColumnFamily;
     }
 
     /**
@@ -74,7 +76,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
         try {
             
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> 
index::close));
         } catch (Exception e) {
-            throw new StorageException("Failed to close index storages: " + 
indexId, e);
+            throw new StorageException("Failed to close index storages: " + 
indexColumnFamily.indexId(), e);
         }
     }
 
@@ -85,7 +87,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
         try {
             
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> 
index::transitionToDestroyedState));
         } catch (Exception e) {
-            throw new StorageException("Failed to transition index storages to 
the DESTROYED state: " + indexId, e);
+            throw new StorageException("Failed to transition index storages to 
the DESTROYED state: " + indexColumnFamily.indexId(), e);
         }
     }
 
@@ -111,6 +113,6 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> 
{
     void destroy(WriteBatch writeBatch) throws RocksDBException {
         transitionToDestroyedState();
 
-        deleteByPrefix(writeBatch, indexCf, indexPrefix(tableId, indexId));
+        deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), 
indexPrefix(tableId, indexColumnFamily().indexId()));
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
index fb7a73512f..20581f50a5 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
 import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
 
 import java.util.Locale;
-import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.rocksdb.Cache;
@@ -34,7 +33,7 @@ import org.rocksdb.WriteBufferManager;
  */
 public class RocksDbDataRegion {
     /** Region configuration. */
-    private final RocksDbDataRegionConfiguration cfg;
+    private final RocksDbDataRegionView dataRegionView;
 
     /** RocksDB cache instance. */
     private Cache cache;
@@ -45,18 +44,16 @@ public class RocksDbDataRegion {
     /**
      * Constructor.
      *
-     * @param cfg Data region configuration.
+     * @param dataRegionView Data region configuration.
      */
-    public RocksDbDataRegion(RocksDbDataRegionConfiguration cfg) {
-        this.cfg = cfg;
+    public RocksDbDataRegion(RocksDbDataRegionView dataRegionView) {
+        this.dataRegionView = dataRegionView;
     }
 
     /**
      * Start the rocksDb data region.
      */
     public void start() {
-        RocksDbDataRegionView dataRegionView = cfg.value();
-
         long writeBufferSize = dataRegionView.writeBufferSize();
 
         long totalCacheSize = dataRegionView.size() + writeBufferSize;
@@ -73,7 +70,10 @@ public class RocksDbDataRegion {
                 break;
 
             default:
-                assert false : dataRegionView.cache();
+                throw new AssertionError(String.format(
+                        "Unknown data region cache type: [dataRegion=%s, 
cacheType=%s]",
+                        dataRegionView.name(), dataRegionView.cache()
+                ));
         }
 
         writeBufferManager = new WriteBufferManager(writeBufferSize, cache);
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
index dee24e0369..7ff1b893a1 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
@@ -27,7 +27,6 @@ import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Stream;
@@ -39,6 +38,7 @@ import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
 import 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDBException;
@@ -69,41 +69,32 @@ class RocksDbIndexes {
                 if (descriptor == null) {
                     deleteByPrefix(writeBatch, rocksDb.hashIndexCf(), 
indexPrefix(tableId, indexId));
                 } else {
-                    hashIndices.put(indexId, new HashIndex(rocksDb, tableId, 
descriptor, rocksDb.meta));
+                    hashIndices.put(indexId, new HashIndex(tableId, 
rocksDb.hashIndexCf(), descriptor, rocksDb.meta));
                 }
             }
 
-            var indexCfsToDestroy = new ArrayList<Map.Entry<Integer, 
ColumnFamily>>();
+            var indexCfsToDestroy = new ArrayList<IndexColumnFamily>();
 
-            for (Map.Entry<Integer, ColumnFamily> e : 
rocksDb.sortedIndexes(tableId).entrySet()) {
-                int indexId = e.getKey();
+            for (IndexColumnFamily indexColumnFamily : 
rocksDb.sortedIndexes(tableId)) {
+                int indexId = indexColumnFamily.indexId();
 
-                ColumnFamily indexCf = e.getValue();
+                ColumnFamily cf = indexColumnFamily.columnFamily();
 
                 var descriptor = (StorageSortedIndexDescriptor) 
indexDescriptorSupplier.get(indexId);
 
                 if (descriptor == null) {
-                    deleteByPrefix(writeBatch, indexCf, indexPrefix(tableId, 
indexId));
+                    deleteByPrefix(writeBatch, cf, indexPrefix(tableId, 
indexId));
 
-                    indexCfsToDestroy.add(e);
+                    indexCfsToDestroy.add(indexColumnFamily);
                 } else {
-                    sortedIndices.put(indexId, 
SortedIndex.restoreExisting(rocksDb, tableId, indexCf, descriptor, 
rocksDb.meta));
+                    sortedIndices.put(indexId, 
SortedIndex.restoreExisting(tableId, cf, descriptor, rocksDb.meta));
                 }
             }
 
             rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
             if (!indexCfsToDestroy.isEmpty()) {
-                rocksDb.flusher.awaitFlush(false)
-                        .thenRunAsync(() -> {
-                            for (Map.Entry<Integer, ColumnFamily> e : 
indexCfsToDestroy) {
-                                int indexId = e.getKey();
-
-                                ColumnFamily indexCf = e.getValue();
-
-                                
rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId);
-                            }
-                        }, rocksDb.engine.threadPool());
+                rocksDb.scheduleIndexCfsDestroy(indexCfsToDestroy);
             }
         }
     }
@@ -120,7 +111,7 @@ class RocksDbIndexes {
     HashIndexStorage getOrCreateHashIndex(int partitionId, 
StorageHashIndexDescriptor indexDescriptor) {
         HashIndex hashIndex = hashIndices.computeIfAbsent(
                 indexDescriptor.id(),
-                id -> new HashIndex(rocksDb, tableId, indexDescriptor, 
rocksDb.meta)
+                id -> new HashIndex(tableId, rocksDb.hashIndexCf(), 
indexDescriptor, rocksDb.meta)
         );
 
         return hashIndex.getOrCreateStorage(partitionId);
@@ -190,8 +181,7 @@ class RocksDbIndexes {
         }
 
         if (sortedIdx != null) {
-            rocksDb.flusher.awaitFlush(false)
-                    .thenRunAsync(sortedIdx::destroySortedIndexCfIfNeeded, 
rocksDb.engine.threadPool());
+            
rocksDb.scheduleIndexCfsDestroy(List.of(sortedIdx.indexColumnFamily()));
         }
     }
 
@@ -205,21 +195,6 @@ class RocksDbIndexes {
         }
     }
 
-    void destroyAllIndexes(WriteBatch writeBatch) throws RocksDBException {
-        for (HashIndex hashIndex : hashIndices.values()) {
-            hashIndex.destroy(writeBatch);
-        }
-
-        for (SortedIndex sortedIndex : sortedIndices.values()) {
-            sortedIndex.destroy(writeBatch);
-        }
-    }
-
-    void scheduleAllIndexCfDestroy() {
-        rocksDb.flusher.awaitFlush(false)
-                .thenRunAsync(() -> 
sortedIndices.values().forEach(SortedIndex::destroySortedIndexCfIfNeeded), 
rocksDb.engine.threadPool());
-    }
-
     Stream<AbstractRocksDbIndexStorage> getAllStorages(int partitionId) {
         return Stream.concat(
                 hashIndices.values().stream().map(index -> 
index.getStorage(partitionId)),
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index 193fcfc8cf..72e71e6c56 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -42,17 +42,17 @@ public class RocksDbMetaStorage {
      * Prefix to store partition meta information, such as last applied index 
and term.
      * Key format is {@code [prefix, tableId, partitionId]} in BE.
      */
-    static final byte[] PARTITION_META_PREFIX = {0};
+    public static final byte[] PARTITION_META_PREFIX = {0};
 
     /**
      * Prefix to store partition configuration. Key format is {@code [prefix, 
tableId, partitionId]} in BE.
      */
-    static final byte[] PARTITION_CONF_PREFIX = {1};
+    public static final byte[] PARTITION_CONF_PREFIX = {1};
 
     /**
-     * Prefix to store next row id to build in index. Key format is {@code 
[prefix, indexId, partitionId]} in BE.
+     * Prefix to store next row id to build in index. Key format is {@code 
[prefix, tableId, indexId, partitionId]} in BE.
      */
-    private static final byte[] INDEX_ROW_ID_PREFIX = {2};
+    public static final byte[] INDEX_ROW_ID_PREFIX = {2};
 
     private final ColumnFamily metaColumnFamily;
 
@@ -73,9 +73,9 @@ public class RocksDbMetaStorage {
      * @param indexId Index ID.
      * @param partitionId Partition ID.
      */
-    public @Nullable RowId getNextRowIdToBuild(int indexId, int partitionId) {
+    public @Nullable RowId getNextRowIdToBuild(int tableId, int indexId, int 
partitionId) {
         try {
-            byte[] lastBuiltRowIdBytes = 
metaColumnFamily.get(createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId));
+            byte[] lastBuiltRowIdBytes = 
metaColumnFamily.get(createKey(INDEX_ROW_ID_PREFIX, tableId, indexId, 
partitionId));
 
             if (lastBuiltRowIdBytes == null) {
                 return initialRowIdToBuild(partitionId);
@@ -103,9 +103,11 @@ public class RocksDbMetaStorage {
      * @param indexId Index ID.
      * @param rowId Row ID.
      */
-    public void putNextRowIdToBuild(AbstractWriteBatch writeBatch, int 
indexId, int partitionId, @Nullable RowId rowId) {
+    public void putNextRowIdToBuild(AbstractWriteBatch writeBatch, int 
tableId, int indexId, int partitionId, @Nullable RowId rowId) {
         try {
-            writeBatch.put(metaColumnFamily.handle(), 
createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId), 
indexLastBuildRowId(rowId));
+            byte[] key = createKey(INDEX_ROW_ID_PREFIX, tableId, indexId, 
partitionId);
+
+            writeBatch.put(metaColumnFamily.handle(), key, 
indexLastBuildRowId(rowId));
         } catch (RocksDBException e) {
             throw new StorageException(
                     "Failed to save next row ID to build: [partitionId={}, 
indexId={}, rowId={}]",
@@ -118,9 +120,9 @@ public class RocksDbMetaStorage {
     /**
      * Removes the "next row ID to build" information for the given 
partition's index.
      */
-    public void removeNextRowIdToBuild(AbstractWriteBatch writeBatch, int 
indexId, int partitionId) {
+    public void removeNextRowIdToBuild(AbstractWriteBatch writeBatch, int 
tableId, int indexId, int partitionId) {
         try {
-            writeBatch.delete(metaColumnFamily.handle(), 
createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId));
+            writeBatch.delete(metaColumnFamily.handle(), 
createKey(INDEX_ROW_ID_PREFIX, tableId, indexId, partitionId));
         } catch (RocksDBException e) {
             throw new StorageException(
                     "Failed to remove next row ID to build: [partitionId={}, 
indexId={}]",
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index b2eb6c9e36..134262982b 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.nio.file.Path;
@@ -28,16 +27,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
-import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
 import 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
@@ -55,6 +53,22 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     private static final IgniteLogger LOG = 
Loggers.forClass(RocksDbStorageEngine.class);
 
+    private static class RocksDbStorage implements ManuallyCloseable {
+        final RocksDbDataRegion dataRegion;
+
+        final SharedRocksDbInstance rocksDbInstance;
+
+        RocksDbStorage(RocksDbDataRegion dataRegion, SharedRocksDbInstance 
rocksDbInstance) {
+            this.dataRegion = dataRegion;
+            this.rocksDbInstance = rocksDbInstance;
+        }
+
+        @Override
+        public void close() throws Exception {
+            IgniteUtils.closeAllManually(rocksDbInstance::stop, 
dataRegion::stop);
+        }
+    }
+
     static {
         RocksDB.loadLibrary();
     }
@@ -67,14 +81,12 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     private final ScheduledExecutorService scheduledPool;
 
-    private final Map<String, RocksDbDataRegion> regions = new 
ConcurrentHashMap<>();
-
     /**
-     * Mapping from the data region name to the shared RocksDB instance. Map 
is filled lazily.
-     * Most likely, the association of shared instances with regions will be 
removed/revisited in the future.
+     * Mapping from the data region name to the shared RocksDB instance. Most 
likely, the association of shared
+     * instances with regions will be removed/revisited in the future.
      */
     // TODO IGNITE-19762 Think of proper way to use regions and storages.
-    private final Map<String, SharedRocksDbInstance> sharedInstances = new 
ConcurrentHashMap<>();
+    private final Map<String, RocksDbStorage> storageByRegionName = new 
ConcurrentHashMap<>();
 
     /**
      * Constructor.
@@ -125,40 +137,49 @@ public class RocksDbStorageEngine implements 
StorageEngine {
 
     @Override
     public void start() throws StorageException {
-        registerDataRegion(DEFAULT_DATA_REGION_NAME);
+        registerDataRegion(engineConfig.defaultRegion().value());
 
         // TODO: IGNITE-17066 Add handling deleting/updating data regions 
configuration
         engineConfig.regions().listenElements(new 
ConfigurationNamedListListener<>() {
             @Override
             public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<RocksDbDataRegionView> ctx) {
-                registerDataRegion(ctx.newName(RocksDbDataRegionView.class));
+                RocksDbDataRegionView newValue = ctx.newValue();
+
+                assert newValue != null;
+
+                registerDataRegion(newValue);
 
                 return nullCompletedFuture();
             }
         });
     }
 
-    private void registerDataRegion(String name) {
-        RocksDbDataRegionConfiguration dataRegionConfig = 
DEFAULT_DATA_REGION_NAME.equals(name)
-                ? engineConfig.defaultRegion()
-                : engineConfig.regions().get(name);
+    private void registerDataRegion(RocksDbDataRegionView dataRegionView) {
+        String regionName = dataRegionView.name();
 
-        var region = new RocksDbDataRegion(dataRegionConfig);
+        var region = new RocksDbDataRegion(dataRegionView);
 
         region.start();
 
-        RocksDbDataRegion previousRegion = 
regions.put(dataRegionConfig.name().value(), region);
+        SharedRocksDbInstance rocksDbInstance = newRocksDbInstance(regionName, 
region);
 
-        assert previousRegion == null : dataRegionConfig.name().value();
+        RocksDbStorage previousStorage = storageByRegionName.put(regionName, 
new RocksDbStorage(region, rocksDbInstance));
+
+        assert previousStorage == null : regionName;
+    }
+
+    private SharedRocksDbInstance newRocksDbInstance(String regionName, 
RocksDbDataRegion region) {
+        try {
+            return new SharedRocksDbInstanceCreator().create(this, region, 
storagePath.resolve("rocksdb-" + regionName));
+        } catch (Exception e) {
+            throw new StorageException("Failed to create new RocksDB 
instance", e);
+        }
     }
 
     @Override
     public void stop() throws StorageException {
         try {
-            IgniteUtils.closeAll(Stream.concat(
-                    regions.values().stream().map(region -> region::stop),
-                    sharedInstances.values().stream().map(instance -> 
instance::stop)
-            ));
+            IgniteUtils.closeAllManually(storageByRegionName.values());
         } catch (Exception e) {
             throw new StorageException("Error when stopping the rocksdb 
engine", e);
         }
@@ -177,34 +198,24 @@ public class RocksDbStorageEngine implements 
StorageEngine {
             StorageTableDescriptor tableDescriptor,
             StorageIndexDescriptorSupplier indexDescriptorSupplier
     ) throws StorageException {
-        RocksDbDataRegion dataRegion = 
regions.get(tableDescriptor.getDataRegion());
+        String regionName = tableDescriptor.getDataRegion();
 
-        int tableId = tableDescriptor.getId();
+        RocksDbStorage storage = storageByRegionName.get(regionName);
 
-        assert dataRegion != null : "tableId=" + tableId + ", dataRegion=" + 
tableDescriptor.getDataRegion();
+        assert storage != null :
+                String.format("RocksDB instance has not yet been created for 
[tableId=%d, region=%s]", tableDescriptor.getId(), regionName);
 
-        SharedRocksDbInstance sharedInstance = 
sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {
-            try {
-                return new SharedRocksDbInstanceCreator().create(
-                        this,
-                        dataRegion,
-                        storagePath.resolve("rocksdb-" + name)
-                );
-            } catch (Exception e) {
-                throw new StorageException("Failed to create new RocksDB data 
region", e);
-            }
-        });
+        var tableStorage = new RocksDbTableStorage(storage.rocksDbInstance, 
tableDescriptor, indexDescriptorSupplier);
 
-        var storage = new RocksDbTableStorage(sharedInstance, tableDescriptor, 
indexDescriptorSupplier);
+        tableStorage.start();
 
-        storage.start();
-
-        return storage;
+        return tableStorage;
     }
 
     @Override
-    // TODO: IGNITE-21760 Implement
     public void dropMvTable(int tableId) {
-        throw new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-21760");
+        for (RocksDbStorage rocksDbStorage : storageByRegionName.values()) {
+            rocksDbStorage.rocksDbInstance.destroyTable(tableId);
+        }
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index b0aa9de923..3f7cd6ac93 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -18,13 +18,8 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
-import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
-import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
 import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
-import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
@@ -179,7 +174,7 @@ public class RocksDbTableStorage implements MvTableStorage {
                     }
 
                     if (destroy) {
-                        destroyTableData();
+                        rocksDb.destroyTable(getTableId());
                     }
                 });
     }
@@ -199,28 +194,6 @@ public class RocksDbTableStorage implements MvTableStorage 
{
         return stop(true);
     }
 
-    private void destroyTableData() {
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            int tableId = getTableId();
-
-            byte[] tablePrefix = createKey(BYTE_EMPTY_ARRAY, tableId);
-
-            deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix);
-            deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix);
-
-            indexes.destroyAllIndexes(writeBatch);
-
-            deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), 
createKey(PARTITION_META_PREFIX, tableId));
-            deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), 
createKey(PARTITION_CONF_PREFIX, tableId));
-
-            rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
-
-            indexes.scheduleAllIndexCfDestroy();
-        } catch (RocksDBException e) {
-            throw new StorageException("Failed to destroy table data. 
[tableId={}]", e, getTableId());
-        }
-    }
-
     @Override
     public CompletableFuture<MvPartitionStorage> createMvPartition(int 
partitionId) throws StorageException {
         return inBusyLock(busyLock, () -> 
mvPartitionStorages.create(partitionId, partId -> {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index c54d2914e4..c683d971c3 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -30,12 +30,9 @@ import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance
 class SortedIndex extends Index<RocksDbSortedIndexStorage> {
     private final StorageSortedIndexDescriptor descriptor;
 
-    private final SharedRocksDbInstance rocksDb;
-
     private final RocksDbMetaStorage indexMetaStorage;
 
     private SortedIndex(
-            SharedRocksDbInstance rocksDb,
             int tableId,
             ColumnFamily indexCf,
             StorageSortedIndexDescriptor descriptor,
@@ -43,7 +40,6 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
     ) {
         super(tableId, descriptor.id(), indexCf);
 
-        this.rocksDb = rocksDb;
         this.descriptor = descriptor;
         this.indexMetaStorage = indexMetaStorage;
     }
@@ -56,29 +52,20 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
     ) {
         ColumnFamily indexCf = rocksDb.getOrCreateSortedIndexCf(sortedIndexCfName(descriptor.columns()), descriptor.id(), tableId);
 
-        return new SortedIndex(rocksDb, tableId, indexCf, descriptor, indexMetaStorage);
+        return new SortedIndex(tableId, indexCf, descriptor, indexMetaStorage);
     }
 
     static SortedIndex restoreExisting(
-            SharedRocksDbInstance rocksDb,
             int tableId,
             ColumnFamily indexCf,
             StorageSortedIndexDescriptor descriptor,
             RocksDbMetaStorage indexMetaStorage
     ) {
-        return new SortedIndex(rocksDb, tableId, indexCf, descriptor, indexMetaStorage);
+        return new SortedIndex(tableId, indexCf, descriptor, indexMetaStorage);
     }
 
     @Override
     RocksDbSortedIndexStorage createStorage(int partitionId) {
-        return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId, indexCf, indexMetaStorage);
-    }
-
-    /**
-     * Signals the shared RocksDB instance that this index has been destroyed and all shared resources (like the Column Family) can be
-     * de-allocated.
-     */
-    void destroySortedIndexCfIfNeeded() {
-        rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId);
+        return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId, indexColumnFamily().columnFamily(), indexMetaStorage);
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
index 11943fde99..deff59acaa 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
@@ -61,6 +61,8 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage {
     /** Common prefix for keys in all index storages, containing IDs of different entities. */
     public static final int PREFIX_WITH_IDS_LENGTH = TABLE_ID_SIZE + INDEX_ID_SIZE + PARTITION_ID_SIZE;
 
+    private final int tableId;
+
     protected final int indexId;
 
     protected final int partitionId;
@@ -76,12 +78,13 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage {
     /** Row ID for which the index needs to be built, {@code null} means that the index building has completed. */
     private volatile @Nullable RowId nextRowIdToBuild;
 
-    AbstractRocksDbIndexStorage(int indexId, int partitionId, RocksDbMetaStorage indexMetaStorage) {
+    AbstractRocksDbIndexStorage(int tableId, int indexId, int partitionId, RocksDbMetaStorage indexMetaStorage) {
+        this.tableId = tableId;
         this.indexId = indexId;
         this.indexMetaStorage = indexMetaStorage;
         this.partitionId = partitionId;
 
-        nextRowIdToBuild = indexMetaStorage.getNextRowIdToBuild(indexId, partitionId);
+        nextRowIdToBuild = indexMetaStorage.getNextRowIdToBuild(tableId, indexId, partitionId);
     }
 
     @Override
@@ -100,7 +103,7 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage {
 
             WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
 
-            indexMetaStorage.putNextRowIdToBuild(writeBatch, indexId, partitionId, rowId);
+            indexMetaStorage.putNextRowIdToBuild(writeBatch, tableId, indexId, partitionId, rowId);
 
             nextRowIdToBuild = rowId;
 
@@ -229,7 +232,7 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage {
     public final void destroyData(WriteBatch writeBatch) throws RocksDBException {
         clearIndex(writeBatch);
 
-        indexMetaStorage.removeNextRowIdToBuild(writeBatch, indexId, partitionId);
+        indexMetaStorage.removeNextRowIdToBuild(writeBatch, tableId, indexId, partitionId);
 
         nextRowIdToBuild = initialRowIdToBuild(partitionId);
     }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index cdcf971341..1e18c03d3d 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -82,7 +82,7 @@ public class RocksDbHashIndexStorage extends AbstractRocksDbIndexStorage impleme
             ColumnFamily indexCf,
             RocksDbMetaStorage indexMetaStorage
     ) {
-        super(descriptor.id(), partitionId, indexMetaStorage);
+        super(tableId, descriptor.id(), partitionId, indexMetaStorage);
 
         this.descriptor = descriptor;
         this.indexCf = indexCf;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 271702816f..b0b4c7316d 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -82,7 +82,7 @@ public class RocksDbSortedIndexStorage extends AbstractRocksDbIndexStorage imple
             ColumnFamily indexCf,
             RocksDbMetaStorage indexMetaStorage
     ) {
-        super(descriptor.id(), partitionId, indexMetaStorage);
+        super(tableId, descriptor.id(), partitionId, indexMetaStorage);
 
         this.descriptor = descriptor;
         this.indexCf = indexCf;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java
new file mode 100644
index 0000000000..c7c0c85256
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+
+/** A container class for an index ID and its Column Family instance. */
+public class IndexColumnFamily {
+    private final int indexId;
+
+    private final ColumnFamily columnFamily;
+
+    public IndexColumnFamily(int indexId, ColumnFamily columnFamily) {
+        this.indexId = indexId;
+        this.columnFamily = columnFamily;
+    }
+
+    public int indexId() {
+        return indexId;
+    }
+
+    public ColumnFamily columnFamily() {
+        return columnFamily;
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 0132462a1c..552646b01c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -20,9 +20,14 @@ package org.apache.ignite.internal.storage.rocksdb.instance;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.toStringName;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.INDEX_ROW_ID_PREFIX;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
 import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 
+import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,6 +35,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,7 +51,6 @@ import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -241,13 +246,13 @@ public final class SharedRocksDbInstance {
     /**
      * Returns an "index ID - Column Family" mapping for all sorted indexes that currently exist in the storage.
      */
-    public Map<Integer, ColumnFamily> sortedIndexes(int targetTableId) {
-        var result = new HashMap<Integer, ColumnFamily>();
+    public List<IndexColumnFamily> sortedIndexes(int targetTableId) {
+        var result = new ArrayList<IndexColumnFamily>();
 
         for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) {
             indexCf.indexIdToTableId.forEach((indexId, tableId) -> {
                 if (tableId == targetTableId) {
-                    result.put(indexId, indexCf.columnFamily);
+                    result.add(new IndexColumnFamily(indexId, indexCf.columnFamily));
                 }
             });
         }
@@ -282,16 +287,25 @@ public final class SharedRocksDbInstance {
     }
 
     /**
-     * Possibly drops the column family after destroying the index.
+     * Schedules a drop of a column family after destroying an index, if it was the last index managed by that CF.
      */
-    public void destroySortedIndexCfIfNeeded(byte[] cfName, int indexId) {
+    public CompletableFuture<Void> scheduleIndexCfsDestroy(List<IndexColumnFamily> indexColumnFamilies) {
+        assert !indexColumnFamilies.isEmpty();
+
+        return flusher.awaitFlush(false)
+                .thenRunAsync(() -> indexColumnFamilies.forEach(this::destroySortedIndexCfIfNeeded), engine.threadPool());
+    }
+
+    void destroySortedIndexCfIfNeeded(IndexColumnFamily indexColumnFamily) {
         if (!busyLock.enterBusy()) {
             throw new StorageClosedException();
         }
 
+        var cfNameBytes = new ByteArray(indexColumnFamily.columnFamily().nameBytes());
+
         try {
-            sortedIndexCfsByName.computeIfPresent(new ByteArray(cfName), (unused, indexCf) -> {
-                indexCf.indexIdToTableId.remove(indexId);
+            sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) -> {
+                indexCf.indexIdToTableId.remove(indexColumnFamily.indexId());
 
                 if (!indexCf.indexIdToTableId.isEmpty()) {
                     return indexCf;
@@ -306,6 +320,48 @@ public final class SharedRocksDbInstance {
         }
     }
 
+    /**
+     * Removes all data associated with the given table ID in this storage.
+     */
+    public void destroyTable(int tableId) {
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            byte[] tableIdBytes = ByteBuffer.allocate(Integer.BYTES)
+                    .order(KEY_BYTE_ORDER)
+                    .putInt(tableId)
+                    .array();
+
+            deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
+            deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
+            deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);
+
+            List<IndexColumnFamily> sortedIndexCfs = sortedIndexes(tableId);
+
+            for (IndexColumnFamily indexColumnFamily : sortedIndexCfs) {
+                deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), tableIdBytes);
+            }
+
+            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_META_PREFIX, tableIdBytes));
+            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes));
+            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes));
+
+            db.write(DFLT_WRITE_OPTS, writeBatch);
+
+            if (!sortedIndexCfs.isEmpty()) {
+                scheduleIndexCfsDestroy(sortedIndexCfs);
+            }
+        } catch (RocksDBException e) {
+            throw new StorageException("Failed to destroy table data. [tableId={}]", e, tableId);
+        }
+    }
+
+    private static byte[] metaPrefix(byte[] metaPrefix, byte[] tableIdBytes) {
+        return ByteBuffer.allocate(metaPrefix.length + tableIdBytes.length)
+                .order(KEY_BYTE_ORDER)
+                .put(metaPrefix)
+                .put(tableIdBytes)
+                .array();
+    }
+
     private ColumnFamily createSortedIndexCf(byte[] cfName) {
         ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
 
@@ -313,7 +369,7 @@ public final class SharedRocksDbInstance {
         try {
             columnFamily = ColumnFamily.create(db, cfDescriptor);
         } catch (RocksDBException e) {
-            throw new StorageException("Failed to create new RocksDB column family: " + toStringName(cfDescriptor.getName()), e);
+            throw new StorageException("Failed to create new RocksDB column family: " + toStringName(cfName), e);
         }
 
         flusher.addColumnFamily(columnFamily.handle());
@@ -322,9 +378,7 @@ public final class SharedRocksDbInstance {
     }
 
     private void destroyColumnFamily(ColumnFamily columnFamily) {
-        ColumnFamilyHandle columnFamilyHandle = columnFamily.handle();
-
-        flusher.removeColumnFamily(columnFamilyHandle);
+        flusher.removeColumnFamily(columnFamily.handle());
 
         try {
             columnFamily.destroy();
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
index c376e6d326..f908c5d121 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
@@ -25,7 +25,6 @@ import 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
@@ -47,10 +46,4 @@ public class RocksDbStorageEngineTest extends AbstractStorageEngineTest {
                 workDir
         );
     }
-
-    @Override
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21760")
-    protected void testDropMvTableOnRecovery() throws Exception {
-        super.testDropMvTableOnRecovery();
-    }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
index e0c85f22ec..fcd8b23454 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -17,15 +17,14 @@
 
 package org.apache.ignite.internal.storage.rocksdb.instance;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
@@ -34,7 +33,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Map;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -68,7 +66,7 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
 
         engine.start();
 
-        dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion());
+        dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion().value());
 
         dataRegion.start();
 
@@ -118,19 +116,19 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
         assertThat(foo, is(not(sameInstance(bar))));
         assertThat(quux, is((sameInstance(baz))));
 
-        rocksDb.destroySortedIndexCfIfNeeded(fooName, 1);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(1, foo));
 
         assertTrue(cfExists(fooName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(barName, 2);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(2, bar));
 
         assertFalse(cfExists(barName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(bazName, 3);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(3, baz));
 
         assertTrue(cfExists(fooName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(quuxName, 4);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(4, quux));
 
         assertFalse(cfExists(fooName));
     }
@@ -161,8 +159,25 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
         // Same index CF, different table.
         ColumnFamily quux = rocksDb.getOrCreateSortedIndexCf(quuxName, 4, 1);
 
-        assertThat(rocksDb.sortedIndexes(0), is(Map.of(1, foo, 2, bar, 3, baz)));
-        assertThat(rocksDb.sortedIndexes(1), is(Map.of(4, quux)));
+        assertThat(
+                rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()),
+                containsInAnyOrder(1, 2, 3)
+        );
+
+        assertThat(
+                rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
+                containsInAnyOrder(foo, bar, baz)
+        );
+
+        assertThat(
+                rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::indexId).collect(toList()),
+                containsInAnyOrder(4)
+        );
+
+        assertThat(
+                rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
+                containsInAnyOrder(quux)
+        );
 
         // Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped.
         byte[] key = ByteBuffer.allocate(Integer.BYTES * 2)
@@ -177,8 +192,8 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
 
         rocksDb = createDb();
 
-        assertThat(rocksDb.sortedIndexes(0), allOf(hasKey(1), not(hasKey(2)), not(hasKey(3))));
-        assertThat(rocksDb.sortedIndexes(1), is(anEmptyMap()));
+        assertThat(rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()), contains(1));
+        assertThat(rocksDb.sortedIndexes(1), is(empty()));
 
         assertTrue(cfExists(fooName));
         assertFalse(cfExists(barName));

Reply via email to