This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 399f0f15da IGNITE-21680 Remove destroyed RocksDB indexes on recovery (#3435)
399f0f15da is described below
commit 399f0f15daaebf1099589d734c85f2763f0344a2
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Mar 20 09:44:54 2024 +0200
IGNITE-21680 Remove destroyed RocksDB indexes on recovery (#3435)
---
.../apache/ignite/internal/rocksdb/RocksUtils.java | 3 +-
.../internal/rocksdb/flush/RocksDbFlusher.java | 2 +-
.../storage/AbstractMvTableStorageTest.java | 126 +++++++++++-
.../storage/rocksdb/ColumnFamilyUtils.java | 2 +-
.../ignite/internal/storage/rocksdb/HashIndex.java | 25 +--
.../ignite/internal/storage/rocksdb/Index.java | 8 +-
.../internal/storage/rocksdb/RocksDbIndexes.java | 229 +++++++++++++++++++++
.../storage/rocksdb/RocksDbMetaStorage.java | 17 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 3 +-
.../storage/rocksdb/RocksDbStorageEngine.java | 6 +-
.../storage/rocksdb/RocksDbStorageUtils.java | 15 ++
.../storage/rocksdb/RocksDbTableStorage.java | 179 +++++-----------
.../internal/storage/rocksdb/SortedIndex.java | 53 +++--
.../index/RocksDbBinaryTupleComparator.java | 28 +--
.../rocksdb/index/RocksDbSortedIndexStorage.java | 1 +
.../storage/rocksdb/instance/IndexIdCursor.java | 116 +++++++++++
.../rocksdb/instance/SharedRocksDbInstance.java | 202 ++++++++++++------
.../instance/SharedRocksDbInstanceCreator.java | 8 +-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 8 -
.../rocksdb/instance/IndexIdCursorTest.java | 93 +++++++++
.../instance/SharedRocksDbInstanceTest.java | 167 +++++++++++++++
21 files changed, 1016 insertions(+), 275 deletions(-)
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
index e95b8f8132..e06b5941e6 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -47,8 +47,7 @@ public class RocksUtils {
/**
* Checks the status of the iterator and throws an exception if it is not
correct.
*
- * <p>Check the status first. This operation is guaranteed to throw if an
internal error has occurred during the iteration. Otherwise,
- * we've exhausted the data range.
+ * <p>This operation is guaranteed to throw if an internal error has
occurred during the iteration.
*
* @param it RocksDB iterator.
* @throws IgniteInternalException if the iterator has an incorrect status.
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 043fe36550..62528bb05d 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -123,7 +123,7 @@ public class RocksDbFlusher {
/**
* Returns an instance of {@link AbstractEventListener} to process actual
RocksDB events. Returned listener must be set into
- * {@link Options#setListeners(List)} before database is started.
Otherwise, no events would occurre.
+ * {@link Options#setListeners(List)} before database is started.
Otherwise, no events would occur.
*/
public AbstractEventListener listener() {
return new RocksDbFlushListener(this);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 21930352c7..24806134f0 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -314,6 +314,113 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
assertThat(tableStorage.getIndex(PARTITION_ID, hashIdx.id()),
is(nullValue()));
}
+ /**
+ * Tests that removing one Sorted Index does not affect the data in the
other.
+ */
+ @Test
+ public void testDestroySortedIndexIndependence() {
+ CatalogTableDescriptor catalogTableDescriptor =
catalogService.table(TABLE_NAME, clock.nowLong());
+
+ var catalogSortedIndex1 = new CatalogSortedIndexDescriptor(
+ 200,
+ "TEST_INDEX_1",
+ catalogTableDescriptor.id(),
+ false,
+ AVAILABLE,
+ catalogService.latestCatalogVersion(),
+ List.of(new CatalogIndexColumnDescriptor("STRKEY",
ASC_NULLS_LAST))
+ );
+
+ var catalogSortedIndex2 = new CatalogSortedIndexDescriptor(
+ 201,
+ "TEST_INDEX_2",
+ catalogTableDescriptor.id(),
+ false,
+ AVAILABLE,
+ catalogService.latestCatalogVersion(),
+ List.of(new CatalogIndexColumnDescriptor("STRKEY",
ASC_NULLS_LAST))
+ );
+
+ var sortedIndexDescriptor1 = new
StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex1);
+ var sortedIndexDescriptor2 = new
StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex2);
+
+ MvPartitionStorage partitionStorage =
getOrCreateMvPartition(PARTITION_ID);
+
+ SortedIndexStorage sortedIndexStorage1 =
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor1);
+ SortedIndexStorage sortedIndexStorage2 =
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor2);
+
+ List<TestRow> rows = List.of(
+ new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0,
"0"), new TestValue(0, "0"))),
+ new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1,
"1"), new TestValue(1, "1")))
+ );
+
+ fillStorages(partitionStorage, null, sortedIndexStorage1, rows);
+ fillStorages(partitionStorage, null, sortedIndexStorage2, rows);
+
+ checkForPresenceRows(null, null, sortedIndexStorage1, rows);
+ checkForPresenceRows(null, null, sortedIndexStorage2, rows);
+
+ assertThat(tableStorage.destroyIndex(sortedIndexDescriptor1.id()),
willCompleteSuccessfully());
+
+ assertThat(tableStorage.getIndex(PARTITION_ID,
sortedIndexDescriptor1.id()), is(nullValue()));
+
+ checkForPresenceRows(null, null, sortedIndexStorage2, rows);
+ }
+
+ /**
+ * Tests that removing one Hash Index does not affect the data in the
other.
+ */
+ @Test
+ public void testDestroyHashIndexIndependence() {
+ CatalogTableDescriptor catalogTableDescriptor =
catalogService.table(TABLE_NAME, clock.nowLong());
+
+ var catalogHashIndex1 = new CatalogHashIndexDescriptor(
+ 200,
+ "TEST_INDEX_1",
+ catalogTableDescriptor.id(),
+ true,
+ AVAILABLE,
+ catalogService.latestCatalogVersion(),
+ List.of("STRKEY")
+ );
+
+ var catalogHashIndex2 = new CatalogHashIndexDescriptor(
+ 201,
+ "TEST_INDEX_2",
+ catalogTableDescriptor.id(),
+ true,
+ AVAILABLE,
+ catalogService.latestCatalogVersion(),
+ List.of("STRKEY")
+ );
+
+ var hashIndexDescriptor1 = new
StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex1);
+ var hashIndexDescriptor2 = new
StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex2);
+
+ MvPartitionStorage partitionStorage =
getOrCreateMvPartition(PARTITION_ID);
+
+ HashIndexStorage hashIndexStorage1 =
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor1);
+ HashIndexStorage hashIndexStorage2 =
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor2);
+
+ List<TestRow> rows = List.of(
+ new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0,
"0"), new TestValue(0, "0"))),
+ new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1,
"1"), new TestValue(1, "1")))
+ );
+
+ fillStorages(partitionStorage, hashIndexStorage1, null, rows);
+ fillStorages(partitionStorage, hashIndexStorage2, null, rows);
+
+ checkForPresenceRows(null, hashIndexStorage1, null, rows);
+ checkForPresenceRows(null, hashIndexStorage2, null, rows);
+
+ assertThat(tableStorage.destroyIndex(hashIndexDescriptor1.id()),
willCompleteSuccessfully());
+
+ assertThat(tableStorage.getIndex(PARTITION_ID,
hashIndexDescriptor1.id()), is(nullValue()));
+
+ checkForPresenceRows(null, hashIndexStorage2, null, rows);
+ }
+
+
@Test
public void testHashIndexIndependence() {
MvPartitionStorage partitionStorage1 =
getOrCreateMvPartition(PARTITION_ID);
@@ -859,8 +966,8 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
private static void fillStorages(
MvPartitionStorage mvPartitionStorage,
- HashIndexStorage hashIndexStorage,
- SortedIndexStorage sortedIndexStorage,
+ @Nullable HashIndexStorage hashIndexStorage,
+ @Nullable SortedIndexStorage sortedIndexStorage,
List<TestRow> rows
) {
assertThat(rows, hasSize(greaterThanOrEqualTo(2)));
@@ -878,9 +985,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
assertNotNull(binaryRow);
assertNotNull(timestamp);
- IndexRow hashIndexRow =
indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
- IndexRow sortedIndexRow =
indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
-
mvPartitionStorage.runConsistently(locker -> {
locker.lock(rowId);
@@ -892,9 +996,17 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
mvPartitionStorage.addWriteCommitted(rowId, binaryRow,
timestamp);
}
- hashIndexStorage.put(hashIndexRow);
+ if (hashIndexStorage != null) {
+ IndexRow hashIndexRow =
indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
- sortedIndexStorage.put(sortedIndexRow);
+ hashIndexStorage.put(hashIndexRow);
+ }
+
+ if (sortedIndexStorage != null) {
+ IndexRow sortedIndexRow =
indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
+
+ sortedIndexStorage.put(sortedIndexRow);
+ }
return null;
});
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index 339e05bc86..1cd093415d 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -110,7 +110,7 @@ public class ColumnFamilyUtils {
*
* @see #comparatorFromCfName(byte[])
*/
- static byte[] sortedIndexCfName(List<StorageSortedIndexColumnDescriptor>
columns) {
+ public static byte[]
sortedIndexCfName(List<StorageSortedIndexColumnDescriptor> columns) {
ByteBuffer buf = ByteBuffer.allocate(SORTED_INDEX_CF_PREFIX.length() +
columns.size() * 2);
buf.put(SORTED_INDEX_CF_PREFIX.getBytes(UTF_8));
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 7f489d6e2f..a5f3bcce9e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.storage.rocksdb;
-import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import
org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -39,14 +40,23 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
private final RocksDbMetaStorage indexMetaStorage;
- HashIndex(ColumnFamily indexCf, StorageHashIndexDescriptor descriptor,
RocksDbMetaStorage indexMetaStorage) {
+ HashIndex(SharedRocksDbInstance rocksDb, StorageHashIndexDescriptor
descriptor, RocksDbMetaStorage indexMetaStorage) {
super(descriptor.id());
- this.indexCf = indexCf;
+ this.indexCf = rocksDb.hashIndexCf();
this.descriptor = descriptor;
this.indexMetaStorage = indexMetaStorage;
}
+ /**
+ * Returns hash index storage for partition.
+ *
+ * @param partitionId Partition ID.
+ */
+ @Nullable RocksDbHashIndexStorage getStorage(int partitionId) {
+ return storageByPartitionId.get(partitionId);
+ }
+
/**
* Creates a new Hash Index storage or returns an existing one.
*/
@@ -66,13 +76,4 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
// Every index storage uses an "index ID" + "partition ID" prefix. We
can remove everything by just using the index ID prefix.
deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY,
descriptor.id()));
}
-
- /**
- * Returns hash index storage for partition.
- *
- * @param partitionId Partition ID.
- */
- @Nullable RocksDbHashIndexStorage get(int partitionId) {
- return storageByPartitionId.get(partitionId);
- }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
index 2640359a91..30f49dbcb7 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
@@ -67,12 +67,12 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
* @throws RocksDBException If failed to delete data.
*/
void destroy(int partitionId, WriteBatch writeBatch) throws
RocksDBException {
- S hashIndex = storageByPartitionId.remove(partitionId);
+ S storage = storageByPartitionId.remove(partitionId);
- if (hashIndex != null) {
- hashIndex.transitionToDestroyedState();
+ if (storage != null) {
+ storage.transitionToDestroyedState();
- hashIndex.destroyData(writeBatch);
+ storage.destroyData(writeBatch);
}
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
new file mode 100644
index 0000000000..6fef05303a
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
+import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
+import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.IntFunction;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
+import
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+
+/** Manager for all RocksDB-based indexes. */
+class RocksDbIndexes {
+ /** Hash Index storages by Index IDs. */
+ private final ConcurrentMap<Integer, HashIndex> hashIndices = new
ConcurrentHashMap<>();
+
+ /** Sorted Index storages by Index IDs. */
+ private final ConcurrentMap<Integer, SortedIndex> sortedIndices = new
ConcurrentHashMap<>();
+
+ private final SharedRocksDbInstance rocksDb;
+
+ /** Callback for getting partition storages using partition IDs. */
+ private final IntFunction<RocksDbMvPartitionStorage>
partitionStorageProvider;
+
+ RocksDbIndexes(SharedRocksDbInstance rocksDb,
IntFunction<RocksDbMvPartitionStorage> partitionStorageProvider) {
+ this.rocksDb = rocksDb;
+ this.partitionStorageProvider = partitionStorageProvider;
+ }
+
+ void recoverIndexes(StorageIndexDescriptorSupplier
indexDescriptorSupplier) throws RocksDBException {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ for (int indexId : rocksDb.hashIndexIds()) {
+ var descriptor = (StorageHashIndexDescriptor)
indexDescriptorSupplier.get(indexId);
+
+ if (descriptor == null) {
+ deleteByPrefix(writeBatch, rocksDb.hashIndexCf(),
createKey(BYTE_EMPTY_ARRAY, indexId));
+ } else {
+ hashIndices.put(indexId, new HashIndex(rocksDb,
descriptor, rocksDb.meta));
+ }
+ }
+
+ var indexCfsToDestroy = new ArrayList<Map.Entry<Integer,
ColumnFamily>>();
+
+ for (Map.Entry<Integer, ColumnFamily> e :
rocksDb.sortedIndexes().entrySet()) {
+ int indexId = e.getKey();
+
+ ColumnFamily indexCf = e.getValue();
+
+ var descriptor = (StorageSortedIndexDescriptor)
indexDescriptorSupplier.get(indexId);
+
+ if (descriptor == null) {
+ deleteByPrefix(writeBatch, indexCf,
createKey(BYTE_EMPTY_ARRAY, indexId));
+
+ indexCfsToDestroy.add(e);
+ } else {
+ sortedIndices.put(indexId,
SortedIndex.restoreExisting(rocksDb, indexCf, descriptor, rocksDb.meta));
+ }
+ }
+
+ rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+
+ if (!indexCfsToDestroy.isEmpty()) {
+ rocksDb.flusher.awaitFlush(false)
+ .thenRunAsync(() -> {
+ for (Map.Entry<Integer, ColumnFamily> e :
indexCfsToDestroy) {
+ int indexId = e.getKey();
+
+ ColumnFamily indexCf = e.getValue();
+
+
rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId);
+ }
+ }, rocksDb.engine.threadPool());
+ }
+ }
+ }
+
+ SortedIndexStorage getOrCreateSortedIndex(int partitionId,
StorageSortedIndexDescriptor indexDescriptor) {
+ SortedIndex sortedIndex = sortedIndices.computeIfAbsent(
+ indexDescriptor.id(),
+ id -> SortedIndex.createNew(rocksDb, indexDescriptor,
rocksDb.meta)
+ );
+
+ return
sortedIndex.getOrCreateStorage(partitionStorageProvider.apply(partitionId));
+ }
+
+ HashIndexStorage getOrCreateHashIndex(int partitionId,
StorageHashIndexDescriptor indexDescriptor) {
+ HashIndex hashIndex = hashIndices.computeIfAbsent(
+ indexDescriptor.id(),
+ id -> new HashIndex(rocksDb, indexDescriptor, rocksDb.meta)
+ );
+
+ return
hashIndex.getOrCreateStorage(partitionStorageProvider.apply(partitionId));
+ }
+
+ @Nullable IndexStorage getIndex(int partitionId, int indexId) {
+ HashIndex hashIndex = hashIndices.get(indexId);
+
+ if (hashIndex != null) {
+ assert !sortedIndices.containsKey(indexId) : indexId;
+
+ return hashIndex.getStorage(partitionId);
+ }
+
+ SortedIndex sortedIndex = sortedIndices.get(indexId);
+
+ if (sortedIndex != null) {
+ return sortedIndex.getStorage(partitionId);
+ }
+
+ return null;
+ }
+
+ void startRebalance(int partitionId, WriteBatch writeBatch) {
+ getAllStorages(partitionId).forEach(indexStorage ->
indexStorage.startRebalance(writeBatch));
+ }
+
+ void abortRebalance(int partitionId, WriteBatch writeBatch) {
+ getAllStorages(partitionId).forEach(indexStorage ->
indexStorage.abortRebalance(writeBatch));
+ }
+
+ void finishRebalance(int partitionId) {
+
getAllStorages(partitionId).forEach(AbstractRocksDbIndexStorage::finishRebalance);
+ }
+
+ List<AutoCloseable> getResourcesForClose() {
+ return allIndexes().map(index -> (AutoCloseable)
index::close).collect(toList());
+ }
+
+ List<AutoCloseable> getResourcesForDestroy() {
+ return allIndexes().map(index -> (AutoCloseable)
index::transitionToDestroyedState).collect(toList());
+ }
+
+ private Stream<Index<?>> allIndexes() {
+ return Stream.concat(hashIndices.values().stream(),
sortedIndices.values().stream());
+ }
+
+ void destroyIndex(int indexId) throws RocksDBException {
+ HashIndex hashIdx = hashIndices.remove(indexId);
+
+ SortedIndex sortedIdx = sortedIndices.remove(indexId);
+
+ if (hashIdx == null && sortedIdx == null) {
+ return;
+ }
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ if (hashIdx != null) {
+ hashIdx.destroy(writeBatch);
+ }
+
+ if (sortedIdx != null) {
+ sortedIdx.destroy(writeBatch);
+ }
+
+ rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+ }
+
+ if (sortedIdx != null) {
+ rocksDb.flusher.awaitFlush(false)
+ .thenRunAsync(sortedIdx::destroySortedIndexCfIfNeeded,
rocksDb.engine.threadPool());
+ }
+ }
+
+ void destroyAllIndexesForPartition(int partitionId, WriteBatch writeBatch)
throws RocksDBException {
+ for (HashIndex hashIndex : hashIndices.values()) {
+ hashIndex.destroy(partitionId, writeBatch);
+ }
+
+ for (SortedIndex sortedIndex : sortedIndices.values()) {
+ sortedIndex.destroy(partitionId, writeBatch);
+ }
+ }
+
+ void destroyAllIndexes(WriteBatch writeBatch) throws RocksDBException {
+ for (HashIndex hashIndex : hashIndices.values()) {
+ hashIndex.destroy(writeBatch);
+ }
+
+ for (SortedIndex sortedIndex : sortedIndices.values()) {
+ sortedIndex.destroy(writeBatch);
+ }
+ }
+
+ void scheduleAllIndexCfDestroy() {
+ rocksDb.flusher.awaitFlush(false)
+ .thenRunAsync(() ->
sortedIndices.values().forEach(SortedIndex::destroySortedIndexCfIfNeeded),
rocksDb.engine.threadPool());
+ }
+
+ Stream<AbstractRocksDbIndexStorage> getAllStorages(int partitionId) {
+ return Stream.concat(
+ hashIndices.values().stream().map(index ->
index.getStorage(partitionId)),
+ sortedIndices.values().stream().map(index ->
index.getStorage(partitionId))
+ );
+ }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index edfb1d4e62..6511fd0275 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.storage.rocksdb;
-import static java.nio.ByteOrder.BIG_ENDIAN;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.getRowIdUuid;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.putRowIdUuid;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
@@ -59,21 +59,6 @@ public class RocksDbMetaStorage {
this.metaColumnFamily = metaColumnFamily;
}
- /**
- * Creates a byte array, that uses the {@code prefix} as a prefix, and
every other {@code int} values as a 4-bytes chunk in Big Endian.
- */
- public static byte[] createKey(byte[] prefix, int... values) {
- ByteBuffer buf = ByteBuffer.allocate(prefix.length + Integer.BYTES *
values.length).order(BIG_ENDIAN);
-
- buf.put(prefix);
-
- for (int value : values) {
- buf.putInt(value);
- }
-
- return buf.array();
- }
-
/**
* Returns a column family instance, associated with the meta storage.
*/
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 734ea4b9a0..162046ebbd 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -34,8 +34,8 @@ import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.put
import static
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
-import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.normalize;
import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
@@ -877,7 +877,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
});
}
- // TODO: IGNITE-16914 Play with prefix settings and benchmark results.
@Override
public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws
StorageException {
Objects.requireNonNull(timestamp, "timestamp is null");
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index db16777194..6e4dd1110b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -195,6 +195,10 @@ public class RocksDbStorageEngine implements StorageEngine {
}
});
- return new RocksDbTableStorage(sharedInstance, tableDescriptor);
+ var storage = new RocksDbTableStorage(sharedInstance, tableDescriptor,
indexDescriptorSupplier);
+
+ storage.start();
+
+ return storage;
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java
index 2476459c96..44030ceab2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageUtils.java
@@ -68,4 +68,19 @@ public class RocksDbStorageUtils {
static long normalize(long value) {
return value ^ (1L << 63);
}
+
+ /**
+ * Creates a byte array, that uses the {@code prefix} as a prefix, and
every other {@code int} values as a 4-bytes chunk in Big Endian.
+ */
+ public static byte[] createKey(byte[] prefix, int... values) {
+ ByteBuffer buf = ByteBuffer.allocate(prefix.length + Integer.BYTES *
values.length).order(KEY_BYTE_ORDER);
+
+ buf.put(prefix);
+
+ for (int value : values) {
+ buf.putInt(value);
+ }
+
+ return buf.array();
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 6ae2185420..30c5ac411a 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.storage.rocksdb;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
-import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
import static
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
@@ -30,15 +30,11 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -48,9 +44,9 @@ import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
-import
org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-import
org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
import org.apache.ignite.internal.storage.util.MvPartitionStorages;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -71,11 +67,7 @@ public class RocksDbTableStorage implements MvTableStorage {
/** Partition storages. */
private final MvPartitionStorages<RocksDbMvPartitionStorage>
mvPartitionStorages;
- /** Hash Index storages by Index IDs. */
- private final ConcurrentMap<Integer, HashIndex> hashIndices = new
ConcurrentHashMap<>();
-
- /** Sorted Index storages by Index IDs. */
- private final ConcurrentMap<Integer, SortedIndex> sortedIndices = new
ConcurrentHashMap<>();
+ private final RocksDbIndexes indexes;
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -86,6 +78,8 @@ public class RocksDbTableStorage implements MvTableStorage {
/** Table descriptor. */
private final StorageTableDescriptor tableDescriptor;
+ private final StorageIndexDescriptorSupplier indexDescriptorSupplier;
+
/**
* Constructor.
*
@@ -94,11 +88,22 @@ public class RocksDbTableStorage implements MvTableStorage {
*/
RocksDbTableStorage(
SharedRocksDbInstance rocksDb,
- StorageTableDescriptor tableDescriptor
+ StorageTableDescriptor tableDescriptor,
+ StorageIndexDescriptorSupplier indexDescriptorSupplier
) {
this.rocksDb = rocksDb;
this.tableDescriptor = tableDescriptor;
this.mvPartitionStorages = new
MvPartitionStorages<>(tableDescriptor.getId(), tableDescriptor.getPartitions());
+ this.indexes = new RocksDbIndexes(rocksDb,
this::getMvPartitionChecked);
+ this.indexDescriptorSupplier = indexDescriptorSupplier;
+ }
+
+ void start() {
+ try {
+ indexes.recoverIndexes(indexDescriptorSupplier);
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to recover indexes", e);
+ }
}
/**
@@ -140,7 +145,7 @@ public class RocksDbTableStorage implements MvTableStorage {
* Returns a future to wait next flush operation from the current point in
time. Uses {@link RocksDB#getLatestSequenceNumber()} to
* achieve this.
*
- * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)}
should be explicitly triggerred in the near future.
+ * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)}
should be explicitly triggered in the near future.
*/
public CompletableFuture<Void> awaitFlush(boolean schedule) {
return inBusyLock(busyLock, () ->
rocksDb.flusher.awaitFlush(schedule));
@@ -157,13 +162,15 @@ public class RocksDbTableStorage implements
MvTableStorage {
.thenAccept(partitionStorages -> {
var resources = new ArrayList<AutoCloseable>();
- Stream.concat(hashIndices.values().stream(),
sortedIndices.values().stream()).forEach(index -> resources.add(
- destroy ? index::transitionToDestroyedState :
index::close
- ));
+ if (destroy) {
+ resources.addAll(indexes.getResourcesForDestroy());
- partitionStorages.forEach(mvPartitionStorage ->
resources.add(
- destroy ?
mvPartitionStorage::transitionToDestroyedState : mvPartitionStorage::close
- ));
+ partitionStorages.forEach(mvPartitionStorage ->
resources.add(mvPartitionStorage::transitionToDestroyedState));
+ } else {
+ resources.addAll(indexes.getResourcesForClose());
+
+ partitionStorages.forEach(mvPartitionStorage ->
resources.add(mvPartitionStorage::close));
+ }
try {
IgniteUtils.closeAll(resources);
@@ -201,18 +208,14 @@ public class RocksDbTableStorage implements
MvTableStorage {
deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix);
deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix);
- for (HashIndex hashIndex : hashIndices.values()) {
- hashIndex.destroy(writeBatch);
- }
-
- for (SortedIndex sortedIndex : sortedIndices.values()) {
- sortedIndex.destroy(writeBatch);
- }
+ indexes.destroyAllIndexes(writeBatch);
deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(),
createKey(PARTITION_META_PREFIX, tableId));
deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(),
createKey(PARTITION_CONF_PREFIX, tableId));
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+
+ indexes.scheduleAllIndexCfDestroy();
} catch (RocksDBException e) {
throw new StorageException("Failed to destroy table data.
[tableId={}]", e, getTableId());
}
@@ -241,6 +244,16 @@ public class RocksDbTableStorage implements MvTableStorage
{
return inBusyLock(busyLock, () ->
mvPartitionStorages.get(partitionId));
}
+ private RocksDbMvPartitionStorage getMvPartitionChecked(int partitionId) {
+ RocksDbMvPartitionStorage partitionStorage =
mvPartitionStorages.get(partitionId);
+
+ if (partitionStorage == null) {
+ throw new
StorageException(createMissingMvPartitionErrorMessage(partitionId));
+ }
+
+ return partitionStorage;
+ }
+
@Override
public CompletableFuture<Void> destroyPartition(int partitionId) {
return inBusyLock(busyLock, () ->
mvPartitionStorages.destroy(partitionId, mvPartitionStorage -> {
@@ -251,13 +264,7 @@ public class RocksDbTableStorage implements MvTableStorage
{
// RocksDB itself will then destroy the data on flush.
mvPartitionStorage.destroyData(writeBatch);
- for (HashIndex hashIndex : hashIndices.values()) {
- hashIndex.destroy(partitionId, writeBatch);
- }
-
- for (SortedIndex sortedIndex : sortedIndices.values()) {
- sortedIndex.destroy(partitionId, writeBatch);
- }
+ indexes.destroyAllIndexesForPartition(partitionId, writeBatch);
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
@@ -270,65 +277,19 @@ public class RocksDbTableStorage implements
MvTableStorage {
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId,
StorageSortedIndexDescriptor indexDescriptor) {
- return inBusyLock(busyLock, () -> {
- SortedIndex storages = sortedIndices.computeIfAbsent(
- indexDescriptor.id(),
- id -> createSortedIndex(indexDescriptor)
- );
-
- RocksDbMvPartitionStorage partitionStorage =
mvPartitionStorages.get(partitionId);
-
- if (partitionStorage == null) {
- throw new
StorageException(createMissingMvPartitionErrorMessage(partitionId));
- }
-
- return storages.getOrCreateStorage(partitionStorage);
- });
- }
-
- private SortedIndex createSortedIndex(StorageSortedIndexDescriptor
indexDescriptor) {
- return new SortedIndex(rocksDb, indexDescriptor, rocksDb.meta);
+ return inBusyLock(busyLock, () ->
indexes.getOrCreateSortedIndex(partitionId, indexDescriptor));
}
@Override
public HashIndexStorage getOrCreateHashIndex(int partitionId,
StorageHashIndexDescriptor indexDescriptor) {
- return inBusyLock(busyLock, () -> {
- HashIndex storages = hashIndices.computeIfAbsent(
- indexDescriptor.id(),
- id -> new HashIndex(rocksDb.hashIndexCf, indexDescriptor,
rocksDb.meta)
- );
-
- RocksDbMvPartitionStorage partitionStorage =
mvPartitionStorages.get(partitionId);
-
- if (partitionStorage == null) {
- throw new
StorageException(createMissingMvPartitionErrorMessage(partitionId));
- }
-
- return storages.getOrCreateStorage(partitionStorage);
- });
+ return inBusyLock(busyLock, () ->
indexes.getOrCreateHashIndex(partitionId, indexDescriptor));
}
@Override
public CompletableFuture<Void> destroyIndex(int indexId) {
return inBusyLock(busyLock, () -> {
- HashIndex hashIdx = hashIndices.remove(indexId);
-
- SortedIndex sortedIdx = sortedIndices.remove(indexId);
-
- if (hashIdx == null && sortedIdx == null) {
- return nullCompletedFuture();
- }
-
- try (WriteBatch writeBatch = new WriteBatch()) {
- if (hashIdx != null) {
- hashIdx.destroy(writeBatch);
- }
-
- if (sortedIdx != null) {
- sortedIdx.destroy(writeBatch);
- }
-
- rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+ try {
+ indexes.destroyIndex(indexId);
return nullCompletedFuture();
} catch (RocksDBException e) {
@@ -348,8 +309,7 @@ public class RocksDbTableStorage implements MvTableStorage {
try (WriteBatch writeBatch = new WriteBatch()) {
mvPartitionStorage.startRebalance(writeBatch);
- getHashIndexStorages(partitionId).forEach(index ->
index.startRebalance(writeBatch));
- getSortedIndexStorages(partitionId).forEach(index ->
index.startRebalance(writeBatch));
+ indexes.startRebalance(partitionId, writeBatch);
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
@@ -370,8 +330,7 @@ public class RocksDbTableStorage implements MvTableStorage {
try (WriteBatch writeBatch = new WriteBatch()) {
mvPartitionStorage.abortRebalance(writeBatch);
- getHashIndexStorages(partitionId).forEach(index ->
index.abortRebalance(writeBatch));
- getSortedIndexStorages(partitionId).forEach(index ->
index.abortRebalance(writeBatch));
+ indexes.abortRebalance(partitionId, writeBatch);
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
@@ -396,8 +355,7 @@ public class RocksDbTableStorage implements MvTableStorage {
try (WriteBatch writeBatch = new WriteBatch()) {
mvPartitionStorage.finishRebalance(writeBatch,
lastAppliedIndex, lastAppliedTerm, groupConfig);
-
getHashIndexStorages(partitionId).forEach(RocksDbHashIndexStorage::finishRebalance);
-
getSortedIndexStorages(partitionId).forEach(RocksDbSortedIndexStorage::finishRebalance);
+ indexes.finishRebalance(partitionId);
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
@@ -414,18 +372,13 @@ public class RocksDbTableStorage implements
MvTableStorage {
@Override
public CompletableFuture<Void> clearPartition(int partitionId) {
return inBusyLock(busyLock, () ->
mvPartitionStorages.clear(partitionId, mvPartitionStorage -> {
- List<RocksDbHashIndexStorage> hashIndexStorages =
getHashIndexStorages(partitionId);
- List<RocksDbSortedIndexStorage> sortedIndexStorages =
getSortedIndexStorages(partitionId);
+ List<AbstractRocksDbIndexStorage> indexStorages =
indexes.getAllStorages(partitionId).collect(toList());
try (WriteBatch writeBatch = new WriteBatch()) {
mvPartitionStorage.startCleanup(writeBatch);
- for (RocksDbHashIndexStorage hashIndexStorage :
hashIndexStorages) {
- hashIndexStorage.startCleanup(writeBatch);
- }
-
- for (RocksDbSortedIndexStorage sortedIndexStorage :
sortedIndexStorages) {
- sortedIndexStorage.startCleanup(writeBatch);
+ for (AbstractRocksDbIndexStorage storage : indexStorages) {
+ storage.startCleanup(writeBatch);
}
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
@@ -436,8 +389,7 @@ public class RocksDbTableStorage implements MvTableStorage {
} finally {
mvPartitionStorage.finishCleanup();
-
hashIndexStorages.forEach(RocksDbHashIndexStorage::finishCleanup);
-
sortedIndexStorages.forEach(RocksDbSortedIndexStorage::finishCleanup);
+
indexStorages.forEach(AbstractRocksDbIndexStorage::finishCleanup);
}
}));
}
@@ -449,34 +401,13 @@ public class RocksDbTableStorage implements
MvTableStorage {
return tableDescriptor.getId();
}
- private List<RocksDbHashIndexStorage> getHashIndexStorages(int
partitionId) {
- return hashIndices.values().stream().map(indexes ->
indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
- }
-
- private List<RocksDbSortedIndexStorage> getSortedIndexStorages(int
partitionId) {
- return sortedIndices.values().stream().map(indexes ->
indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
- }
-
@Override
public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
return inBusyLock(busyLock, () -> {
- if (mvPartitionStorages.get(partitionId) == null) {
- throw new
StorageException(createMissingMvPartitionErrorMessage(partitionId));
- }
-
- HashIndex hashIndex = hashIndices.get(indexId);
-
- if (hashIndex != null) {
- return hashIndex.get(partitionId);
- }
-
- SortedIndex sortedIndex = sortedIndices.get(indexId);
-
- if (sortedIndex != null) {
- return sortedIndex.get(partitionId);
- }
+ // Check for partition existence.
+ getMvPartitionChecked(partitionId);
- return (IndexStorage) null;
+ return indexes.getIndex(partitionId, indexId);
});
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index fd16d093a6..a0cf97bf1a 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.storage.rocksdb;
import static
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
+import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -25,6 +28,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
/**
@@ -39,8 +43,9 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
private final RocksDbMetaStorage indexMetaStorage;
- SortedIndex(
+ private SortedIndex(
SharedRocksDbInstance rocksDb,
+ ColumnFamily indexCf,
StorageSortedIndexDescriptor descriptor,
RocksDbMetaStorage indexMetaStorage
) {
@@ -48,13 +53,38 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
this.rocksDb = rocksDb;
this.descriptor = descriptor;
- this.indexCf = rocksDb.getSortedIndexCfOnIndexCreate(
- sortedIndexCfName(descriptor.columns()),
- descriptor.id()
- );
+ this.indexCf = indexCf;
this.indexMetaStorage = indexMetaStorage;
}
+ static SortedIndex createNew(
+ SharedRocksDbInstance rocksDb,
+ StorageSortedIndexDescriptor descriptor,
+ RocksDbMetaStorage indexMetaStorage
+ ) {
+ ColumnFamily indexCf =
rocksDb.getOrCreateSortedIndexCf(sortedIndexCfName(descriptor.columns()),
descriptor.id());
+
+ return new SortedIndex(rocksDb, indexCf, descriptor, indexMetaStorage);
+ }
+
+ static SortedIndex restoreExisting(
+ SharedRocksDbInstance rocksDb,
+ ColumnFamily indexCf,
+ StorageSortedIndexDescriptor descriptor,
+ RocksDbMetaStorage indexMetaStorage
+ ) {
+ return new SortedIndex(rocksDb, indexCf, descriptor, indexMetaStorage);
+ }
+
+ /**
+ * Returns sorted index storage for partition.
+ *
+ * @param partitionId Partition ID.
+ */
+ @Nullable RocksDbSortedIndexStorage getStorage(int partitionId) {
+ return storageByPartitionId.get(partitionId);
+ }
+
/**
* Creates a new Sorted Index storage or returns an existing one.
*/
@@ -68,18 +98,17 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
/**
* Removes all data associated with the index.
*/
- void destroy(WriteBatch writeBatch) {
+ void destroy(WriteBatch writeBatch) throws RocksDBException {
transitionToDestroyedState();
- rocksDb.dropCfOnIndexDestroy(indexCf.nameBytes(), descriptor.id());
+ deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY,
descriptor.id()));
}
/**
- * Returns sorted index storage for partition.
- *
- * @param partitionId Partition ID.
+ * Signals the shared RocksDB instance that this index has been destroyed
and all shared resources (like the Column Family) can
+ * be de-allocated.
*/
- @Nullable RocksDbSortedIndexStorage get(int partitionId) {
- return storageByPartitionId.get(partitionId);
+ void destroySortedIndexCfIfNeeded() {
+ rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(),
descriptor.id());
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
index fa70fe3672..575a6d12c9 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java
@@ -57,36 +57,40 @@ public class RocksDbBinaryTupleComparator extends
AbstractComparator {
@Override
public int compare(ByteBuffer a, ByteBuffer b) {
- int compareTableId = Integer.compare(a.getInt(), b.getInt());
+ int compareIndexId = Integer.compare(a.getInt(), b.getInt());
- if (compareTableId != 0) {
- return compareTableId;
+ if (compareIndexId != 0) {
+ return compareIndexId;
+ }
+
+ // Handle index ID-only buffers, probably coming from the range
tombstones.
+ if (!bothHasRemaining(a, b)) {
+ return Integer.compare(a.remaining(), b.remaining());
}
- // Compare table ids.
int comparePartitionId = Short.compareUnsigned(a.getShort(),
b.getShort());
if (comparePartitionId != 0) {
return comparePartitionId;
}
- ByteBuffer firstBinaryTupleBuffer =
a.slice().order(ByteOrder.LITTLE_ENDIAN);
- ByteBuffer secondBinaryTupleBuffer =
b.slice().order(ByteOrder.LITTLE_ENDIAN);
-
// Handle partition bounds.
- if (!firstBinaryTupleBuffer.hasRemaining()) {
- return -1;
+ if (!bothHasRemaining(a, b)) {
+ return Integer.compare(a.remaining(), b.remaining());
}
- if (!secondBinaryTupleBuffer.hasRemaining()) {
- return 1;
- }
+ ByteBuffer firstBinaryTupleBuffer =
a.slice().order(ByteOrder.LITTLE_ENDIAN);
+ ByteBuffer secondBinaryTupleBuffer =
b.slice().order(ByteOrder.LITTLE_ENDIAN);
int compareTuples = comparator.compare(firstBinaryTupleBuffer,
secondBinaryTupleBuffer);
return compareTuples == 0 ? compareRowIds(a, b) : compareTuples;
}
+ private static boolean bothHasRemaining(ByteBuffer a, ByteBuffer b) {
+ return a.hasRemaining() && b.hasRemaining();
+ }
+
private static int compareRowIds(ByteBuffer a, ByteBuffer b) {
long firstMostSignBits = a.getLong(a.limit() - Long.BYTES * 2);
long secondMostSignBits = b.getLong(b.limit() - Long.BYTES * 2);
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index c71799b3fc..bd23e53012 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -53,6 +53,7 @@ import org.rocksdb.WriteBatchWithIndex;
*
* <p>This storage uses the following format for keys:
* <pre>
+ * Index ID - 4 bytes
* Partition ID - 2 bytes
* Tuple value - variable length
* Row ID (UUID) - 16 bytes
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java
new file mode 100644
index 0000000000..0417cc7585
--- /dev/null
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Cursor that iterates over distinct index IDs in a Sorted Index column
family.
+ *
+ * <p>Sorted index column family can contain multiple indexes that have the
same order and type of indexed columns. Index ID is stored
+ * as the first 4 bytes of a RocksDB key.
+ */
+class IndexIdCursor implements Cursor<Integer> {
+ private final RocksIterator it;
+
+ private @Nullable ByteBuffer curIndexId = null;
+
+ private boolean hasNext = true;
+
+ IndexIdCursor(RocksIterator it) {
+ this.it = it;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!hasNext) {
+ return false;
+ }
+
+ if (curIndexId == null) {
+ curIndexId =
ByteBuffer.allocate(Integer.BYTES).order(KEY_BYTE_ORDER);
+
+ it.seekToFirst();
+ } else if (setNextIndexId()) {
+ it.seek(curIndexId);
+
+ curIndexId.rewind();
+ } else {
+ hasNext = false;
+
+ return false;
+ }
+
+ hasNext = it.isValid();
+
+ if (!hasNext) {
+ RocksUtils.checkIterator(it);
+ }
+
+ return hasNext;
+ }
+
+ @Override
+ public Integer next() {
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+
+ assert curIndexId != null;
+
+ it.key(curIndexId);
+
+ curIndexId.rewind();
+
+ return curIndexId.getInt(0);
+ }
+
+ @Override
+ public void close() {
+ it.close();
+ }
+
+ /**
+ * Sets the next hypothetical index ID by incrementing the current index
ID by one.
+ *
+ * @return {@code false} if the current index ID already has the maximum
possible value and the ID range has been exhausted.
+ * {@code true} otherwise.
+ */
+ private boolean setNextIndexId() {
+ assert curIndexId != null;
+
+ for (int i = curIndexId.remaining() - 1; i >= 0; i--) {
+ byte b = (byte) (curIndexId.get(i) + 1);
+
+ curIndexId.put(i, b);
+
+ if (b != 0) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 69bb41b49d..6fe23169ab 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -24,9 +24,12 @@ import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,6 +41,7 @@ import
org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -54,6 +58,35 @@ public final class SharedRocksDbInstance {
/** Write options. */
public static final WriteOptions DFLT_WRITE_OPTS = new
WriteOptions().setDisableWAL(true);
+ /**
+ * Class that represents a Column Family for sorted indexes and all index
IDs that map to this Column Family.
+ *
+ * <p>Sorted indexes that have the same order and type of indexed columns
get mapped to the same Column Family in order to share
+ * the same comparator.
+ */
+ private static class SortedIndexColumnFamily implements AutoCloseable {
+ final ColumnFamily columnFamily;
+
+ final Set<Integer> indexIds;
+
+ SortedIndexColumnFamily(ColumnFamily columnFamily, int indexId) {
+ this.columnFamily = columnFamily;
+ this.indexIds = new HashSet<>();
+
+ indexIds.add(indexId);
+ }
+
+ SortedIndexColumnFamily(ColumnFamily columnFamily, Set<Integer>
indexIds) {
+ this.columnFamily = columnFamily;
+ this.indexIds = indexIds;
+ }
+
+ @Override
+ public void close() {
+ columnFamily.handle().close();
+ }
+ }
+
/** RocksDB storage engine instance. */
public final RocksDbStorageEngine engine;
@@ -76,13 +109,10 @@ public final class SharedRocksDbInstance {
public final ColumnFamily gcQueueCf;
/** Column Family for Hash Index data. */
- public final ColumnFamily hashIndexCf;
+ private final ColumnFamily hashIndexCf;
/** Column Family instances for different types of sorted indexes,
identified by the column family name. */
- private final ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs;
-
- /** Column family names mapped to sets of index IDs, that use that CF. */
- private final ConcurrentMap<ByteArray, Set<Integer>>
sortedIndexIdsByCfName = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ByteArray, SortedIndexColumnFamily>
sortedIndexCfsByName = new ConcurrentHashMap<>();
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock;
@@ -100,7 +130,7 @@ public final class SharedRocksDbInstance {
ColumnFamily partitionCf,
ColumnFamily gcQueueCf,
ColumnFamily hashIndexCf,
- ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs
+ List<ColumnFamily> sortedIndexCfs
) {
this.engine = engine;
this.path = path;
@@ -113,7 +143,29 @@ public final class SharedRocksDbInstance {
this.partitionCf = partitionCf;
this.gcQueueCf = gcQueueCf;
this.hashIndexCf = hashIndexCf;
- this.sortedIndexCfs = sortedIndexCfs;
+
+ recoverExistingSortedIndexes(sortedIndexCfs);
+ }
+
+ private void recoverExistingSortedIndexes(List<ColumnFamily>
sortedIndexCfs) {
+ for (ColumnFamily sortedIndexCf : sortedIndexCfs) {
+ var indexIds = new HashSet<Integer>();
+
+ try (Cursor<Integer> sortedIndexIdCursor =
indexIdsCursor(sortedIndexCf)) {
+ for (Integer indexId : sortedIndexIdCursor) {
+ indexIds.add(indexId);
+ }
+ }
+
+ if (indexIds.isEmpty()) {
+ destroyColumnFamily(sortedIndexCf);
+ } else {
+ this.sortedIndexCfsByName.put(
+ new ByteArray(sortedIndexCf.nameBytes()),
+ new SortedIndexColumnFamily(sortedIndexCf, indexIds)
+ );
+ }
+ }
}
/**
@@ -141,10 +193,7 @@ public final class SharedRocksDbInstance {
resources.add(partitionCf.handle());
resources.add(gcQueueCf.handle());
resources.add(hashIndexCf.handle());
- resources.addAll(sortedIndexCfs.values().stream()
- .map(ColumnFamily::handle)
- .collect(toList())
- );
+ resources.addAll(sortedIndexCfsByName.values());
resources.add(db);
resources.add(flusher::stop);
@@ -159,32 +208,57 @@ public final class SharedRocksDbInstance {
}
/**
- * Returns Column Family instance with the desired name. Creates it it it
doesn't exist.
- * Tracks every created index by its {@code indexId}.
+ * Returns the Column Family containing all hash indexes.
*/
- public ColumnFamily getSortedIndexCfOnIndexCreate(byte[] cfName, int
indexId) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
+ public ColumnFamily hashIndexCf() {
+ return hashIndexCf;
+ }
+
+ /**
+ * Returns a collection of all hash index IDs that currently exist in the
storage.
+ */
+ public Collection<Integer> hashIndexIds() {
+ try (Cursor<Integer> hashIndexIdCursor = indexIdsCursor(hashIndexCf)) {
+ return hashIndexIdCursor.stream().collect(toList());
}
+ }
- try {
- ColumnFamily[] result = {null};
+ /**
+ * Returns an "index ID - Column Family" mapping for all sorted indexes
that currently exist in the storage.
+ */
+ public Map<Integer, ColumnFamily> sortedIndexes() {
+ var result = new HashMap<Integer, ColumnFamily>();
- sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name,
indexIds) -> {
- ColumnFamily columnFamily = getOrCreateColumnFamily(cfName,
name);
+ for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) {
+ for (Integer indexId : indexCf.indexIds) {
+ result.put(indexId, indexCf.columnFamily);
+ }
+ }
- result[0] = columnFamily;
+ return result;
+ }
- if (indexIds == null) {
- indexIds = new HashSet<>();
- }
+ /**
+ * Returns Column Family instance with the desired name. Creates it if it
doesn't exist. Tracks every created index by its
+ * {@code indexId}.
+ */
+ public ColumnFamily getOrCreateSortedIndexCf(byte[] cfName, int indexId) {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
- indexIds.add(indexId);
+ try {
+ SortedIndexColumnFamily result = sortedIndexCfsByName.compute(new
ByteArray(cfName), (unused, sortedIndexCf) -> {
+ if (sortedIndexCf == null) {
+ return new
SortedIndexColumnFamily(createColumnFamily(cfName), indexId);
+ } else {
+ sortedIndexCf.indexIds.add(indexId);
- return indexIds;
+ return sortedIndexCf;
+ }
});
- return result[0];
+ return result.columnFamily;
} finally {
busyLock.leaveBusy();
}
@@ -193,67 +267,59 @@ public final class SharedRocksDbInstance {
/**
* Possibly drops the column family after destroying the index.
*/
- public void dropCfOnIndexDestroy(byte[] cfName, int indexId) {
+ public void destroySortedIndexCfIfNeeded(byte[] cfName, int indexId) {
if (!busyLock.enterBusy()) {
throw new StorageClosedException();
}
try {
- sortedIndexIdsByCfName.compute(new ByteArray(cfName), (name,
indexIds) -> {
- if (indexIds == null) {
- return null;
- }
+ sortedIndexCfsByName.computeIfPresent(new ByteArray(cfName),
(unused, indexCf) -> {
+ indexCf.indexIds.remove(indexId);
- indexIds.remove(indexId);
-
- if (indexIds.isEmpty()) {
- indexIds = null;
-
- destroyColumnFamily(name);
+ if (!indexCf.indexIds.isEmpty()) {
+ return indexCf;
}
- return indexIds;
+ destroyColumnFamily(indexCf.columnFamily);
+
+ return null;
});
} finally {
busyLock.leaveBusy();
}
}
- private ColumnFamily getOrCreateColumnFamily(byte[] cfName, ByteArray
name) {
- return sortedIndexCfs.computeIfAbsent(name, unused -> {
- ColumnFamilyDescriptor cfDescriptor = new
ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
-
- ColumnFamily columnFamily;
- try {
- columnFamily = ColumnFamily.create(db, cfDescriptor);
- } catch (RocksDBException e) {
- throw new StorageException("Failed to create new RocksDB
column family: " + toStringName(cfDescriptor.getName()), e);
- }
+ private static Cursor<Integer> indexIdsCursor(ColumnFamily cf) {
+ return new IndexIdCursor(cf.newIterator());
+ }
- flusher.addColumnFamily(columnFamily.handle());
+ private ColumnFamily createColumnFamily(byte[] cfName) {
+ ColumnFamilyDescriptor cfDescriptor = new
ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
- return columnFamily;
- });
- }
+ ColumnFamily columnFamily;
+ try {
+ columnFamily = ColumnFamily.create(db, cfDescriptor);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to create new RocksDB column
family: " + toStringName(cfDescriptor.getName()), e);
+ }
- private void destroyColumnFamily(ByteArray cfName) {
- sortedIndexCfs.computeIfPresent(cfName, (unused, columnFamily) -> {
- ColumnFamilyHandle columnFamilyHandle = columnFamily.handle();
+ flusher.addColumnFamily(columnFamily.handle());
- flusher.removeColumnFamily(columnFamilyHandle);
+ return columnFamily;
+ }
- try {
- db.dropColumnFamily(columnFamilyHandle);
+ private void destroyColumnFamily(ColumnFamily columnFamily) {
+ ColumnFamilyHandle columnFamilyHandle = columnFamily.handle();
- db.destroyColumnFamilyHandle(columnFamilyHandle);
- } catch (RocksDBException e) {
- throw new StorageException(
- "Failed to destroy RocksDB Column Family. [cfName={},
path={}]",
- e, toStringName(cfName.bytes()), path
- );
- }
+ flusher.removeColumnFamily(columnFamilyHandle);
- return null;
- });
+ try {
+ columnFamily.destroy();
+ } catch (RocksDBException e) {
+ throw new StorageException(
+ "Failed to destroy RocksDB Column Family. [cfName={},
path={}]",
+ e, columnFamily.name(), path
+ );
+ }
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
index ddd11d7dac..c97fb21033 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
@@ -27,9 +27,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.storage.StorageException;
@@ -100,7 +97,7 @@ public class SharedRocksDbInstanceCreator {
ColumnFamily partitionCf = null;
ColumnFamily gcQueueCf = null;
ColumnFamily hashIndexCf = null;
- ConcurrentMap<ByteArray, ColumnFamily> sortedIndexCfs = new
ConcurrentHashMap<>();
+ var sortedIndexCfs = new ArrayList<ColumnFamily>();
// Read all existing Column Families from the db and parse them
according to type: meta, partition data or index.
for (ColumnFamilyHandle cfHandle : cfHandles) {
@@ -128,7 +125,7 @@ public class SharedRocksDbInstanceCreator {
break;
case SORTED_INDEX:
- sortedIndexCfs.put(new
ByteArray(cf.handle().getName()), cf);
+ sortedIndexCfs.add(cf);
break;
@@ -211,6 +208,7 @@ public class SharedRocksDbInstanceCreator {
}
}
+ @SuppressWarnings("resource")
private static ColumnFamilyOptions defaultCfOptions() {
return new ColumnFamilyOptions()
.setMemtablePrefixBloomSizeRatio(0.125)
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 07d62ed440..994cba7fe0 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -159,11 +158,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
void storageAdvertisesItIsPersistent() {
assertThat(tableStorage.isVolatile(), is(false));
}
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21680")
- @Test
- @Override
- public void testIndexDestructionOnRecovery() throws Exception {
- super.testIndexDestructionOnRecovery();
- }
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java
new file mode 100644
index 0000000000..315acfc112
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexIdCursorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+/** Class that contains tests for {@link IndexIdCursor}. */
+class IndexIdCursorTest extends IgniteAbstractTest {
+    /** Database opened in the per-test work directory; stays {@code null} if opening it failed. */
+    private RocksDB rocksDb;
+
+    @BeforeEach
+    void setUp() throws RocksDBException {
+        rocksDb = RocksDB.open(workDir.toString());
+    }
+
+    @AfterEach
+    void tearDown() {
+        // Guard against a failed setUp: an NPE here would hide the original failure.
+        if (rocksDb != null) {
+            rocksDb.close();
+        }
+    }
+
+    @Test
+    void testEmptyCursor() {
+        assertThat(getAll(), is(empty()));
+    }
+
+    @Test
+    void testSingleElement() throws RocksDBException {
+        insertData(0, 0);
+
+        assertThat(getAll(), contains(0));
+    }
+
+    @Test
+    void testBorders() throws RocksDBException {
+        insertData(Integer.MIN_VALUE, 0);
+        insertData(Integer.MAX_VALUE, 0);
+        insertData(-1, 0);
+        insertData(0, 0);
+
+        // RocksDB uses unsigned comparison, so negative IDs are expected after Integer.MAX_VALUE.
+        assertThat(getAll(), contains(0, Integer.MAX_VALUE, Integer.MIN_VALUE, -1));
+    }
+
+    @Test
+    void testRemovesDuplicatesAndSorts() throws RocksDBException {
+        // Write several keys per index ID, visiting the IDs in descending order.
+        for (int i = 5; i >= 0; i--) {
+            for (int j = 0; j < 3; j++) {
+                insertData(i, j);
+            }
+        }
+
+        assertThat(getAll(), contains(0, 1, 2, 3, 4, 5));
+    }
+
+    /** Collects every index ID produced by an {@link IndexIdCursor} over a fresh iterator of the whole database. */
+    private List<Integer> getAll() {
+        try (var cursor = new IndexIdCursor(rocksDb.newIterator())) {
+            return cursor.stream().collect(toList());
+        }
+    }
+
+    /** Stores an empty value under {@code createKey(BYTE_EMPTY_ARRAY, indexId, extra)}; {@code extra} lets keys share an ID. */
+    private void insertData(int indexId, int extra) throws RocksDBException {
+        rocksDb.put(createKey(BYTE_EMPTY_ARRAY, indexId, extra), BYTE_EMPTY_ARRAY);
+    }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
new file mode 100644
index 0000000000..7ef2efc91d
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbDataRegion;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/** Contains tests for {@link SharedRocksDbInstance}. */
+@ExtendWith(ConfigurationExtension.class)
+class SharedRocksDbInstanceTest extends IgniteAbstractTest {
+    private RocksDbStorageEngine engine;
+
+    private RocksDbDataRegion dataRegion;
+
+    /** Instance under test; re-created by tests that check recovery after a restart. */
+    private SharedRocksDbInstance rocksDb;
+
+    @BeforeEach
+    void setUp(@InjectConfiguration RocksDbStorageEngineConfiguration engineConfig) throws Exception {
+        engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+
+        engine.start();
+
+        dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion());
+
+        dataRegion.start();
+
+        rocksDb = createDb();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        // Stop in reverse order of creation, passing null for components that were never created.
+        IgniteUtils.closeAllManually(
+                rocksDb == null ? null : rocksDb::stop,
+                dataRegion == null ? null : dataRegion::stop,
+                engine == null ? null : engine::stop
+        );
+    }
+
+    /** Opens (or re-opens) a {@link SharedRocksDbInstance} over the test work directory. */
+    private SharedRocksDbInstance createDb() throws Exception {
+        return new SharedRocksDbInstanceCreator().create(engine, dataRegion, workDir);
+    }
+
+    @Test
+    void testSortedIndexCfCaching() {
+        byte[] fooName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("x", NativeTypes.INT64, true, true)
+        ));
+
+        byte[] barName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("y", NativeTypes.UUID, true, true)
+        ));
+
+        byte[] bazName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("z", NativeTypes.INT64, true, true)
+        ));
+
+        ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1);
+        ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2);
+        ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3);
+
+        // "foo" and "baz" differ only in the column name, so they are expected to share one Column Family.
+        assertThat(foo, is(sameInstance(baz)));
+        assertThat(foo, is(not(sameInstance(bar))));
+
+        // The shared Column Family must survive while index 3 still references it.
+        rocksDb.destroySortedIndexCfIfNeeded(fooName, 1);
+
+        assertTrue(cfExists(fooName));
+
+        rocksDb.destroySortedIndexCfIfNeeded(barName, 2);
+
+        assertFalse(cfExists(barName));
+
+        rocksDb.destroySortedIndexCfIfNeeded(bazName, 3);
+
+        assertFalse(cfExists(bazName));
+    }
+
+    @Test
+    void testSortedIndexRecovery() throws Exception {
+        byte[] fooName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("x", NativeTypes.INT64, true, true)
+        ));
+
+        byte[] barName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("y", NativeTypes.UUID, true, true)
+        ));
+
+        byte[] bazName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("z", NativeTypes.INT64, true, true)
+        ));
+
+        ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1);
+        ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2);
+        ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3);
+
+        assertThat(rocksDb.sortedIndexes(), is(Map.of(1, foo, 2, bar, 3, baz)));
+
+        // Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped.
+        foo.put(ByteUtils.intToBytes(1), BYTE_EMPTY_ARRAY);
+
+        rocksDb.stop();
+
+        rocksDb = createDb();
+
+        // Only index 1 has written data, so it is the only index expected to be recovered.
+        assertThat(rocksDb.sortedIndexes(), hasKey(1));
+        assertThat(rocksDb.sortedIndexes(), not(hasKey(2)));
+        assertThat(rocksDb.sortedIndexes(), not(hasKey(3)));
+
+        assertTrue(cfExists(fooName));
+        assertFalse(cfExists(barName));
+    }
+
+    /**
+     * Checks whether a Column Family with the given name exists in the underlying database.
+     *
+     * <p>The check tries to create a Column Family with the same name, which fails if one already exists. A successfully created
+     * probe is dropped again, so that the check does not leave a new Column Family behind.
+     */
+    private boolean cfExists(byte[] cfName) {
+        ColumnFamilyHandle handle;
+
+        try {
+            handle = rocksDb.db.createColumnFamily(new ColumnFamilyDescriptor(cfName));
+        } catch (RocksDBException e) {
+            return true;
+        }
+
+        try {
+            // Destroying the handle alone would keep the probe Column Family in the database.
+            rocksDb.db.dropColumnFamily(handle);
+            rocksDb.db.destroyColumnFamilyHandle(handle);
+        } catch (RocksDBException e) {
+            throw new AssertionError("Failed to drop probe Column Family", e);
+        }
+
+        return false;
+    }
+}