This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

The following commit(s) were added to refs/heads/main by this push:
     new bbdb28a993 IGNITE-21760 Remove destroyed RocksDB tables on recovery (#3488)
bbdb28a993 is described below

commit bbdb28a993556c42c201f7f899fa3d525e16f398
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Wed Mar 27 10:47:31 2024 +0200

    IGNITE-21760 Remove destroyed RocksDB tables on recovery (#3488)
---
 check-rules/spotbugs-excludes.xml                  | 10 +++
 .../storage/engine/AbstractStorageEngineTest.java  |  2 +-
 .../ignite/internal/storage/rocksdb/HashIndex.java | 13 ++-
 .../ignite/internal/storage/rocksdb/Index.java     | 20 ++---
 .../storage/rocksdb/RocksDbDataRegion.java         | 16 ++--
 .../internal/storage/rocksdb/RocksDbIndexes.java   | 49 +++---------
 .../storage/rocksdb/RocksDbMetaStorage.java        | 22 ++---
 .../storage/rocksdb/RocksDbStorageEngine.java      | 93 ++++++++++++----------
 .../storage/rocksdb/RocksDbTableStorage.java       | 29 +------
 .../internal/storage/rocksdb/SortedIndex.java      | 19 +----
 .../rocksdb/index/AbstractRocksDbIndexStorage.java | 11 ++-
 .../rocksdb/index/RocksDbHashIndexStorage.java     |  2 +-
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  2 +-
 .../rocksdb/instance/IndexColumnFamily.java        | 40 ++++++++++
 .../rocksdb/instance/SharedRocksDbInstance.java    | 78 +++++++++++++---
 .../rocksdb/engine/RocksDbStorageEngineTest.java   |  7 --
 .../instance/SharedRocksDbInstanceTest.java        | 41 +++++++---
 17 files changed, 262 insertions(+), 192 deletions(-)

diff --git a/check-rules/spotbugs-excludes.xml b/check-rules/spotbugs-excludes.xml
index 3d4554aca2..a25c91874a 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -206,6 +206,16 @@
         <Class name="org.apache.ignite.internal.tx.impl.TransactionInflights$TxContext"/>
         <Field name="inflights"/>
     </Match>
+    <Match>
+        <!-- Public byte array constants, not expected to be modified. -->
+        <Bug pattern="MS_MUTABLE_ARRAY"/>
+        <Class name="org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage"/>
+        <Or>
+            <Field name="PARTITION_META_PREFIX"/>
+            <Field name="PARTITION_CONF_PREFIX"/>
+            <Field name="INDEX_ROW_ID_PREFIX"/>
+        </Or>
+    </Match>
 
     <!-- end of false-positive exclusions -->
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
index d7752d3fe6..f8eab538bb 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
@@ -112,7 +112,7 @@ public abstract class AbstractStorageEngineTest extends BaseMvStoragesTest {
     }
 
     @Test
-    protected void testDropMvTableOnRecovery() throws Exception {
+    void testDropMvTableOnRecovery() throws Exception {
         assumeFalse(storageEngine.isVolatile());
 
         int tableId = 1;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 571e0f5853..bd45c252f3 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
 
 /**
  * Class that represents a Hash Index defined for all partitions of a Table.
@@ -29,8 +29,13 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
 
     private final RocksDbMetaStorage indexMetaStorage;
 
-    HashIndex(SharedRocksDbInstance rocksDb, int tableId, StorageHashIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage) {
-        super(tableId, descriptor.id(), rocksDb.hashIndexCf());
+    HashIndex(
+            int tableId,
+            ColumnFamily indexCf,
+            StorageHashIndexDescriptor descriptor,
+            RocksDbMetaStorage indexMetaStorage
+    ) {
+        super(tableId, descriptor.id(), indexCf);
 
         this.descriptor = descriptor;
         this.indexMetaStorage = indexMetaStorage;
@@ -38,6 +43,6 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
 
     @Override
     RocksDbHashIndexStorage createStorage(int partitionId) {
-        return new RocksDbHashIndexStorage(descriptor, tableId, partitionId, indexCf, indexMetaStorage);
+        return new RocksDbHashIndexStorage(descriptor, tableId, partitionId, indexColumnFamily().columnFamily(), indexMetaStorage);
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
index c184cf9a70..58c23aefaa 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
 import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
@@ -37,16 +38,17 @@ import org.rocksdb.WriteBatch;
 abstract class Index<S extends AbstractRocksDbIndexStorage> {
     final int tableId;
 
-    final int indexId;
-
-    final ColumnFamily indexCf;
+    private final IndexColumnFamily indexColumnFamily;
 
     private final ConcurrentMap<Integer, S> storageByPartitionId = new ConcurrentHashMap<>();
 
-    Index(int tableId, int indexId, ColumnFamily indexCf) {
+    Index(int tableId, int indexId, ColumnFamily cf) {
         this.tableId = tableId;
-        this.indexId = indexId;
-        this.indexCf = indexCf;
+        this.indexColumnFamily = new IndexColumnFamily(indexId, cf);
+    }
+
+    IndexColumnFamily indexColumnFamily() {
+        return indexColumnFamily;
     }
 
     /**
@@ -74,7 +76,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
         try {
             IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> index::close));
         } catch (Exception e) {
-            throw new StorageException("Failed to close index storages: " + indexId, e);
+            throw new StorageException("Failed to close index storages: " + indexColumnFamily.indexId(), e);
         }
     }
 
@@ -85,7 +87,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
         try {
             IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> index::transitionToDestroyedState));
         } catch (Exception e) {
-            throw new StorageException("Failed to transition index storages to the DESTROYED state: " + indexId, e);
+            throw new StorageException("Failed to transition index storages to the DESTROYED state: " + indexColumnFamily.indexId(), e);
         }
     }
 
@@ -111,6 +113,6 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
     void destroy(WriteBatch writeBatch) throws RocksDBException {
         transitionToDestroyedState();
 
-        deleteByPrefix(writeBatch, indexCf, indexPrefix(tableId, indexId));
+        deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), indexPrefix(tableId, indexColumnFamily().indexId()));
     }
 }
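
For context on the destroy() method above: deleteByPrefix(), statically imported from SharedRocksDbInstance, removes every key under a prefix with a single RocksDB range delete, using incrementPrefix() to compute the exclusive upper bound. A minimal sketch of that idea in plain RocksJava; the *Sketch names are illustrative, not the actual Ignite helpers:

    // Deletes every key in `cf` that starts with `prefix`, as one range delete queued in the batch.
    static void deleteByPrefixSketch(WriteBatch batch, ColumnFamilyHandle cf, byte[] prefix) throws RocksDBException {
        batch.deleteRange(cf, prefix, incrementPrefixSketch(prefix));
    }

    // Returns a byte string greater than every key starting with `prefix`: treats the prefix as
    // an unsigned big-endian number and adds one, wrapping overflowed 0xFF bytes to zero.
    static byte[] incrementPrefixSketch(byte[] prefix) {
        byte[] upperBound = prefix.clone();

        for (int i = upperBound.length - 1; i >= 0; i--) {
            if (++upperBound[i] != 0) {
                return upperBound;
            }
        }

        throw new IllegalArgumentException("Prefix must contain at least one byte that is not 0xFF");
    }
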
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
index fb7a73512f..20581f50a5 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
@@ -21,7 +21,6 @@ import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
 import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
 
 import java.util.Locale;
-import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.rocksdb.Cache;
@@ -34,7 +33,7 @@ import org.rocksdb.WriteBufferManager;
  */
 public class RocksDbDataRegion {
     /** Region configuration. */
-    private final RocksDbDataRegionConfiguration cfg;
+    private final RocksDbDataRegionView dataRegionView;
 
     /** RocksDB cache instance. */
     private Cache cache;
@@ -45,18 +44,16 @@ public class RocksDbDataRegion {
     /**
      * Constructor.
      *
-     * @param cfg Data region configuration.
+     * @param dataRegionView Data region configuration.
      */
-    public RocksDbDataRegion(RocksDbDataRegionConfiguration cfg) {
-        this.cfg = cfg;
+    public RocksDbDataRegion(RocksDbDataRegionView dataRegionView) {
+        this.dataRegionView = dataRegionView;
     }
 
     /**
      * Start the rocksDb data region.
      */
     public void start() {
-        RocksDbDataRegionView dataRegionView = cfg.value();
-
         long writeBufferSize = dataRegionView.writeBufferSize();
 
         long totalCacheSize = dataRegionView.size() + writeBufferSize;
@@ -73,7 +70,10 @@ public class RocksDbDataRegion {
                 break;
 
             default:
-                assert false : dataRegionView.cache();
+                throw new AssertionError(String.format(
+                        "Unknown data region cache type: [dataRegion=%s, cacheType=%s]",
+                        dataRegionView.name(), dataRegionView.cache()
+                ));
         }
 
         writeBufferManager = new WriteBufferManager(writeBufferSize, cache);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
index dee24e0369..7ff1b893a1 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
@@ -27,7 +27,6 @@ import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Stream;
@@ -39,6 +38,7 @@ import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
 import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDBException;
@@ -69,41 +69,32 @@ class RocksDbIndexes {
             if (descriptor == null) {
                 deleteByPrefix(writeBatch, rocksDb.hashIndexCf(), indexPrefix(tableId, indexId));
             } else {
-                hashIndices.put(indexId, new HashIndex(rocksDb, tableId, descriptor, rocksDb.meta));
+                hashIndices.put(indexId, new HashIndex(tableId, rocksDb.hashIndexCf(), descriptor, rocksDb.meta));
             }
         }
 
-        var indexCfsToDestroy = new ArrayList<Map.Entry<Integer, ColumnFamily>>();
+        var indexCfsToDestroy = new ArrayList<IndexColumnFamily>();
 
-        for (Map.Entry<Integer, ColumnFamily> e : rocksDb.sortedIndexes(tableId).entrySet()) {
-            int indexId = e.getKey();
+        for (IndexColumnFamily indexColumnFamily : rocksDb.sortedIndexes(tableId)) {
+            int indexId = indexColumnFamily.indexId();
 
-            ColumnFamily indexCf = e.getValue();
+            ColumnFamily cf = indexColumnFamily.columnFamily();
 
             var descriptor = (StorageSortedIndexDescriptor) indexDescriptorSupplier.get(indexId);
 
             if (descriptor == null) {
-                deleteByPrefix(writeBatch, indexCf, indexPrefix(tableId, indexId));
+                deleteByPrefix(writeBatch, cf, indexPrefix(tableId, indexId));
 
-                indexCfsToDestroy.add(e);
+                indexCfsToDestroy.add(indexColumnFamily);
             } else {
-                sortedIndices.put(indexId, SortedIndex.restoreExisting(rocksDb, tableId, indexCf, descriptor, rocksDb.meta));
+                sortedIndices.put(indexId, SortedIndex.restoreExisting(tableId, cf, descriptor, rocksDb.meta));
             }
         }
 
         rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
         if (!indexCfsToDestroy.isEmpty()) {
-            rocksDb.flusher.awaitFlush(false)
-                    .thenRunAsync(() -> {
-                        for (Map.Entry<Integer, ColumnFamily> e : indexCfsToDestroy) {
-                            int indexId = e.getKey();
-
-                            ColumnFamily indexCf = e.getValue();
-
-                            rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId);
-                        }
-                    }, rocksDb.engine.threadPool());
+            rocksDb.scheduleIndexCfsDestroy(indexCfsToDestroy);
             }
         }
     }
@@ -120,7 +111,7 @@ class RocksDbIndexes {
     HashIndexStorage getOrCreateHashIndex(int partitionId, StorageHashIndexDescriptor indexDescriptor) {
         HashIndex hashIndex = hashIndices.computeIfAbsent(
                 indexDescriptor.id(),
-                id -> new HashIndex(rocksDb, tableId, indexDescriptor, rocksDb.meta)
+                id -> new HashIndex(tableId, rocksDb.hashIndexCf(), indexDescriptor, rocksDb.meta)
         );
 
         return hashIndex.getOrCreateStorage(partitionId);
@@ -190,8 +181,7 @@ class RocksDbIndexes {
         }
 
         if (sortedIdx != null) {
-            rocksDb.flusher.awaitFlush(false)
-                    .thenRunAsync(sortedIdx::destroySortedIndexCfIfNeeded, rocksDb.engine.threadPool());
+            rocksDb.scheduleIndexCfsDestroy(List.of(sortedIdx.indexColumnFamily()));
         }
     }
 
@@ -205,21 +195,6 @@ class RocksDbIndexes {
         }
     }
 
-    void destroyAllIndexes(WriteBatch writeBatch) throws RocksDBException {
-        for (HashIndex hashIndex : hashIndices.values()) {
-            hashIndex.destroy(writeBatch);
-        }
-
-        for (SortedIndex sortedIndex : sortedIndices.values()) {
-            sortedIndex.destroy(writeBatch);
-        }
-    }
-
-    void scheduleAllIndexCfDestroy() {
-        rocksDb.flusher.awaitFlush(false)
-                .thenRunAsync(() -> sortedIndices.values().forEach(SortedIndex::destroySortedIndexCfIfNeeded), rocksDb.engine.threadPool());
-    }
-
     Stream<AbstractRocksDbIndexStorage> getAllStorages(int partitionId) {
         return Stream.concat(
                 hashIndices.values().stream().map(index -> index.getStorage(partitionId)),
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index 193fcfc8cf..72e71e6c56 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -42,17 +42,17 @@ public class RocksDbMetaStorage {
      * Prefix to store partition meta information, such as last applied index and term.
      * Key format is {@code [prefix, tableId, partitionId]} in BE.
      */
-    static final byte[] PARTITION_META_PREFIX = {0};
+    public static final byte[] PARTITION_META_PREFIX = {0};
 
     /**
      * Prefix to store partition configuration. Key format is {@code [prefix, tableId, partitionId]} in BE.
      */
-    static final byte[] PARTITION_CONF_PREFIX = {1};
+    public static final byte[] PARTITION_CONF_PREFIX = {1};
 
     /**
-     * Prefix to store next row id to build in index. Key format is {@code [prefix, indexId, partitionId]} in BE.
+     * Prefix to store next row id to build in index. Key format is {@code [prefix, tableId, indexId, partitionId]} in BE.
      */
-    private static final byte[] INDEX_ROW_ID_PREFIX = {2};
+    public static final byte[] INDEX_ROW_ID_PREFIX = {2};
 
     private final ColumnFamily metaColumnFamily;
 
@@ -73,9 +73,9 @@ public class RocksDbMetaStorage {
      * @param indexId Index ID.
      * @param partitionId Partition ID.
      */
-    public @Nullable RowId getNextRowIdToBuild(int indexId, int partitionId) {
+    public @Nullable RowId getNextRowIdToBuild(int tableId, int indexId, int partitionId) {
         try {
-            byte[] lastBuiltRowIdBytes = metaColumnFamily.get(createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId));
+            byte[] lastBuiltRowIdBytes = metaColumnFamily.get(createKey(INDEX_ROW_ID_PREFIX, tableId, indexId, partitionId));
 
             if (lastBuiltRowIdBytes == null) {
                 return initialRowIdToBuild(partitionId);
@@ -103,9 +103,11 @@ public class RocksDbMetaStorage {
      * @param indexId Index ID.
      * @param rowId Row ID.
      */
-    public void putNextRowIdToBuild(AbstractWriteBatch writeBatch, int indexId, int partitionId, @Nullable RowId rowId) {
+    public void putNextRowIdToBuild(AbstractWriteBatch writeBatch, int tableId, int indexId, int partitionId, @Nullable RowId rowId) {
         try {
-            writeBatch.put(metaColumnFamily.handle(), createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId), indexLastBuildRowId(rowId));
+            byte[] key = createKey(INDEX_ROW_ID_PREFIX, tableId, indexId, partitionId);
+
+            writeBatch.put(metaColumnFamily.handle(), key, indexLastBuildRowId(rowId));
         } catch (RocksDBException e) {
             throw new StorageException(
                     "Failed to save next row ID to build: [partitionId={}, indexId={}, rowId={}]",
@@ -118,9 +120,9 @@ public class RocksDbMetaStorage {
     /**
      * Removes the "next row ID to build" information for the given partition's index.
     */
-    public void removeNextRowIdToBuild(AbstractWriteBatch writeBatch, int indexId, int partitionId) {
+    public void removeNextRowIdToBuild(AbstractWriteBatch writeBatch, int tableId, int indexId, int partitionId) {
         try {
-            writeBatch.delete(metaColumnFamily.handle(), createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId));
+            writeBatch.delete(metaColumnFamily.handle(), createKey(INDEX_ROW_ID_PREFIX, tableId, indexId, partitionId));
         } catch (RocksDBException e) {
             throw new StorageException(
                     "Failed to remove next row ID to build: [partitionId={}, indexId={}]",
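
A note on the change above: adding the table ID to the "next row ID to build" key is what makes these entries removable by a per-table prefix during recovery. Assuming big-endian encoding and a 2-byte partition ID (the actual layout is defined by RocksDbStorageUtils.createKey and its *_SIZE constants, so treat this as a sketch), the key produced for INDEX_ROW_ID_PREFIX looks like this:

    // Illustrative key layout: [prefix (1 byte), tableId (4), indexId (4), partitionId (2)], big-endian.
    static byte[] indexRowIdKeySketch(int tableId, int indexId, int partitionId) {
        return ByteBuffer.allocate(1 + Integer.BYTES + Integer.BYTES + Short.BYTES)
                .order(ByteOrder.BIG_ENDIAN)
                .put((byte) 2) // INDEX_ROW_ID_PREFIX
                .putInt(tableId)
                .putInt(indexId)
                .putShort((short) partitionId)
                .array();
    }

Because the table ID immediately follows the one-byte marker, all of a table's entries form one contiguous key range per marker, which is exactly what a single range delete needs.
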
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index b2eb6c9e36..134262982b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.nio.file.Path;
@@ -28,16 +27,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
-import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
 import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
@@ -55,6 +53,22 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     private static final IgniteLogger LOG = Loggers.forClass(RocksDbStorageEngine.class);
 
+    private static class RocksDbStorage implements ManuallyCloseable {
+        final RocksDbDataRegion dataRegion;
+
+        final SharedRocksDbInstance rocksDbInstance;
+
+        RocksDbStorage(RocksDbDataRegion dataRegion, SharedRocksDbInstance rocksDbInstance) {
+            this.dataRegion = dataRegion;
+            this.rocksDbInstance = rocksDbInstance;
+        }
+
+        @Override
+        public void close() throws Exception {
+            IgniteUtils.closeAllManually(rocksDbInstance::stop, dataRegion::stop);
+        }
+    }
+
     static {
         RocksDB.loadLibrary();
     }
@@ -67,14 +81,12 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     private final ScheduledExecutorService scheduledPool;
 
-    private final Map<String, RocksDbDataRegion> regions = new ConcurrentHashMap<>();
-
     /**
-     * Mapping from the data region name to the shared RocksDB instance. Map is filled lazily.
-     * Most likely, the association of shared instances with regions will be removed/revisited in the future.
+     * Mapping from the data region name to the shared RocksDB instance. Most likely, the association of shared
+     * instances with regions will be removed/revisited in the future.
      */
     // TODO IGNITE-19762 Think of proper way to use regions and storages.
-    private final Map<String, SharedRocksDbInstance> sharedInstances = new ConcurrentHashMap<>();
+    private final Map<String, RocksDbStorage> storageByRegionName = new ConcurrentHashMap<>();
 
     /**
      * Constructor.
@@ -125,40 +137,49 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     @Override
     public void start() throws StorageException {
-        registerDataRegion(DEFAULT_DATA_REGION_NAME);
+        registerDataRegion(engineConfig.defaultRegion().value());
 
         // TODO: IGNITE-17066 Add handling deleting/updating data regions configuration
         engineConfig.regions().listenElements(new ConfigurationNamedListListener<>() {
             @Override
             public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<RocksDbDataRegionView> ctx) {
-                registerDataRegion(ctx.newName(RocksDbDataRegionView.class));
+                RocksDbDataRegionView newValue = ctx.newValue();
+
+                assert newValue != null;
+
+                registerDataRegion(newValue);
 
                 return nullCompletedFuture();
             }
         });
     }
 
-    private void registerDataRegion(String name) {
-        RocksDbDataRegionConfiguration dataRegionConfig = DEFAULT_DATA_REGION_NAME.equals(name)
-                ? engineConfig.defaultRegion()
-                : engineConfig.regions().get(name);
+    private void registerDataRegion(RocksDbDataRegionView dataRegionView) {
+        String regionName = dataRegionView.name();
 
-        var region = new RocksDbDataRegion(dataRegionConfig);
+        var region = new RocksDbDataRegion(dataRegionView);
 
         region.start();
 
-        RocksDbDataRegion previousRegion = regions.put(dataRegionConfig.name().value(), region);
+        SharedRocksDbInstance rocksDbInstance = newRocksDbInstance(regionName, region);
 
-        assert previousRegion == null : dataRegionConfig.name().value();
+        RocksDbStorage previousStorage = storageByRegionName.put(regionName, new RocksDbStorage(region, rocksDbInstance));
+
+        assert previousStorage == null : regionName;
+    }
+
+    private SharedRocksDbInstance newRocksDbInstance(String regionName, RocksDbDataRegion region) {
+        try {
+            return new SharedRocksDbInstanceCreator().create(this, region, storagePath.resolve("rocksdb-" + regionName));
+        } catch (Exception e) {
+            throw new StorageException("Failed to create new RocksDB instance", e);
+        }
     }
 
     @Override
     public void stop() throws StorageException {
         try {
-            IgniteUtils.closeAll(Stream.concat(
-                    regions.values().stream().map(region -> region::stop),
-                    sharedInstances.values().stream().map(instance -> instance::stop)
-            ));
+            IgniteUtils.closeAllManually(storageByRegionName.values());
         } catch (Exception e) {
             throw new StorageException("Error when stopping the rocksdb engine", e);
         }
@@ -177,34 +198,24 @@ public class RocksDbStorageEngine implements StorageEngine {
             StorageTableDescriptor tableDescriptor,
             StorageIndexDescriptorSupplier indexDescriptorSupplier
     ) throws StorageException {
-        RocksDbDataRegion dataRegion = regions.get(tableDescriptor.getDataRegion());
+        String regionName = tableDescriptor.getDataRegion();
 
-        int tableId = tableDescriptor.getId();
+        RocksDbStorage storage = storageByRegionName.get(regionName);
 
-        assert dataRegion != null : "tableId=" + tableId + ", dataRegion=" + tableDescriptor.getDataRegion();
+        assert storage != null :
+                String.format("RocksDB instance has not yet been created for [tableId=%d, region=%s]", tableDescriptor.getId(), regionName);
 
-        SharedRocksDbInstance sharedInstance = sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {
-            try {
-                return new SharedRocksDbInstanceCreator().create(
-                        this,
-                        dataRegion,
-                        storagePath.resolve("rocksdb-" + name)
-                );
-            } catch (Exception e) {
-                throw new StorageException("Failed to create new RocksDB data region", e);
-            }
-        });
+        var tableStorage = new RocksDbTableStorage(storage.rocksDbInstance, tableDescriptor, indexDescriptorSupplier);
 
-        var storage = new RocksDbTableStorage(sharedInstance, tableDescriptor, indexDescriptorSupplier);
+        tableStorage.start();
 
-        storage.start();
-
-        return storage;
+        return tableStorage;
     }
 
     @Override
-    // TODO: IGNITE-21760 Implement
     public void dropMvTable(int tableId) {
-        throw new UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-21760");
+        for (RocksDbStorage rocksDbStorage : storageByRegionName.values()) {
+            rocksDbStorage.rocksDbInstance.destroyTable(tableId);
+        }
     }
 }
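
A side note on the stop order encoded in RocksDbStorage.close() above: the shared RocksDB instance is stopped before its data region, because an open DB still uses the region's Cache and WriteBufferManager. The same lifetime rule, shown as a self-contained RocksJava sketch (path and sizes are illustrative; UTF_8 is assumed statically imported from java.nio.charset.StandardCharsets):

    try (Cache cache = new LRUCache(64 * 1024 * 1024);
            WriteBufferManager writeBufferManager = new WriteBufferManager(32 * 1024 * 1024, cache);
            Options options = new Options().setCreateIfMissing(true).setWriteBufferManager(writeBufferManager)) {
        try (RocksDB db = RocksDB.open(options, "/tmp/write-buffer-ordering-demo")) {
            db.put("key".getBytes(UTF_8), "value".getBytes(UTF_8));
        } // The DB closes here, before the write buffer manager and cache it borrows are released.
    } catch (RocksDBException e) {
        throw new RuntimeException(e);
    }
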
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index b0aa9de923..3f7cd6ac93 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -18,13 +18,8 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
-import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
 import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
-import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
 import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
@@ -179,7 +174,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             }
 
             if (destroy) {
-                destroyTableData();
+                rocksDb.destroyTable(getTableId());
             }
         });
     }
@@ -199,28 +194,6 @@ public class RocksDbTableStorage implements MvTableStorage {
         return stop(true);
     }
 
-    private void destroyTableData() {
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            int tableId = getTableId();
-
-            byte[] tablePrefix = createKey(BYTE_EMPTY_ARRAY, tableId);
-
-            deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix);
-            deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix);
-
-            indexes.destroyAllIndexes(writeBatch);
-
-            deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_META_PREFIX, tableId));
-            deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_CONF_PREFIX, tableId));
-
-            rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
-
-            indexes.scheduleAllIndexCfDestroy();
-        } catch (RocksDBException e) {
-            throw new StorageException("Failed to destroy table data. [tableId={}]", e, getTableId());
-        }
-    }
-
     @Override
     public CompletableFuture<MvPartitionStorage> createMvPartition(int partitionId) throws StorageException {
         return inBusyLock(busyLock, () -> mvPartitionStorages.create(partitionId, partId -> {
[tableId={}]", e, getTableId()); - } - } - @Override public CompletableFuture<MvPartitionStorage> createMvPartition(int partitionId) throws StorageException { return inBusyLock(busyLock, () -> mvPartitionStorages.create(partitionId, partId -> { diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java index c54d2914e4..c683d971c3 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java @@ -30,12 +30,9 @@ import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance class SortedIndex extends Index<RocksDbSortedIndexStorage> { private final StorageSortedIndexDescriptor descriptor; - private final SharedRocksDbInstance rocksDb; - private final RocksDbMetaStorage indexMetaStorage; private SortedIndex( - SharedRocksDbInstance rocksDb, int tableId, ColumnFamily indexCf, StorageSortedIndexDescriptor descriptor, @@ -43,7 +40,6 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> { ) { super(tableId, descriptor.id(), indexCf); - this.rocksDb = rocksDb; this.descriptor = descriptor; this.indexMetaStorage = indexMetaStorage; } @@ -56,29 +52,20 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> { ) { ColumnFamily indexCf = rocksDb.getOrCreateSortedIndexCf(sortedIndexCfName(descriptor.columns()), descriptor.id(), tableId); - return new SortedIndex(rocksDb, tableId, indexCf, descriptor, indexMetaStorage); + return new SortedIndex(tableId, indexCf, descriptor, indexMetaStorage); } static SortedIndex restoreExisting( - SharedRocksDbInstance rocksDb, int tableId, ColumnFamily indexCf, StorageSortedIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage ) { - return new SortedIndex(rocksDb, tableId, indexCf, descriptor, indexMetaStorage); + return new SortedIndex(tableId, indexCf, descriptor, indexMetaStorage); } @Override RocksDbSortedIndexStorage createStorage(int partitionId) { - return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId, indexCf, indexMetaStorage); - } - - /** - * Signals the shared RocksDB instance that this index has been destroyed and all shared resources (like the Column Family) can be - * de-allocated. - */ - void destroySortedIndexCfIfNeeded() { - rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId); + return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId, indexColumnFamily().columnFamily(), indexMetaStorage); } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java index 11943fde99..deff59acaa 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java @@ -61,6 +61,8 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage { /** Common prefix for keys in all index storages, containing IDs of different entities. 
     public static final int PREFIX_WITH_IDS_LENGTH = TABLE_ID_SIZE + INDEX_ID_SIZE + PARTITION_ID_SIZE;
 
+    private final int tableId;
+
     protected final int indexId;
 
     protected final int partitionId;
@@ -76,12 +78,13 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage {
     /** Row ID for which the index needs to be built, {@code null} means that the index building has completed. */
     private volatile @Nullable RowId nextRowIdToBuild;
 
-    AbstractRocksDbIndexStorage(int indexId, int partitionId, RocksDbMetaStorage indexMetaStorage) {
+    AbstractRocksDbIndexStorage(int tableId, int indexId, int partitionId, RocksDbMetaStorage indexMetaStorage) {
+        this.tableId = tableId;
         this.indexId = indexId;
         this.indexMetaStorage = indexMetaStorage;
         this.partitionId = partitionId;
 
-        nextRowIdToBuild = indexMetaStorage.getNextRowIdToBuild(indexId, partitionId);
+        nextRowIdToBuild = indexMetaStorage.getNextRowIdToBuild(tableId, indexId, partitionId);
     }
 
     @Override
@@ -100,7 +103,7 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage {
 
         WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
 
-        indexMetaStorage.putNextRowIdToBuild(writeBatch, indexId, partitionId, rowId);
+        indexMetaStorage.putNextRowIdToBuild(writeBatch, tableId, indexId, partitionId, rowId);
 
         nextRowIdToBuild = rowId;
 
@@ -229,7 +232,7 @@ public abstract class AbstractRocksDbIndexStorage implements IndexStorage {
     public final void destroyData(WriteBatch writeBatch) throws RocksDBException {
         clearIndex(writeBatch);
 
-        indexMetaStorage.removeNextRowIdToBuild(writeBatch, indexId, partitionId);
+        indexMetaStorage.removeNextRowIdToBuild(writeBatch, tableId, indexId, partitionId);
 
         nextRowIdToBuild = initialRowIdToBuild(partitionId);
     }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index cdcf971341..1e18c03d3d 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -82,7 +82,7 @@ public class RocksDbHashIndexStorage extends AbstractRocksDbIndexStorage impleme
             ColumnFamily indexCf,
             RocksDbMetaStorage indexMetaStorage
     ) {
-        super(descriptor.id(), partitionId, indexMetaStorage);
+        super(tableId, descriptor.id(), partitionId, indexMetaStorage);
 
         this.descriptor = descriptor;
         this.indexCf = indexCf;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 271702816f..b0b4c7316d 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -82,7 +82,7 @@ public class RocksDbSortedIndexStorage extends AbstractRocksDbIndexStorage imple
             ColumnFamily indexCf,
             RocksDbMetaStorage indexMetaStorage
     ) {
-        super(descriptor.id(), partitionId, indexMetaStorage);
+        super(tableId, descriptor.id(), partitionId, indexMetaStorage);
 
         this.descriptor = descriptor;
         this.indexCf = indexCf;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java
new file mode 100644
index 0000000000..c7c0c85256
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+
+/** A container class for an index ID and its Column Family instance. */
+public class IndexColumnFamily {
+    private final int indexId;
+
+    private final ColumnFamily columnFamily;
+
+    public IndexColumnFamily(int indexId, ColumnFamily columnFamily) {
+        this.indexId = indexId;
+        this.columnFamily = columnFamily;
+    }
+
+    public int indexId() {
+        return indexId;
+    }
+
+    public ColumnFamily columnFamily() {
+        return columnFamily;
+    }
+}
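
IndexColumnFamily replaces the bare Map.Entry<Integer, ColumnFamily> pairs that were previously threaded through the recovery code. Its typical round trip, condensed from the RocksDbIndexes recovery path earlier in this diff (sketch only; the surrounding method and variables are omitted):

    // Collect the sorted-index CFs of indexes that no longer exist in the catalog...
    List<IndexColumnFamily> stale = rocksDb.sortedIndexes(tableId);

    for (IndexColumnFamily indexColumnFamily : stale) {
        // ...queue a range delete of each index's rows...
        deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), indexPrefix(tableId, indexColumnFamily.indexId()));
    }

    // ...and, once the batch is flushed, drop any column families left with no indexes.
    rocksDb.scheduleIndexCfsDestroy(stale);
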
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 0132462a1c..552646b01c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -20,9 +20,14 @@ package org.apache.ignite.internal.storage.rocksdb.instance;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.toStringName;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.INDEX_ROW_ID_PREFIX;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
 import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 
+import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,6 +35,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,7 +51,6 @@ import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -241,13 +246,13 @@ public final class SharedRocksDbInstance {
     /**
      * Returns an "index ID - Column Family" mapping for all sorted indexes that currently exist in the storage.
      */
-    public Map<Integer, ColumnFamily> sortedIndexes(int targetTableId) {
-        var result = new HashMap<Integer, ColumnFamily>();
+    public List<IndexColumnFamily> sortedIndexes(int targetTableId) {
+        var result = new ArrayList<IndexColumnFamily>();
 
         for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) {
             indexCf.indexIdToTableId.forEach((indexId, tableId) -> {
                 if (tableId == targetTableId) {
-                    result.put(indexId, indexCf.columnFamily);
+                    result.add(new IndexColumnFamily(indexId, indexCf.columnFamily));
                 }
             });
         }
@@ -282,16 +287,25 @@ public final class SharedRocksDbInstance {
     }
 
     /**
-     * Possibly drops the column family after destroying the index.
+     * Schedules a drop of a column family after destroying an index, if it was the last index managed by that CF.
      */
-    public void destroySortedIndexCfIfNeeded(byte[] cfName, int indexId) {
+    public CompletableFuture<Void> scheduleIndexCfsDestroy(List<IndexColumnFamily> indexColumnFamilies) {
+        assert !indexColumnFamilies.isEmpty();
+
+        return flusher.awaitFlush(false)
+                .thenRunAsync(() -> indexColumnFamilies.forEach(this::destroySortedIndexCfIfNeeded), engine.threadPool());
+    }
+
+    void destroySortedIndexCfIfNeeded(IndexColumnFamily indexColumnFamily) {
         if (!busyLock.enterBusy()) {
             throw new StorageClosedException();
         }
 
+        var cfNameBytes = new ByteArray(indexColumnFamily.columnFamily().nameBytes());
+
         try {
-            sortedIndexCfsByName.computeIfPresent(new ByteArray(cfName), (unused, indexCf) -> {
-                indexCf.indexIdToTableId.remove(indexId);
+            sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) -> {
+                indexCf.indexIdToTableId.remove(indexColumnFamily.indexId());
 
                 if (!indexCf.indexIdToTableId.isEmpty()) {
                     return indexCf;
@@ -306,6 +320,48 @@ public final class SharedRocksDbInstance {
         }
     }
 
+    /**
+     * Removes all data associated with the given table ID in this storage.
+     */
+    public void destroyTable(int tableId) {
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            byte[] tableIdBytes = ByteBuffer.allocate(Integer.BYTES)
+                    .order(KEY_BYTE_ORDER)
+                    .putInt(tableId)
+                    .array();
+
+            deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
+            deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
+            deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);
+
+            List<IndexColumnFamily> sortedIndexCfs = sortedIndexes(tableId);
+
+            for (IndexColumnFamily indexColumnFamily : sortedIndexCfs) {
+                deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), tableIdBytes);
+            }
+
+            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_META_PREFIX, tableIdBytes));
+            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes));
+            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes));
+
+            db.write(DFLT_WRITE_OPTS, writeBatch);
+
+            if (!sortedIndexCfs.isEmpty()) {
+                scheduleIndexCfsDestroy(sortedIndexCfs);
+            }
+        } catch (RocksDBException e) {
+            throw new StorageException("Failed to destroy table data. [tableId={}]", e, tableId);
+        }
+    }
+
+    private static byte[] metaPrefix(byte[] metaPrefix, byte[] tableIdBytes) {
+        return ByteBuffer.allocate(metaPrefix.length + tableIdBytes.length)
+                .order(KEY_BYTE_ORDER)
+                .put(metaPrefix)
+                .put(tableIdBytes)
+                .array();
+    }
+
     private ColumnFamily createSortedIndexCf(byte[] cfName) {
         ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
 
@@ -313,7 +369,7 @@ public final class SharedRocksDbInstance {
         try {
             columnFamily = ColumnFamily.create(db, cfDescriptor);
         } catch (RocksDBException e) {
-            throw new StorageException("Failed to create new RocksDB column family: " + toStringName(cfDescriptor.getName()), e);
+            throw new StorageException("Failed to create new RocksDB column family: " + toStringName(cfName), e);
         }
 
         flusher.addColumnFamily(columnFamily.handle());
@@ -322,9 +378,7 @@ public final class SharedRocksDbInstance {
     }
 
     private void destroyColumnFamily(ColumnFamily columnFamily) {
-        ColumnFamilyHandle columnFamilyHandle = columnFamily.handle();
-
-        flusher.removeColumnFamily(columnFamilyHandle);
+        flusher.removeColumnFamily(columnFamily.handle());
 
         try {
             columnFamily.destroy();
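
The metaPrefix() helper above simply concatenates a one-byte marker with the 4-byte big-endian table ID, matching the key layouts documented in RocksDbMetaStorage ({@code [prefix, tableId, ...]}). A worked example with an illustrative table ID:

    byte[] prefix = metaPrefix(PARTITION_META_PREFIX, tableIdBytes); // tableId = 42

    // prefix == [0, 0, 0, 0, 42]: the marker byte 0 followed by 42 as a big-endian int.
    // Every partition meta key of table 42 ([0, 0, 0, 0, 42, <partitionId>]) starts with
    // these five bytes, so a single deleteByPrefix() call covers all of its partitions.
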
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
index c376e6d326..f908c5d121 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
@@ -47,10 +46,4 @@ public class RocksDbStorageEngineTest extends AbstractStorageEngineTest {
                 workDir
         );
     }
-
-    @Override
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21760")
-    protected void testDropMvTableOnRecovery() throws Exception {
-        super.testDropMvTableOnRecovery();
-    }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
index e0c85f22ec..fcd8b23454 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -17,15 +17,14 @@
 
 package org.apache.ignite.internal.storage.rocksdb.instance;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
@@ -34,7 +33,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Map;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -68,7 +66,7 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
 
         engine.start();
 
-        dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion());
+        dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion().value());
 
         dataRegion.start();
 
@@ -118,19 +116,19 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
         assertThat(foo, is(not(sameInstance(bar))));
         assertThat(quux, is((sameInstance(baz))));
 
-        rocksDb.destroySortedIndexCfIfNeeded(fooName, 1);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(1, foo));
 
         assertTrue(cfExists(fooName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(barName, 2);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(2, bar));
 
         assertFalse(cfExists(barName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(bazName, 3);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(3, baz));
 
         assertTrue(cfExists(fooName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(quuxName, 4);
+        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(4, quux));
 
         assertFalse(cfExists(fooName));
     }
@@ -161,8 +159,25 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
         // Same index CF, different table.
         ColumnFamily quux = rocksDb.getOrCreateSortedIndexCf(quuxName, 4, 1);
 
-        assertThat(rocksDb.sortedIndexes(0), is(Map.of(1, foo, 2, bar, 3, baz)));
-        assertThat(rocksDb.sortedIndexes(1), is(Map.of(4, quux)));
+        assertThat(
+                rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()),
+                containsInAnyOrder(1, 2, 3)
+        );
+
+        assertThat(
+                rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
+                containsInAnyOrder(foo, bar, baz)
+        );
+
+        assertThat(
+                rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::indexId).collect(toList()),
+                containsInAnyOrder(4)
+        );
+
+        assertThat(
+                rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
+                containsInAnyOrder(quux)
+        );
 
         // Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped.
         byte[] key = ByteBuffer.allocate(Integer.BYTES * 2)
@@ -177,8 +192,8 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
 
         rocksDb = createDb();
 
-        assertThat(rocksDb.sortedIndexes(0), allOf(hasKey(1), not(hasKey(2)), not(hasKey(3))));
-        assertThat(rocksDb.sortedIndexes(1), is(anEmptyMap()));
+        assertThat(rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()), contains(1));
+        assertThat(rocksDb.sortedIndexes(1), is(empty()));
 
         assertTrue(cfExists(fooName));
         assertFalse(cfExists(barName));