This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new a29862a834 IGNITE-21574 Implement index destruction for RocksDB engine (#3247) a29862a834 is described below commit a29862a834b1119aef8de768f310a7bbdfed7260 Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Thu Feb 22 16:49:40 2024 +0200 IGNITE-21574 Implement index destruction for RocksDB engine (#3247) --- .../storage/AbstractMvTableStorageTest.java | 3 + .../index/AbstractSortedIndexStorageTest.java | 35 +++++ .../AbstractPageMemorySortedIndexStorageTest.java | 8 ++ .../ignite/internal/storage/rocksdb/HashIndex.java | 12 +- .../storage/rocksdb/RocksDbMetaStorage.java | 13 +- .../storage/rocksdb/RocksDbTableStorage.java | 152 +++++++++------------ .../internal/storage/rocksdb/SortedIndex.java | 23 +--- .../rocksdb/index/AbstractRocksDbIndexStorage.java | 2 +- .../rocksdb/index/RocksDbHashIndexStorage.java | 7 +- .../storage/rocksdb/RocksDbMvTableStorageTest.java | 8 -- .../rocksdb/index/RocksDbHashIndexStorageTest.java | 9 -- 11 files changed, 129 insertions(+), 143 deletions(-) diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java index 652b63905c..b16e3d2b89 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java @@ -267,6 +267,9 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { assertThat(partitionStorage.flush(), willCompleteSuccessfully()); assertThat(destroySortedIndexFuture, willCompleteSuccessfully()); assertThat(destroyHashIndexFuture, willCompleteSuccessfully()); + + assertThat(tableStorage.getIndex(PARTITION_ID, sortedIdx.id()), is(nullValue())); + assertThat(tableStorage.getIndex(PARTITION_ID, hashIdx.id()), is(nullValue())); } @Test diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java index 34f00f9236..3b961138ad 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java @@ -28,6 +28,7 @@ import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATE import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL; import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS; import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; @@ -35,6 +36,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -51,6 +53,7 @@ import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.IntStream; @@ -70,6 +73,7 @@ import org.apache.ignite.internal.storage.index.impl.TestIndexRow; import org.apache.ignite.internal.testframework.VariableSource; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.sql.ColumnType; +import org.hamcrest.Matchers; import org.intellij.lang.annotations.MagicConstant; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.RepeatedTest; @@ -1341,6 +1345,37 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag assertThrows(NoSuchElementException.class, scan::next); } + @Test + public void testDestroy() { + SortedIndexStorage index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); + + int indexId = index.indexDescriptor().id(); + + assertThat(tableStorage.getIndex(TEST_PARTITION, indexId), is(sameInstance(index))); + + var serializer = new BinaryTupleRowSerializer(index.indexDescriptor()); + + IndexRow row1 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); + IndexRow row2 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); + IndexRow row3 = serializer.serializeRow(new Object[]{ 2, "bar" }, new RowId(TEST_PARTITION)); + + put(index, row1); + put(index, row2); + put(index, row3); + + CompletableFuture<Void> destroyFuture = tableStorage.destroyIndex(index.indexDescriptor().id()); + + assertThat(destroyFuture, willCompleteSuccessfully()); + + assertThat(tableStorage.getIndex(TEST_PARTITION, indexId), is(Matchers.nullValue())); + + index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); + + assertThat(getAll(index, row1), is(empty())); + assertThat(getAll(index, row2), is(empty())); + assertThat(getAll(index, row3), is(empty())); + } + private List<ColumnParams> shuffledRandomColumnParams() { return shuffledColumnParams(d -> random.nextBoolean()); } diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java index 7640686be4..b6ad8b9acd 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.storage.index.SortedIndexStorage; import org.apache.ignite.internal.storage.index.impl.BinaryTupleRowSerializer; import org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfiguration; import org.apache.ignite.sql.ColumnType; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -100,6 +101,13 @@ abstract class AbstractPageMemorySortedIndexStorageTest extends AbstractSortedIn assertThat(get(index, indexRow3.indexColumns()), empty()); } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-21583") + @Test + @Override + public void testDestroy() { + super.testDestroy(); + } + private static IndexRow createIndexRow(BinaryTupleRowSerializer serializer, RowId rowId, Object... objects) { return serializer.serializeRow(objects, rowId); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java index d7bb4b7f2a..a7659e8c72 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.storage.rocksdb; +import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey; +import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.rocksdb.ColumnFamily; @@ -60,9 +64,11 @@ class HashIndex { /** * Removes all data associated with the index. */ - void destroy() { - // TODO: implement, see https://issues.apache.org/jira/browse/IGNITE-21574. - throw new UnsupportedOperationException("Not implemented yet"); + void destroy(WriteBatch writeBatch) throws RocksDBException { + close(); + + // Every index storage uses an "index ID" + "partition ID" prefix. We can remove everything by just using the index ID prefix. + deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, descriptor.id())); } /** diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java index 86b36e2834..63d2506945 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java @@ -39,24 +39,19 @@ import org.rocksdb.RocksDBException; public class RocksDbMetaStorage { /** * Prefix to store partition meta information, such as last applied index and term. - * Key format is {@code [prexif, tableId, partitionId]} in BE. + * Key format is {@code [prefix, tableId, partitionId]} in BE. */ public static final byte[] PARTITION_META_PREFIX = {0}; /** - * Prefix to store partition configuration. Key format is {@code [prexif, tableId, partitionId]} in BE. + * Prefix to store partition configuration. Key format is {@code [prefix, tableId, partitionId]} in BE. */ public static final byte[] PARTITION_CONF_PREFIX = {1}; /** - * Prefix to store index column family name. Key format is {@code [prexif, indexId]} in BE. + * Prefix to store next row id to build in index. Key format is {@code [prefix, indexId, partitionId]} in BE. */ - public static final byte[] INDEX_CF_PREFIX = {2}; - - /** - * Prefix to store next row id to build in index. Key format is {@code [prexif, indexId, partitionId]} in BE. - */ - public static final byte[] INDEX_ROW_ID_PREFIX = {3}; + public static final byte[] INDEX_ROW_ID_PREFIX = {2}; private final ColumnFamily metaColumnFamily; diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java index 7427163a65..69cad3ca2a 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java @@ -17,26 +17,26 @@ package org.apache.ignite.internal.storage.rocksdb; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX; import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey; import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS; +import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.StorageException; @@ -48,7 +48,6 @@ import org.apache.ignite.internal.storage.index.IndexStorage; import org.apache.ignite.internal.storage.index.SortedIndexStorage; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; -import org.apache.ignite.internal.storage.rocksdb.index.RocksDbBinaryTupleComparator; import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage; import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage; import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance; @@ -56,9 +55,7 @@ import org.apache.ignite.internal.storage.util.MvPartitionStorages; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.jetbrains.annotations.Nullable; -import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.FlushOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -71,7 +68,7 @@ public class RocksDbTableStorage implements MvTableStorage { private final SharedRocksDbInstance rocksDb; /** Partition storages. */ - private volatile MvPartitionStorages<RocksDbMvPartitionStorage> mvPartitionStorages; + private final MvPartitionStorages<RocksDbMvPartitionStorage> mvPartitionStorages; /** Hash Index storages by Index IDs. */ private final ConcurrentMap<Integer, HashIndex> hashIndices = new ConcurrentHashMap<>(); @@ -99,8 +96,8 @@ public class RocksDbTableStorage implements MvTableStorage { StorageTableDescriptor tableDescriptor ) { this.rocksDb = rocksDb; - this.tableDescriptor = tableDescriptor; + this.mvPartitionStorages = new MvPartitionStorages<>(tableDescriptor.getId(), tableDescriptor.getPartitions()); } /** @@ -139,13 +136,7 @@ public class RocksDbTableStorage implements MvTableStorage { } @Override - public void start() throws StorageException { - inBusyLock(busyLock, () -> { - MvPartitionStorages<RocksDbMvPartitionStorage> mvPartitionStorages = - new MvPartitionStorages<>(tableDescriptor.getId(), tableDescriptor.getPartitions()); - - this.mvPartitionStorages = mvPartitionStorages; - }); + public void start() { } /** @@ -158,62 +149,52 @@ public class RocksDbTableStorage implements MvTableStorage { return inBusyLock(busyLock, () -> rocksDb.flusher.awaitFlush(schedule)); } - private void stop(boolean destroy) { + private CompletableFuture<Void> stop(boolean destroy) { if (!stopGuard.compareAndSet(false, true)) { - return; + return nullCompletedFuture(); } busyLock.block(); - if (destroy) { - destroyTableData(); - } + return mvPartitionStorages.getAllForCloseOrDestroy() + .thenAccept(partitionStorages -> { + var resources = new ArrayList<AutoCloseable>(); - List<AutoCloseable> resources = new ArrayList<>(); + for (HashIndex index : hashIndices.values()) { + resources.add(index::close); + } - resources.addAll( - sortedIndices.values().stream() - .map(index -> (AutoCloseable) index::close) - .collect(toList()) - ); + for (SortedIndex index : sortedIndices.values()) { + resources.add(index::close); + } - try { - mvPartitionStorages - .getAllForCloseOrDestroy() - // 10 seconds is taken by analogy with shutdown of thread pool, in general this should be fairly fast. - .get(10, TimeUnit.SECONDS) - .forEach(mvPartitionStorage -> resources.add(mvPartitionStorage::close)); - - for (HashIndex index : hashIndices.values()) { - resources.add(index::close); - } - - for (SortedIndex index : sortedIndices.values()) { - resources.add(index::close); - } + partitionStorages.forEach(mvPartitionStorage -> resources.add(mvPartitionStorage::close)); - Collections.reverse(resources); + try { + IgniteUtils.closeAll(resources); + } catch (Exception e) { + throw new StorageException("Failed to stop RocksDB table storage: " + getTableId(), e); + } - IgniteUtils.closeAll(resources); - } catch (Exception e) { - throw new StorageException("Failed to stop RocksDB table storage: " + getTableId(), e); - } + if (destroy) { + destroyTableData(); + } + }); } @Override public void close() throws StorageException { - stop(false); + // 10 seconds is taken by analogy with shutdown of thread pool, in general this should be fairly fast. + try { + stop(false).get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new StorageException("Failed to stop RocksDB table storage: " + getTableId(), e); + } } @Override public CompletableFuture<Void> destroy() { - try { - stop(true); - - return nullCompletedFuture(); - } catch (Throwable t) { - return failedFuture(new StorageException("Failed to destroy RocksDB table storage: " + getTableId(), t)); - } + return stop(true); } private void destroyTableData() { @@ -222,24 +203,24 @@ public class RocksDbTableStorage implements MvTableStorage { byte[] tablePrefix = createKey(BYTE_EMPTY_ARRAY, tableId); - SharedRocksDbInstance.deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix); - SharedRocksDbInstance.deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix); + deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix); + deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix); + + for (HashIndex hashIndex : hashIndices.values()) { + hashIndex.destroy(writeBatch); + } - for (int indexId : hashIndices.keySet()) { - SharedRocksDbInstance.deleteByPrefix(writeBatch, rocksDb.hashIndexCf, createKey(BYTE_EMPTY_ARRAY, indexId)); + for (SortedIndex sortedIndex : sortedIndices.values()) { + sortedIndex.destroy(writeBatch); } - SharedRocksDbInstance.deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_META_PREFIX, tableId)); - SharedRocksDbInstance.deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_CONF_PREFIX, tableId)); + deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_META_PREFIX, tableId)); + deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), createKey(PARTITION_CONF_PREFIX, tableId)); rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); } catch (RocksDBException e) { throw new StorageException("Failed to destroy table data. [tableId={}]", e, getTableId()); } - - for (SortedIndex sortedIndex : sortedIndices.values()) { - sortedIndex.destroy(); - } } @Override @@ -272,7 +253,7 @@ public class RocksDbTableStorage implements MvTableStorage { mvPartitionStorage.close(); // Operation to delete partition data should be fast, since we will write only the range of keys for deletion, and the - // RocksDB itself will then destroy the data on flash. + // RocksDB itself will then destroy the data on flush. mvPartitionStorage.destroyData(writeBatch); for (HashIndex hashIndex : hashIndices.values()) { @@ -285,7 +266,7 @@ public class RocksDbTableStorage implements MvTableStorage { rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); - return awaitFlush(true); + return nullCompletedFuture(); } catch (RocksDBException e) { throw new StorageException("Error when destroying storage: [{}]", e, mvPartitionStorages.createStorageInfo(partitionId)); } @@ -337,22 +318,26 @@ public class RocksDbTableStorage implements MvTableStorage { return inBusyLock(busyLock, () -> { HashIndex hashIdx = hashIndices.remove(indexId); - if (hashIdx != null) { - hashIdx.destroy(); - } - - // Sorted Indexes have a separate Column Family per index, so we simply destroy it immediately after a flush completes - // in order to avoid concurrent access to the CF. SortedIndex sortedIdx = sortedIndices.remove(indexId); - if (sortedIdx != null) { - sortedIdx.destroy(); + if (hashIdx == null && sortedIdx == null) { + return nullCompletedFuture(); } - if (hashIdx == null) { + try (WriteBatch writeBatch = new WriteBatch()) { + if (hashIdx != null) { + hashIdx.destroy(writeBatch); + } + + if (sortedIdx != null) { + sortedIdx.destroy(writeBatch); + } + + rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); + return nullCompletedFuture(); - } else { - return awaitFlush(false); + } catch (RocksDBException e) { + throw new StorageException("Error when destroying index: {}", e, indexId); } }); } @@ -362,17 +347,6 @@ public class RocksDbTableStorage implements MvTableStorage { return false; } - /** - * Creates a Column Family descriptor for a Sorted Index. - */ - private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String cfName, StorageSortedIndexDescriptor descriptor) { - var comparator = new RocksDbBinaryTupleComparator(descriptor.columns()); - - ColumnFamilyOptions options = new ColumnFamilyOptions().setComparator(comparator); - - return new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), options); - } - @Override public CompletableFuture<Void> startRebalancePartition(int partitionId) { return inBusyLock(busyLock, () -> mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> { @@ -401,8 +375,8 @@ public class RocksDbTableStorage implements MvTableStorage { try (WriteBatch writeBatch = new WriteBatch()) { mvPartitionStorage.abortRebalance(writeBatch); - getHashIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch)); - getSortedIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch)); + getHashIndexStorages(partitionId).forEach(index -> index.abortRebalance(writeBatch)); + getSortedIndexStorages(partitionId).forEach(index -> index.abortRebalance(writeBatch)); rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java index 3e2144ab87..88d408381d 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java @@ -18,11 +18,6 @@ package org.apache.ignite.internal.storage.rocksdb; import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName; -import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.INDEX_CF_PREFIX; -import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey; -import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS; -import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; -import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -79,18 +74,10 @@ class SortedIndex implements ManuallyCloseable { /** * Removes all data associated with the index. */ - void destroy() { - var indexId = descriptor.id(); - try (WriteBatch writeBatch = new WriteBatch()) { - deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, indexId)); - deleteByPrefix(writeBatch, indexMetaStorage.columnFamily(), createKey(INDEX_CF_PREFIX, indexId)); - - rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); - } catch (RocksDBException e) { - throw new StorageException("Unable to destroy index " + indexId, e); - } + void destroy(WriteBatch writeBatch) { + close(); - rocksDb.dropCfOnIndexDestroy(indexCf.nameBytes(), indexId); + rocksDb.dropCfOnIndexDestroy(indexCf.nameBytes(), descriptor.id()); } /** @@ -112,9 +99,7 @@ class SortedIndex implements ManuallyCloseable { @Override public void close() { try { - IgniteUtils.closeAll( - storages.values().stream().map(index -> index::close) - ); + IgniteUtils.closeAll(storages.values().stream().map(index -> index::close)); } catch (Exception e) { throw new StorageException("Failed to close index storages: " + descriptor.id(), e); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java index 1a8271ef44..18b1ee358b 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java @@ -144,7 +144,7 @@ abstract class AbstractRocksDbIndexStorage implements IndexStorage { * * @throws StorageRebalanceException If there was an error when aborting the rebalance. */ - public void abortReblance(WriteBatch writeBatch) { + public void abortRebalance(WriteBatch writeBatch) { if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) { throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo()); } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java index 07d2159dca..e461981fc9 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.IND import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PARTITION_ID_SIZE; import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE; +import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix; import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; @@ -177,10 +178,6 @@ public class RocksDbHashIndexStorage extends AbstractRocksDbIndexStorage impleme @Override public void destroyData(WriteBatch writeBatch) throws RocksDBException { - byte[] rangeEnd = incrementPrefix(constantPrefix); - - assert rangeEnd != null; - - writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd); + deleteByPrefix(writeBatch, indexCf, constantPrefix); } } diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java index 3ace0b7325..85dbd5bbd4 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -162,11 +161,4 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest { void storageAdvertisesItIsPersistent() { assertThat(tableStorage.isVolatile(), is(false)); } - - @Disabled("https://issues.apache.org/jira/browse/IGNITE-21574") - @Test - @Override - public void testDestroyIndex() { - super.testDestroyIndex(); - } } diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java index 2862434319..89c082c3f4 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java @@ -33,8 +33,6 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; /** @@ -72,11 +70,4 @@ public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest { engine == null ? null : engine::stop ); } - - @Disabled("https://issues.apache.org/jira/browse/IGNITE-21574") - @Test - @Override - public void testDestroy() { - super.testDestroy(); - } }