This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 399f0f15da IGNITE-21680 Remove destroyed RocksDB indexes on recovery (#3435) 399f0f15da is described below commit 399f0f15daaebf1099589d734c85f2763f0344a2 Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Wed Mar 20 09:44:54 2024 +0200 IGNITE-21680 Remove destroyed RocksDB indexes on recovery (#3435) --- .../apache/ignite/internal/rocksdb/RocksUtils.java | 3 +- .../internal/rocksdb/flush/RocksDbFlusher.java | 2 +- .../storage/AbstractMvTableStorageTest.java | 126 +++++++++++- .../storage/rocksdb/ColumnFamilyUtils.java | 2 +- .../ignite/internal/storage/rocksdb/HashIndex.java | 25 +-- .../ignite/internal/storage/rocksdb/Index.java | 8 +- .../internal/storage/rocksdb/RocksDbIndexes.java | 229 +++++++++++++++++++++ .../storage/rocksdb/RocksDbMetaStorage.java | 17 +- .../storage/rocksdb/RocksDbMvPartitionStorage.java | 3 +- .../storage/rocksdb/RocksDbStorageEngine.java | 6 +- .../storage/rocksdb/RocksDbStorageUtils.java | 15 ++ .../storage/rocksdb/RocksDbTableStorage.java | 179 +++++----------- .../internal/storage/rocksdb/SortedIndex.java | 53 +++-- .../index/RocksDbBinaryTupleComparator.java | 28 +-- .../rocksdb/index/RocksDbSortedIndexStorage.java | 1 + .../storage/rocksdb/instance/IndexIdCursor.java | 116 +++++++++++ .../rocksdb/instance/SharedRocksDbInstance.java | 202 ++++++++++++------ .../instance/SharedRocksDbInstanceCreator.java | 8 +- .../storage/rocksdb/RocksDbMvTableStorageTest.java | 8 - .../rocksdb/instance/IndexIdCursorTest.java | 93 +++++++++ .../instance/SharedRocksDbInstanceTest.java | 167 +++++++++++++++ 21 files changed, 1016 insertions(+), 275 deletions(-) diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java index e95b8f8132..e06b5941e6 100644 --- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java +++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java @@ -47,8 +47,7 @@ public class RocksUtils { /** * Checks the status of the iterator and throws an exception if it is not correct. * - * <p>Check the status first. This operation is guaranteed to throw if an internal error has occurred during the iteration. Otherwise, - * we've exhausted the data range. + * <p>This operation is guaranteed to throw if an internal error has occurred during the iteration. * * @param it RocksDB iterator. * @throws IgniteInternalException if the iterator has an incorrect status. diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java index 043fe36550..62528bb05d 100644 --- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java +++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java @@ -123,7 +123,7 @@ public class RocksDbFlusher { /** * Returns an instance of {@link AbstractEventListener} to process actual RocksDB events. Returned listener must be set into - * {@link Options#setListeners(List)} before database is started. Otherwise, no events would occurre. + * {@link Options#setListeners(List)} before database is started. Otherwise, no events would occur. */ public AbstractEventListener listener() { return new RocksDbFlushListener(this); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java index 21930352c7..24806134f0 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java @@ -314,6 +314,113 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { assertThat(tableStorage.getIndex(PARTITION_ID, hashIdx.id()), is(nullValue())); } + /** + * Tests that removing one Sorted Index does not affect the data in the other. + */ + @Test + public void testDestroySortedIndexIndependence() { + CatalogTableDescriptor catalogTableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong()); + + var catalogSortedIndex1 = new CatalogSortedIndexDescriptor( + 200, + "TEST_INDEX_1", + catalogTableDescriptor.id(), + false, + AVAILABLE, + catalogService.latestCatalogVersion(), + List.of(new CatalogIndexColumnDescriptor("STRKEY", ASC_NULLS_LAST)) + ); + + var catalogSortedIndex2 = new CatalogSortedIndexDescriptor( + 201, + "TEST_INDEX_2", + catalogTableDescriptor.id(), + false, + AVAILABLE, + catalogService.latestCatalogVersion(), + List.of(new CatalogIndexColumnDescriptor("STRKEY", ASC_NULLS_LAST)) + ); + + var sortedIndexDescriptor1 = new StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex1); + var sortedIndexDescriptor2 = new StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex2); + + MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID); + + SortedIndexStorage sortedIndexStorage1 = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor1); + SortedIndexStorage sortedIndexStorage2 = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor2); + + List<TestRow> rows = List.of( + new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))), + new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1"))) + ); + + fillStorages(partitionStorage, null, sortedIndexStorage1, rows); + fillStorages(partitionStorage, null, sortedIndexStorage2, rows); + + checkForPresenceRows(null, null, sortedIndexStorage1, rows); + checkForPresenceRows(null, null, sortedIndexStorage2, rows); + + assertThat(tableStorage.destroyIndex(sortedIndexDescriptor1.id()), willCompleteSuccessfully()); + + assertThat(tableStorage.getIndex(PARTITION_ID, sortedIndexDescriptor1.id()), is(nullValue())); + + checkForPresenceRows(null, null, sortedIndexStorage2, rows); + } + + /** + * Tests that removing one Hash Index does not affect the data in the other. + */ + @Test + public void testDestroyHashIndexIndependence() { + CatalogTableDescriptor catalogTableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong()); + + var catalogHashIndex1 = new CatalogHashIndexDescriptor( + 200, + "TEST_INDEX_1", + catalogTableDescriptor.id(), + true, + AVAILABLE, + catalogService.latestCatalogVersion(), + List.of("STRKEY") + ); + + var catalogHashIndex2 = new CatalogHashIndexDescriptor( + 201, + "TEST_INDEX_2", + catalogTableDescriptor.id(), + true, + AVAILABLE, + catalogService.latestCatalogVersion(), + List.of("STRKEY") + ); + + var hashIndexDescriptor1 = new StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex1); + var hashIndexDescriptor2 = new StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex2); + + MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID); + + HashIndexStorage hashIndexStorage1 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor1); + HashIndexStorage hashIndexStorage2 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor2); + + List<TestRow> rows = List.of( + new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))), + new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1"))) + ); + + fillStorages(partitionStorage, hashIndexStorage1, null, rows); + fillStorages(partitionStorage, hashIndexStorage2, null, rows); + + checkForPresenceRows(null, hashIndexStorage1, null, rows); + checkForPresenceRows(null, hashIndexStorage2, null, rows); + + assertThat(tableStorage.destroyIndex(hashIndexDescriptor1.id()), willCompleteSuccessfully()); + + assertThat(tableStorage.getIndex(PARTITION_ID, hashIndexDescriptor1.id()), is(nullValue())); + + checkForPresenceRows(null, hashIndexStorage2, null, rows); + } + + @Test public void testHashIndexIndependence() { MvPartitionStorage partitionStorage1 = getOrCreateMvPartition(PARTITION_ID); @@ -859,8 +966,8 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { private static void fillStorages( MvPartitionStorage mvPartitionStorage, - HashIndexStorage hashIndexStorage, - SortedIndexStorage sortedIndexStorage, + @Nullable HashIndexStorage hashIndexStorage, + @Nullable SortedIndexStorage sortedIndexStorage, List<TestRow> rows ) { assertThat(rows, hasSize(greaterThanOrEqualTo(2))); @@ -878,9 +985,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { assertNotNull(binaryRow); assertNotNull(timestamp); - IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId); - IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId); - mvPartitionStorage.runConsistently(locker -> { locker.lock(rowId); @@ -892,9 +996,17 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { mvPartitionStorage.addWriteCommitted(rowId, binaryRow, timestamp); } - hashIndexStorage.put(hashIndexRow); + if (hashIndexStorage != null) { + IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId); - sortedIndexStorage.put(sortedIndexRow); + hashIndexStorage.put(hashIndexRow); + } + + if (sortedIndexStorage != null) { + IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId); + + sortedIndexStorage.put(sortedIndexRow); + } return null; }); diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java index 339e05bc86..1cd093415d 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java @@ -110,7 +110,7 @@ public class ColumnFamilyUtils { * * @see #comparatorFromCfName(byte[]) */ - static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) { + public static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) { ByteBuffer buf = ByteBuffer.allocate(SORTED_INDEX_CF_PREFIX.length() + columns.size() * 2); buf.put(SORTED_INDEX_CF_PREFIX.getBytes(UTF_8)); diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java index 7f489d6e2f..a5f3bcce9e 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.storage.rocksdb; -import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey; import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; @@ -25,6 +25,7 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily; import org.apache.ignite.internal.storage.index.HashIndexStorage; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage; +import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance; import org.jetbrains.annotations.Nullable; import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; @@ -39,14 +40,23 @@ class HashIndex extends Index<RocksDbHashIndexStorage> { private final RocksDbMetaStorage indexMetaStorage; - HashIndex(ColumnFamily indexCf, StorageHashIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage) { + HashIndex(SharedRocksDbInstance rocksDb, StorageHashIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage) { super(descriptor.id()); - this.indexCf = indexCf; + this.indexCf = rocksDb.hashIndexCf(); this.descriptor = descriptor; this.indexMetaStorage = indexMetaStorage; } + /** + * Returns hash index storage for partition. + * + * @param partitionId Partition ID. + */ + @Nullable RocksDbHashIndexStorage getStorage(int partitionId) { + return storageByPartitionId.get(partitionId); + } + /** * Creates a new Hash Index storage or returns an existing one. */ @@ -66,13 +76,4 @@ class HashIndex extends Index<RocksDbHashIndexStorage> { // Every index storage uses an "index ID" + "partition ID" prefix. We can remove everything by just using the index ID prefix. deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, descriptor.id())); } - - /** - * Returns hash index storage for partition. - * - * @param partitionId Partition ID. - */ - @Nullable RocksDbHashIndexStorage get(int partitionId) { - return storageByPartitionId.get(partitionId); - } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java index 2640359a91..30f49dbcb7 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java @@ -67,12 +67,12 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> { * @throws RocksDBException If failed to delete data. */ void destroy(int partitionId, WriteBatch writeBatch) throws RocksDBException { - S hashIndex = storageByPartitionId.remove(partitionId); + S storage = storageByPartitionId.remove(partitionId); - if (hashIndex != null) { - hashIndex.transitionToDestroyedState(); + if (storage != null) { + storage.transitionToDestroyedState(); - hashIndex.destroyData(writeBatch); + storage.destroyData(writeBatch); } } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java new file mode 100644 index 0000000000..6fef05303a --- /dev/null +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.rocksdb; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey; +import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS; +import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.IntFunction; +import java.util.stream.Stream; +import org.apache.ignite.internal.rocksdb.ColumnFamily; +import org.apache.ignite.internal.storage.index.HashIndexStorage; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.storage.index.SortedIndexStorage; +import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; +import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier; +import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; +import org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage; +import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; + +/** Manager for all RocksDB-based indexes. */ +class RocksDbIndexes { + /** Hash Index storages by Index IDs. */ + private final ConcurrentMap<Integer, HashIndex> hashIndices = new ConcurrentHashMap<>(); + + /** Sorted Index storages by Index IDs. */ + private final ConcurrentMap<Integer, SortedIndex> sortedIndices = new ConcurrentHashMap<>(); + + private final SharedRocksDbInstance rocksDb; + + /** Callback for getting partition storages using partition IDs. */ + private final IntFunction<RocksDbMvPartitionStorage> partitionStorageProvider; + + RocksDbIndexes(SharedRocksDbInstance rocksDb, IntFunction<RocksDbMvPartitionStorage> partitionStorageProvider) { + this.rocksDb = rocksDb; + this.partitionStorageProvider = partitionStorageProvider; + } + + void recoverIndexes(StorageIndexDescriptorSupplier indexDescriptorSupplier) throws RocksDBException { + try (WriteBatch writeBatch = new WriteBatch()) { + for (int indexId : rocksDb.hashIndexIds()) { + var descriptor = (StorageHashIndexDescriptor) indexDescriptorSupplier.get(indexId); + + if (descriptor == null) { + deleteByPrefix(writeBatch, rocksDb.hashIndexCf(), createKey(BYTE_EMPTY_ARRAY, indexId)); + } else { + hashIndices.put(indexId, new HashIndex(rocksDb, descriptor, rocksDb.meta)); + } + } + + var indexCfsToDestroy = new ArrayList<Map.Entry<Integer, ColumnFamily>>(); + + for (Map.Entry<Integer, ColumnFamily> e : rocksDb.sortedIndexes().entrySet()) { + int indexId = e.getKey(); + + ColumnFamily indexCf = e.getValue(); + + var descriptor = (StorageSortedIndexDescriptor) indexDescriptorSupplier.get(indexId); + + if (descriptor == null) { + deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, indexId)); + + indexCfsToDestroy.add(e); + } else { + sortedIndices.put(indexId, SortedIndex.restoreExisting(rocksDb, indexCf, descriptor, rocksDb.meta)); + } + } + + rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); + + if (!indexCfsToDestroy.isEmpty()) { + rocksDb.flusher.awaitFlush(false) + .thenRunAsync(() -> { + for (Map.Entry<Integer, ColumnFamily> e : indexCfsToDestroy) { + int indexId = e.getKey(); + + ColumnFamily indexCf = e.getValue(); + + rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId); + } + }, rocksDb.engine.threadPool()); + } + } + } + + SortedIndexStorage getOrCreateSortedIndex(int partitionId, StorageSortedIndexDescriptor indexDescriptor) { + SortedIndex sortedIndex = sortedIndices.computeIfAbsent( + indexDescriptor.id(), + id -> SortedIndex.createNew(rocksDb, indexDescriptor, rocksDb.meta) + ); + + return sortedIndex.getOrCreateStorage(partitionStorageProvider.apply(partitionId)); + } + + HashIndexStorage getOrCreateHashIndex(int partitionId, StorageHashIndexDescriptor indexDescriptor) { + HashIndex hashIndex = hashIndices.computeIfAbsent( + indexDescriptor.id(), + id -> new HashIndex(rocksDb, indexDescriptor, rocksDb.meta) + ); + + return hashIndex.getOrCreateStorage(partitionStorageProvider.apply(partitionId)); + } + + @Nullable IndexStorage getIndex(int partitionId, int indexId) { + HashIndex hashIndex = hashIndices.get(indexId); + + if (hashIndex != null) { + assert !sortedIndices.containsKey(indexId) : indexId; + + return hashIndex.getStorage(partitionId); + } + + SortedIndex sortedIndex = sortedIndices.get(indexId); + + if (sortedIndex != null) { + return sortedIndex.getStorage(partitionId); + } + + return null; + } + + void startRebalance(int partitionId, WriteBatch writeBatch) { + getAllStorages(partitionId).forEach(indexStorage -> indexStorage.startRebalance(writeBatch)); + } + + void abortRebalance(int partitionId, WriteBatch writeBatch) { + getAllStorages(partitionId).forEach(indexStorage -> indexStorage.abortRebalance(writeBatch)); + } + + void finishRebalance(int partitionId) { + getAllStorages(partitionId).forEach(AbstractRocksDbIndexStorage::finishRebalance); + } + + List<AutoCloseable> getResourcesForClose() { + return allIndexes().map(index -> (AutoCloseable) index::close).collect(toList()); + } + + List<AutoCloseable> getResourcesForDestroy() { + return allIndexes().map(index -> (AutoCloseable) index::transitionToDestroyedState).collect(toList()); + } + + private Stream<Index<?>> allIndexes() { + return Stream.concat(hashIndices.values().stream(), sortedIndices.values().stream()); + } + + void destroyIndex(int indexId) throws RocksDBException { + HashIndex hashIdx = hashIndices.remove(indexId); + + SortedIndex sortedIdx = sortedIndices.remove(indexId); + + if (hashIdx == null && sortedIdx == null) { + return; + } + + try (WriteBatch writeBatch = new WriteBatch()) { + if (hashIdx != null) { + hashIdx.destroy(writeBatch); + } + + if (sortedIdx != null) { + sortedIdx.destroy(writeBatch); + } + + rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); + } + + if (sortedIdx != null) { + rocksDb.flusher.awaitFlush(false) + .thenRunAsync(sortedIdx::destroySortedIndexCfIfNeeded, rocksDb.engine.threadPool()); + } + } + + void destroyAllIndexesForPartition(int partitionId, WriteBatch writeBatch) throws RocksDBException { + for (HashIndex hashIndex : hashIndices.values()) { + hashIndex.destroy(partitionId, writeBatch); + } + + for (SortedIndex sortedIndex : sortedIndices.values()) { + sortedIndex.destroy(partitionId, writeBatch); + } + } + + void destroyAllIndexes(WriteBatch writeBatch) throws RocksDBException { + for (HashIndex hashIndex : hashIndices.values()) { + hashIndex.destroy(writeBatch); + } + + for (SortedIndex sortedIndex : sortedIndices.values()) { + sortedIndex.destroy(writeBatch); + } + } + + void scheduleAllIndexCfDestroy() { + rocksDb.flusher.awaitFlush(false) + .thenRunAsync(() -> sortedIndices.values().forEach(SortedIndex::destroySortedIndexCfIfNeeded), rocksDb.engine.threadPool()); + } + + Stream<AbstractRocksDbIndexStorage> getAllStorages(int partitionId) { + return Stream.concat( + hashIndices.values().stream().map(index -> index.getStorage(partitionId)), + sortedIndices.values().stream().map(index -> index.getStorage(partitionId)) + ); + } +} diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java index edfb1d4e62..6511fd0275 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.storage.rocksdb; -import static java.nio.ByteOrder.BIG_ENDIAN; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.getRowIdUuid; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.putRowIdUuid; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; @@ -59,21 +59,6 @@ public class RocksDbMetaStorage { this.metaColumnFamily = metaColumnFamily; } - /** - * Creates a byte array, that uses the {@code prefix} as a prefix, and every other {@code int} values as a 4-bytes chunk in Big Endian. - */ - public static byte[] createKey(byte[] prefix, int... values) { - ByteBuffer buf = ByteBuffer.allocate(prefix.length + Integer.BYTES * values.length).order(BIG_ENDIAN); - - buf.put(prefix); - - for (int value : values) { - buf.putInt(value); - } - - return buf.array(); - } - /** * Returns a column family instance, associated with the meta storage. */ diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index 734ea4b9a0..162046ebbd 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -34,8 +34,8 @@ import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.put import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX; -import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.normalize; import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS; import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState; @@ -877,7 +877,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { }); } - // TODO: IGNITE-16914 Play with prefix settings and benchmark results. @Override public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException { Objects.requireNonNull(timestamp, "timestamp is null"); diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java index db16777194..6e4dd1110b 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java @@ -195,6 +195,10 @@ public class RocksDbStorageEngine implements StorageEngine { } }); - return new RocksDbTableStorage(sharedInstance, tableDescriptor); + var storage = new RocksDbTableStorage(sharedInstance, tableDescriptor, indexDescriptorSupplier); + + storage.start(); + + return storage; } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java index 2476459c96..44030ceab2 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java @@ -68,4 +68,19 @@ public class RocksDbStorageUtils { static long normalize(long value) { return value ^ (1L << 63); } + + /** + * Creates a byte array, that uses the {@code prefix} as a prefix, and every other {@code int} values as a 4-bytes chunk in Big Endian. + */ + public static byte[] createKey(byte[] prefix, int... values) { + ByteBuffer buf = ByteBuffer.allocate(prefix.length + Integer.BYTES * values.length).order(KEY_BYTE_ORDER); + + buf.put(prefix); + + for (int value : values) { + buf.putInt(value); + } + + return buf.array(); + } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java index 6ae2185420..30c5ac411a 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.storage.rocksdb; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX; -import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey; import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS; import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage; @@ -30,15 +30,11 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Stream; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.StorageRebalanceException; @@ -48,9 +44,9 @@ import org.apache.ignite.internal.storage.index.HashIndexStorage; import org.apache.ignite.internal.storage.index.IndexStorage; import org.apache.ignite.internal.storage.index.SortedIndexStorage; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; +import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier; import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; -import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage; -import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage; +import org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage; import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance; import org.apache.ignite.internal.storage.util.MvPartitionStorages; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -71,11 +67,7 @@ public class RocksDbTableStorage implements MvTableStorage { /** Partition storages. */ private final MvPartitionStorages<RocksDbMvPartitionStorage> mvPartitionStorages; - /** Hash Index storages by Index IDs. */ - private final ConcurrentMap<Integer, HashIndex> hashIndices = new ConcurrentHashMap<>(); - - /** Sorted Index storages by Index IDs. */ - private final ConcurrentMap<Integer, SortedIndex> sortedIndices = new ConcurrentHashMap<>(); + private final RocksDbIndexes indexes; /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -86,6 +78,8 @@ public class RocksDbTableStorage implements MvTableStorage { /** Table descriptor. */ private final StorageTableDescriptor tableDescriptor; + private final StorageIndexDescriptorSupplier indexDescriptorSupplier; + /** * Constructor. * @@ -94,11 +88,22 @@ public class RocksDbTableStorage implements MvTableStorage { */ RocksDbTableStorage( SharedRocksDbInstance rocksDb, - StorageTableDescriptor tableDescriptor + StorageTableDescriptor tableDescriptor, + StorageIndexDescriptorSupplier indexDescriptorSupplier ) { this.rocksDb = rocksDb; this.tableDescriptor = tableDescriptor; this.mvPartitionStorages = new MvPartitionStorages<>(tableDescriptor.getId(), tableDescriptor.getPartitions()); + this.indexes = new RocksDbIndexes(rocksDb, this::getMvPartitionChecked); + this.indexDescriptorSupplier = indexDescriptorSupplier; + } + + void start() { + try { + indexes.recoverIndexes(indexDescriptorSupplier); + } catch (RocksDBException e) { + throw new StorageException("Unable to recover indexes", e); + } } /** @@ -140,7 +145,7 @@ public class RocksDbTableStorage implements MvTableStorage { * Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to * achieve this. * - * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggerred in the near future. + * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggered in the near future. */ public CompletableFuture<Void> awaitFlush(boolean schedule) { return inBusyLock(busyLock, () -> rocksDb.flusher.awaitFlush(schedule)); @@ -157,13 +162,15 @@ public class RocksDbTableStorage implements MvTableStorage { .thenAccept(partitionStorages -> { var resources = new ArrayList<AutoCloseable>(); - Stream.concat(hashIndices.values().stream(), sortedIndices.values().stream()).forEach(index -> resources.add( - destroy ? index::transitionToDestroyedState : index::close - )); + if (destroy) { + resources.addAll(indexes.getResourcesForDestroy()); - partitionStorages.forEach(mvPartitionStorage -> resources.add( - destroy ? mvPartitionStorage::transitionToDestroyedState : mvPartitionStorage::close - )); + partitionStorages.forEach(mvPartitionStorage -> resources.add(mvPartitionStorage::transitionToDestroyedState)); + } else { + resources.addAll(indexes.getResourcesForClose()); + + partitionStorages.forEach(mvPartitionStorage -> resources.add(mvPartitionStorage::close)); + } try { IgniteUtils.closeAll(resources); @@ -201,18 +208,14 @@ public class RocksDbTableStorage implements MvTableStorage { deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix); deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix); - for (HashIndex hashIndex : hashIndices.values()) { - hashIndex.destroy(writeBatch); - } - - for (SortedIndex sortedIndex : sortedIndices.values()) { - sortedIndex.destroy(writeBatch); - } + indexes.destroyAllIndexes(writeBatch); deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_META_PREFIX, tableId)); deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_CONF_PREFIX, tableId)); rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); + + indexes.scheduleAllIndexCfDestroy(); } catch (RocksDBException e) { throw new StorageException("Failed to destroy table data. [tableId={}]", e, getTableId()); } @@ -241,6 +244,16 @@ public class RocksDbTableStorage implements MvTableStorage { return inBusyLock(busyLock, () -> mvPartitionStorages.get(partitionId)); } + private RocksDbMvPartitionStorage getMvPartitionChecked(int partitionId) { + RocksDbMvPartitionStorage partitionStorage = mvPartitionStorages.get(partitionId); + + if (partitionStorage == null) { + throw new StorageException(createMissingMvPartitionErrorMessage(partitionId)); + } + + return partitionStorage; + } + @Override public CompletableFuture<Void> destroyPartition(int partitionId) { return inBusyLock(busyLock, () -> mvPartitionStorages.destroy(partitionId, mvPartitionStorage -> { @@ -251,13 +264,7 @@ public class RocksDbTableStorage implements MvTableStorage { // RocksDB itself will then destroy the data on flush. mvPartitionStorage.destroyData(writeBatch); - for (HashIndex hashIndex : hashIndices.values()) { - hashIndex.destroy(partitionId, writeBatch); - } - - for (SortedIndex sortedIndex : sortedIndices.values()) { - sortedIndex.destroy(partitionId, writeBatch); - } + indexes.destroyAllIndexesForPartition(partitionId, writeBatch); rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); @@ -270,65 +277,19 @@ public class RocksDbTableStorage implements MvTableStorage { @Override public SortedIndexStorage getOrCreateSortedIndex(int partitionId, StorageSortedIndexDescriptor indexDescriptor) { - return inBusyLock(busyLock, () -> { - SortedIndex storages = sortedIndices.computeIfAbsent( - indexDescriptor.id(), - id -> createSortedIndex(indexDescriptor) - ); - - RocksDbMvPartitionStorage partitionStorage = mvPartitionStorages.get(partitionId); - - if (partitionStorage == null) { - throw new StorageException(createMissingMvPartitionErrorMessage(partitionId)); - } - - return storages.getOrCreateStorage(partitionStorage); - }); - } - - private SortedIndex createSortedIndex(StorageSortedIndexDescriptor indexDescriptor) { - return new SortedIndex(rocksDb, indexDescriptor, rocksDb.meta); + return inBusyLock(busyLock, () -> indexes.getOrCreateSortedIndex(partitionId, indexDescriptor)); } @Override public HashIndexStorage getOrCreateHashIndex(int partitionId, StorageHashIndexDescriptor indexDescriptor) { - return inBusyLock(busyLock, () -> { - HashIndex storages = hashIndices.computeIfAbsent( - indexDescriptor.id(), - id -> new HashIndex(rocksDb.hashIndexCf, indexDescriptor, rocksDb.meta) - ); - - RocksDbMvPartitionStorage partitionStorage = mvPartitionStorages.get(partitionId); - - if (partitionStorage == null) { - throw new StorageException(createMissingMvPartitionErrorMessage(partitionId)); - } - - return storages.getOrCreateStorage(partitionStorage); - }); + return inBusyLock(busyLock, () -> indexes.getOrCreateHashIndex(partitionId, indexDescriptor)); } @Override public CompletableFuture<Void> destroyIndex(int indexId) { return inBusyLock(busyLock, () -> { - HashIndex hashIdx = hashIndices.remove(indexId); - - SortedIndex sortedIdx = sortedIndices.remove(indexId); - - if (hashIdx == null && sortedIdx == null) { - return nullCompletedFuture(); - } - - try (WriteBatch writeBatch = new WriteBatch()) { - if (hashIdx != null) { - hashIdx.destroy(writeBatch); - } - - if (sortedIdx != null) { - sortedIdx.destroy(writeBatch); - } - - rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); + try { + indexes.destroyIndex(indexId); return nullCompletedFuture(); } catch (RocksDBException e) { @@ -348,8 +309,7 @@ public class RocksDbTableStorage implements MvTableStorage { try (WriteBatch writeBatch = new WriteBatch()) { mvPartitionStorage.startRebalance(writeBatch); - getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch)); - getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch)); + indexes.startRebalance(partitionId, writeBatch); rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); @@ -370,8 +330,7 @@ public class RocksDbTableStorage implements MvTableStorage { try (WriteBatch writeBatch = new WriteBatch()) { mvPartitionStorage.abortRebalance(writeBatch); - getHashIndexStorages(partitionId).forEach(index -> index.abortRebalance(writeBatch)); - getSortedIndexStorages(partitionId).forEach(index -> index.abortRebalance(writeBatch)); + indexes.abortRebalance(partitionId, writeBatch); rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); @@ -396,8 +355,7 @@ public class RocksDbTableStorage implements MvTableStorage { try (WriteBatch writeBatch = new WriteBatch()) { mvPartitionStorage.finishRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm, groupConfig); - getHashIndexStorages(partitionId).forEach(RocksDbHashIndexStorage::finishRebalance); - getSortedIndexStorages(partitionId).forEach(RocksDbSortedIndexStorage::finishRebalance); + indexes.finishRebalance(partitionId); rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); @@ -414,18 +372,13 @@ public class RocksDbTableStorage implements MvTableStorage { @Override public CompletableFuture<Void> clearPartition(int partitionId) { return inBusyLock(busyLock, () -> mvPartitionStorages.clear(partitionId, mvPartitionStorage -> { - List<RocksDbHashIndexStorage> hashIndexStorages = getHashIndexStorages(partitionId); - List<RocksDbSortedIndexStorage> sortedIndexStorages = getSortedIndexStorages(partitionId); + List<AbstractRocksDbIndexStorage> indexStorages = indexes.getAllStorages(partitionId).collect(toList()); try (WriteBatch writeBatch = new WriteBatch()) { mvPartitionStorage.startCleanup(writeBatch); - for (RocksDbHashIndexStorage hashIndexStorage : hashIndexStorages) { - hashIndexStorage.startCleanup(writeBatch); - } - - for (RocksDbSortedIndexStorage sortedIndexStorage : sortedIndexStorages) { - sortedIndexStorage.startCleanup(writeBatch); + for (AbstractRocksDbIndexStorage storage : indexStorages) { + storage.startCleanup(writeBatch); } rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); @@ -436,8 +389,7 @@ public class RocksDbTableStorage implements MvTableStorage { } finally { mvPartitionStorage.finishCleanup(); - hashIndexStorages.forEach(RocksDbHashIndexStorage::finishCleanup); - sortedIndexStorages.forEach(RocksDbSortedIndexStorage::finishCleanup); + indexStorages.forEach(AbstractRocksDbIndexStorage::finishCleanup); } })); } @@ -449,34 +401,13 @@ public class RocksDbTableStorage implements MvTableStorage { return tableDescriptor.getId(); } - private List<RocksDbHashIndexStorage> getHashIndexStorages(int partitionId) { - return hashIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList()); - } - - private List<RocksDbSortedIndexStorage> getSortedIndexStorages(int partitionId) { - return sortedIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList()); - } - @Override public @Nullable IndexStorage getIndex(int partitionId, int indexId) { return inBusyLock(busyLock, () -> { - if (mvPartitionStorages.get(partitionId) == null) { - throw new StorageException(createMissingMvPartitionErrorMessage(partitionId)); - } - - HashIndex hashIndex = hashIndices.get(indexId); - - if (hashIndex != null) { - return hashIndex.get(partitionId); - } - - SortedIndex sortedIndex = sortedIndices.get(indexId); - - if (sortedIndex != null) { - return sortedIndex.get(partitionId); - } + // Check for partition existence. + getMvPartitionChecked(partitionId); - return (IndexStorage) null; + return indexes.getIndex(partitionId, indexId); }); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java index fd16d093a6..a0cf97bf1a 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.storage.rocksdb; import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey; +import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import org.apache.ignite.internal.rocksdb.ColumnFamily; import org.apache.ignite.internal.storage.index.SortedIndexStorage; @@ -25,6 +28,7 @@ import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage; import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance; import org.jetbrains.annotations.Nullable; +import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; /** @@ -39,8 +43,9 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> { private final RocksDbMetaStorage indexMetaStorage; - SortedIndex( + private SortedIndex( SharedRocksDbInstance rocksDb, + ColumnFamily indexCf, StorageSortedIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage ) { @@ -48,13 +53,38 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> { this.rocksDb = rocksDb; this.descriptor = descriptor; - this.indexCf = rocksDb.getSortedIndexCfOnIndexCreate( - sortedIndexCfName(descriptor.columns()), - descriptor.id() - ); + this.indexCf = indexCf; this.indexMetaStorage = indexMetaStorage; } + static SortedIndex createNew( + SharedRocksDbInstance rocksDb, + StorageSortedIndexDescriptor descriptor, + RocksDbMetaStorage indexMetaStorage + ) { + ColumnFamily indexCf = rocksDb.getOrCreateSortedIndexCf(sortedIndexCfName(descriptor.columns()), descriptor.id()); + + return new SortedIndex(rocksDb, indexCf, descriptor, indexMetaStorage); + } + + static SortedIndex restoreExisting( + SharedRocksDbInstance rocksDb, + ColumnFamily indexCf, + StorageSortedIndexDescriptor descriptor, + RocksDbMetaStorage indexMetaStorage + ) { + return new SortedIndex(rocksDb, indexCf, descriptor, indexMetaStorage); + } + + /** + * Returns sorted index storage for partition. + * + * @param partitionId Partition ID. + */ + @Nullable RocksDbSortedIndexStorage getStorage(int partitionId) { + return storageByPartitionId.get(partitionId); + } + /** * Creates a new Sorted Index storage or returns an existing one. */ @@ -68,18 +98,17 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> { /** * Removes all data associated with the index. */ - void destroy(WriteBatch writeBatch) { + void destroy(WriteBatch writeBatch) throws RocksDBException { transitionToDestroyedState(); - rocksDb.dropCfOnIndexDestroy(indexCf.nameBytes(), descriptor.id()); + deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, descriptor.id())); } /** - * Returns sorted index storage for partition. - * - * @param partitionId Partition ID. + * Signals the shared RocksDB instance that this index has been destroyed and all shared resources (like the Column Family) can + * be de-allocated. */ - @Nullable RocksDbSortedIndexStorage get(int partitionId) { - return storageByPartitionId.get(partitionId); + void destroySortedIndexCfIfNeeded() { + rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), descriptor.id()); } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java index fa70fe3672..575a6d12c9 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java @@ -57,36 +57,40 @@ public class RocksDbBinaryTupleComparator extends AbstractComparator { @Override public int compare(ByteBuffer a, ByteBuffer b) { - int compareTableId = Integer.compare(a.getInt(), b.getInt()); + int compareIndexId = Integer.compare(a.getInt(), b.getInt()); - if (compareTableId != 0) { - return compareTableId; + if (compareIndexId != 0) { + return compareIndexId; + } + + // Handle index ID-only buffers, probably coming from the range tombstones. + if (!bothHasRemaining(a, b)) { + return Integer.compare(a.remaining(), b.remaining()); } - // Compare table ids. int comparePartitionId = Short.compareUnsigned(a.getShort(), b.getShort()); if (comparePartitionId != 0) { return comparePartitionId; } - ByteBuffer firstBinaryTupleBuffer = a.slice().order(ByteOrder.LITTLE_ENDIAN); - ByteBuffer secondBinaryTupleBuffer = b.slice().order(ByteOrder.LITTLE_ENDIAN); - // Handle partition bounds. - if (!firstBinaryTupleBuffer.hasRemaining()) { - return -1; + if (!bothHasRemaining(a, b)) { + return Integer.compare(a.remaining(), b.remaining()); } - if (!secondBinaryTupleBuffer.hasRemaining()) { - return 1; - } + ByteBuffer firstBinaryTupleBuffer = a.slice().order(ByteOrder.LITTLE_ENDIAN); + ByteBuffer secondBinaryTupleBuffer = b.slice().order(ByteOrder.LITTLE_ENDIAN); int compareTuples = comparator.compare(firstBinaryTupleBuffer, secondBinaryTupleBuffer); return compareTuples == 0 ? compareRowIds(a, b) : compareTuples; } + private static boolean bothHasRemaining(ByteBuffer a, ByteBuffer b) { + return a.hasRemaining() && b.hasRemaining(); + } + private static int compareRowIds(ByteBuffer a, ByteBuffer b) { long firstMostSignBits = a.getLong(a.limit() - Long.BYTES * 2); long secondMostSignBits = b.getLong(b.limit() - Long.BYTES * 2); diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java index c71799b3fc..bd23e53012 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java @@ -53,6 +53,7 @@ import org.rocksdb.WriteBatchWithIndex; * * <p>This storage uses the following format for keys: * <pre> + * Index ID - 4 bytes * Partition ID - 2 bytes * Tuple value - variable length * Row ID (UUID) - 16 bytes diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java new file mode 100644 index 0000000000..0417cc7585 --- /dev/null +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.rocksdb.instance; + +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER; + +import java.nio.ByteBuffer; +import java.util.NoSuchElementException; +import org.apache.ignite.internal.rocksdb.RocksUtils; +import org.apache.ignite.internal.util.Cursor; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.RocksIterator; + +/** + * Cursor that iterates over distinct index IDs in a Sorted Index column family. + * + * <p>Sorted index column family can contain multiple indexes that have the same order and type of indexed columns. Index ID is stored + * as the first 4 bytes of a RocksDB key. + */ +class IndexIdCursor implements Cursor<Integer> { + private final RocksIterator it; + + private @Nullable ByteBuffer curIndexId = null; + + private boolean hasNext = true; + + IndexIdCursor(RocksIterator it) { + this.it = it; + } + + @Override + public boolean hasNext() { + if (!hasNext) { + return false; + } + + if (curIndexId == null) { + curIndexId = ByteBuffer.allocate(Integer.BYTES).order(KEY_BYTE_ORDER); + + it.seekToFirst(); + } else if (setNextIndexId()) { + it.seek(curIndexId); + + curIndexId.rewind(); + } else { + hasNext = false; + + return false; + } + + hasNext = it.isValid(); + + if (!hasNext) { + RocksUtils.checkIterator(it); + } + + return hasNext; + } + + @Override + public Integer next() { + if (!hasNext) { + throw new NoSuchElementException(); + } + + assert curIndexId != null; + + it.key(curIndexId); + + curIndexId.rewind(); + + return curIndexId.getInt(0); + } + + @Override + public void close() { + it.close(); + } + + /** + * Sets the next hypothetical index ID by incrementing the current index ID by one. + * + * @return {@code false} if the current index ID already has the maximum possible value and the ID range has been exhausted. + * {@code true} otherwise. + */ + private boolean setNextIndexId() { + assert curIndexId != null; + + for (int i = curIndexId.remaining() - 1; i >= 0; i--) { + byte b = (byte) (curIndexId.get(i) + 1); + + curIndexId.put(i, b); + + if (b != 0) { + return true; + } + } + + return false; + } +} diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java index 69bb41b49d..6fe23169ab 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java @@ -24,9 +24,12 @@ import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,6 +41,7 @@ import org.apache.ignite.internal.storage.StorageClosedException; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage; import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; +import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.rocksdb.ColumnFamilyDescriptor; @@ -54,6 +58,35 @@ public final class SharedRocksDbInstance { /** Write options. */ public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true); + /** + * Class that represents a Column Family for sorted indexes and all index IDs that map to this Column Family. + * + * <p>Sorted indexes that have the same order and type of indexed columns get mapped to the same Column Family in order to share + * the same comparator. + */ + private static class SortedIndexColumnFamily implements AutoCloseable { + final ColumnFamily columnFamily; + + final Set<Integer> indexIds; + + SortedIndexColumnFamily(ColumnFamily columnFamily, int indexId) { + this.columnFamily = columnFamily; + this.indexIds = new HashSet<>(); + + indexIds.add(indexId); + } + + SortedIndexColumnFamily(ColumnFamily columnFamily, Set<Integer> indexIds) { + this.columnFamily = columnFamily; + this.indexIds = indexIds; + } + + @Override + public void close() { + columnFamily.handle().close(); + } + } + /** RocksDB storage engine instance. */ public final RocksDbStorageEngine engine; @@ -76,13 +109,10 @@ public final class SharedRocksDbInstance { public final ColumnFamily gcQueueCf; /** Column Family for Hash Index data. */ - public final ColumnFamily hashIndexCf; + private final ColumnFamily hashIndexCf; /** Column Family instances for different types of sorted indexes, identified by the column family name. */ - private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs; - - /** Column family names mapped to sets of index IDs, that use that CF. */ - private final ConcurrentMap<ByteArray, Set<Integer>> sortedIndexIdsByCfName = new ConcurrentHashMap<>(); + private final ConcurrentMap<ByteArray, SortedIndexColumnFamily> sortedIndexCfsByName = new ConcurrentHashMap<>(); /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock; @@ -100,7 +130,7 @@ public final class SharedRocksDbInstance { ColumnFamily partitionCf, ColumnFamily gcQueueCf, ColumnFamily hashIndexCf, - ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs + List<ColumnFamily> sortedIndexCfs ) { this.engine = engine; this.path = path; @@ -113,7 +143,29 @@ public final class SharedRocksDbInstance { this.partitionCf = partitionCf; this.gcQueueCf = gcQueueCf; this.hashIndexCf = hashIndexCf; - this.sortedIndexCfs = sortedIndexCfs; + + recoverExistingSortedIndexes(sortedIndexCfs); + } + + private void recoverExistingSortedIndexes(List<ColumnFamily> sortedIndexCfs) { + for (ColumnFamily sortedIndexCf : sortedIndexCfs) { + var indexIds = new HashSet<Integer>(); + + try (Cursor<Integer> sortedIndexIdCursor = indexIdsCursor(sortedIndexCf)) { + for (Integer indexId : sortedIndexIdCursor) { + indexIds.add(indexId); + } + } + + if (indexIds.isEmpty()) { + destroyColumnFamily(sortedIndexCf); + } else { + this.sortedIndexCfsByName.put( + new ByteArray(sortedIndexCf.nameBytes()), + new SortedIndexColumnFamily(sortedIndexCf, indexIds) + ); + } + } } /** @@ -141,10 +193,7 @@ public final class SharedRocksDbInstance { resources.add(partitionCf.handle()); resources.add(gcQueueCf.handle()); resources.add(hashIndexCf.handle()); - resources.addAll(sortedIndexCfs.values().stream() - .map(ColumnFamily::handle) - .collect(toList()) - ); + resources.addAll(sortedIndexCfsByName.values()); resources.add(db); resources.add(flusher::stop); @@ -159,32 +208,57 @@ public final class SharedRocksDbInstance { } /** - * Returns Column Family instance with the desired name. Creates it it it doesn't exist. - * Tracks every created index by its {@code indexId}. + * Returns the Column Family containing all hash indexes. */ - public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int indexId) { - if (!busyLock.enterBusy()) { - throw new StorageClosedException(); + public ColumnFamily hashIndexCf() { + return hashIndexCf; + } + + /** + * Returns a collection of all hash index IDs that currently exist in the storage. + */ + public Collection<Integer> hashIndexIds() { + try (Cursor<Integer> hashIndexIdCursor = indexIdsCursor(hashIndexCf)) { + return hashIndexIdCursor.stream().collect(toList()); } + } - try { - ColumnFamily[] result = {null}; + /** + * Returns an "index ID - Column Family" mapping for all sorted indexes that currently exist in the storage. + */ + public Map<Integer, ColumnFamily> sortedIndexes() { + var result = new HashMap<Integer, ColumnFamily>(); - sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> { - ColumnFamily columnFamily = getOrCreateColumnFamily(cfName, name); + for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) { + for (Integer indexId : indexCf.indexIds) { + result.put(indexId, indexCf.columnFamily); + } + } - result[0] = columnFamily; + return result; + } - if (indexIds == null) { - indexIds = new HashSet<>(); - } + /** + * Returns Column Family instance with the desired name. Creates it if it doesn't exist. Tracks every created index by its + * {@code indexId}. + */ + public ColumnFamily getOrCreateSortedIndexCf(byte[] cfName, int indexId) { + if (!busyLock.enterBusy()) { + throw new StorageClosedException(); + } - indexIds.add(indexId); + try { + SortedIndexColumnFamily result = sortedIndexCfsByName.compute(new ByteArray(cfName), (unused, sortedIndexCf) -> { + if (sortedIndexCf == null) { + return new SortedIndexColumnFamily(createColumnFamily(cfName), indexId); + } else { + sortedIndexCf.indexIds.add(indexId); - return indexIds; + return sortedIndexCf; + } }); - return result[0]; + return result.columnFamily; } finally { busyLock.leaveBusy(); } @@ -193,67 +267,59 @@ public final class SharedRocksDbInstance { /** * Possibly drops the column family after destroying the index. */ - public void dropCfOnIndexDestroy(byte[] cfName, int indexId) { + public void destroySortedIndexCfIfNeeded(byte[] cfName, int indexId) { if (!busyLock.enterBusy()) { throw new StorageClosedException(); } try { - sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name, indexIds) -> { - if (indexIds == null) { - return null; - } + sortedIndexCfsByName.computeIfPresent(new ByteArray(cfName), (unused, indexCf) -> { + indexCf.indexIds.remove(indexId); - indexIds.remove(indexId); - - if (indexIds.isEmpty()) { - indexIds = null; - - destroyColumnFamily(name); + if (!indexCf.indexIds.isEmpty()) { + return indexCf; } - return indexIds; + destroyColumnFamily(indexCf.columnFamily); + + return null; }); } finally { busyLock.leaveBusy(); } } - private ColumnFamily getOrCreateColumnFamily(byte[] cfName, ByteArray name) { - return sortedIndexCfs.computeIfAbsent(name, unused -> { - ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName)); - - ColumnFamily columnFamily; - try { - columnFamily = ColumnFamily.create(db, cfDescriptor); - } catch (RocksDBException e) { - throw new StorageException("Failed to create new RocksDB column family: " + toStringName(cfDescriptor.getName()), e); - } + private static Cursor<Integer> indexIdsCursor(ColumnFamily cf) { + return new IndexIdCursor(cf.newIterator()); + } - flusher.addColumnFamily(columnFamily.handle()); + private ColumnFamily createColumnFamily(byte[] cfName) { + ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName)); - return columnFamily; - }); - } + ColumnFamily columnFamily; + try { + columnFamily = ColumnFamily.create(db, cfDescriptor); + } catch (RocksDBException e) { + throw new StorageException("Failed to create new RocksDB column family: " + toStringName(cfDescriptor.getName()), e); + } - private void destroyColumnFamily(ByteArray cfName) { - sortedIndexCfs.computeIfPresent(cfName, (unused, columnFamily) -> { - ColumnFamilyHandle columnFamilyHandle = columnFamily.handle(); + flusher.addColumnFamily(columnFamily.handle()); - flusher.removeColumnFamily(columnFamilyHandle); + return columnFamily; + } - try { - db.dropColumnFamily(columnFamilyHandle); + private void destroyColumnFamily(ColumnFamily columnFamily) { + ColumnFamilyHandle columnFamilyHandle = columnFamily.handle(); - db.destroyColumnFamilyHandle(columnFamilyHandle); - } catch (RocksDBException e) { - throw new StorageException( - "Failed to destroy RocksDB Column Family. [cfName={}, path={}]", - e, toStringName(cfName.bytes()), path - ); - } + flusher.removeColumnFamily(columnFamilyHandle); - return null; - }); + try { + columnFamily.destroy(); + } catch (RocksDBException e) { + throw new StorageException( + "Failed to destroy RocksDB Column Family. [cfName={}, path={}]", + e, columnFamily.name(), path + ); + } } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java index ddd11d7dac..c97fb21033 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java @@ -27,9 +27,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.rocksdb.ColumnFamily; import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher; import org.apache.ignite.internal.storage.StorageException; @@ -100,7 +97,7 @@ public class SharedRocksDbInstanceCreator { ColumnFamily partitionCf = null; ColumnFamily gcQueueCf = null; ColumnFamily hashIndexCf = null; - ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs = new ConcurrentHashMap<>(); + var sortedIndexCfs = new ArrayList<ColumnFamily>(); // Read all existing Column Families from the db and parse them according to type: meta, partition data or index. for (ColumnFamilyHandle cfHandle : cfHandles) { @@ -128,7 +125,7 @@ public class SharedRocksDbInstanceCreator { break; case SORTED_INDEX: - sortedIndexCfs.put(new ByteArray(cf.handle().getName()), cf); + sortedIndexCfs.add(cf); break; @@ -211,6 +208,7 @@ public class SharedRocksDbInstanceCreator { } } + @SuppressWarnings("resource") private static ColumnFamilyOptions defaultCfOptions() { return new ColumnFamilyOptions() .setMemtablePrefixBloomSizeRatio(0.125) diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java index 07d62ed440..994cba7fe0 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -159,11 +158,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest { void storageAdvertisesItIsPersistent() { assertThat(tableStorage.isVolatile(), is(false)); } - - @Disabled("https://issues.apache.org/jira/browse/IGNITE-21680") - @Test - @Override - public void testIndexDestructionOnRecovery() throws Exception { - super.testIndexDestructionOnRecovery(); - } } diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java new file mode 100644 index 0000000000..315acfc112 --- /dev/null +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.rocksdb.instance; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +import java.util.List; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +/** Class that contains tests for {@link IndexIdCursor}. */ +class IndexIdCursorTest extends IgniteAbstractTest { + private RocksDB rocksDb; + + @BeforeEach + void setUp() throws RocksDBException { + rocksDb = RocksDB.open(workDir.toString()); + } + + @AfterEach + void tearDown() { + rocksDb.close(); + } + + @Test + void testEmptyCursor() { + assertThat(getAll(), is(empty())); + } + + @Test + void testSingleElement() throws RocksDBException { + insertData(0, 0); + + assertThat(getAll(), contains(0)); + } + + @Test + void testBorders() throws RocksDBException { + insertData(Integer.MIN_VALUE, 0); + insertData(Integer.MAX_VALUE, 0); + insertData(-1, 0); + insertData(0, 0); + + // RocksDB uses unsigned comparison. + assertThat(getAll(), contains(0, Integer.MAX_VALUE, Integer.MIN_VALUE, -1)); + } + + @Test + void testRemovesDuplicatesAndSorts() throws RocksDBException { + for (int i = 5; i >= 0; i--) { + for (int j = 0; j < 3; j++) { + insertData(i, j); + } + } + + assertThat(getAll(), contains(0, 1, 2, 3, 4, 5)); + } + + private List<Integer> getAll() { + try (var cursor = new IndexIdCursor(rocksDb.newIterator())) { + return cursor.stream().collect(toList()); + } + } + + private void insertData(int indexId, int extra) throws RocksDBException { + rocksDb.put(createKey(BYTE_EMPTY_ARRAY, indexId, extra), BYTE_EMPTY_ARRAY); + } +} diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java new file mode 100644 index 0000000000..7ef2efc91d --- /dev/null +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.rocksdb.instance; + +import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.rocksdb.ColumnFamily; +import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor; +import org.apache.ignite.internal.storage.rocksdb.RocksDbDataRegion; +import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; +import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.internal.util.IgniteUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +/** Contains tests for {@link SharedRocksDbInstance}. */ +@ExtendWith(ConfigurationExtension.class) +class SharedRocksDbInstanceTest extends IgniteAbstractTest { + private RocksDbStorageEngine engine; + + private RocksDbDataRegion dataRegion; + + private SharedRocksDbInstance rocksDb; + + @BeforeEach + void setUp(@InjectConfiguration RocksDbStorageEngineConfiguration engineConfig) throws Exception { + engine = new RocksDbStorageEngine("test", engineConfig, workDir); + + engine.start(); + + dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion()); + + dataRegion.start(); + + rocksDb = createDb(); + } + + @AfterEach + void tearDown() throws Exception { + IgniteUtils.closeAllManually( + rocksDb == null ? null : rocksDb::stop, + dataRegion == null ? null : dataRegion::stop, + engine == null ? null : engine::stop + ); + } + + private SharedRocksDbInstance createDb() throws Exception { + return new SharedRocksDbInstanceCreator().create(engine, dataRegion, workDir); + } + + @Test + void testSortedIndexCfCaching() { + byte[] fooName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("x", NativeTypes.INT64, true, true) + )); + + byte[] barName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("y", NativeTypes.UUID, true, true) + )); + + byte[] bazName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("z", NativeTypes.INT64, true, true) + )); + + ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1); + ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2); + ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3); + + assertThat(foo, is(sameInstance(baz))); + assertThat(foo, is(not(sameInstance(bar)))); + + rocksDb.destroySortedIndexCfIfNeeded(fooName, 1); + + assertTrue(cfExists(fooName)); + + rocksDb.destroySortedIndexCfIfNeeded(barName, 2); + + assertFalse(cfExists(barName)); + + rocksDb.destroySortedIndexCfIfNeeded(bazName, 3); + + assertFalse(cfExists(bazName)); + } + + @Test + void testSortedIndexRecovery() throws Exception { + byte[] fooName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("x", NativeTypes.INT64, true, true) + )); + + byte[] barName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("y", NativeTypes.UUID, true, true) + )); + + byte[] bazName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("z", NativeTypes.INT64, true, true) + )); + + ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1); + ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2); + ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3); + + assertThat(rocksDb.sortedIndexes(), is(Map.of(1, foo, 2, bar, 3, baz))); + + // Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped. + foo.put(ByteUtils.intToBytes(1), BYTE_EMPTY_ARRAY); + + rocksDb.stop(); + + rocksDb = createDb(); + + assertThat(rocksDb.sortedIndexes(), hasKey(1)); + assertThat(rocksDb.sortedIndexes(), not(hasKey(2))); + assertThat(rocksDb.sortedIndexes(), not(hasKey(3))); + + assertTrue(cfExists(fooName)); + assertFalse(cfExists(barName)); + } + + private boolean cfExists(byte[] cfName) { + try { + // Check Column Family existence by trying to create a new one with the same name. + ColumnFamilyHandle handle = rocksDb.db.createColumnFamily(new ColumnFamilyDescriptor(cfName)); + + rocksDb.db.destroyColumnFamilyHandle(handle); + + return false; + } catch (RocksDBException e) { + return true; + } + } +}