This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 952c97b2d5 IGNITE-21909 Fix race on getting and destroying an index in SharedRocksDbInstance (#3544) 952c97b2d5 is described below commit 952c97b2d5ccf532fb8181c8a8cefaa783ea4a5d Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Thu Apr 4 09:57:51 2024 +0300 IGNITE-21909 Fix race on getting and destroying an index in SharedRocksDbInstance (#3544) --- .../ignite/internal/storage/rocksdb/HashIndex.java | 2 +- .../ignite/internal/storage/rocksdb/Index.java | 28 ++++-- .../internal/storage/rocksdb/RocksDbIndexes.java | 12 ++- .../internal/storage/rocksdb/SortedIndex.java | 2 +- .../rocksdb/instance/SharedRocksDbInstance.java | 106 +++++++++++++-------- .../instance/SharedRocksDbInstanceTest.java | 44 ++++++++- 6 files changed, 134 insertions(+), 60 deletions(-) diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java index bd45c252f3..29b204014c 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java @@ -43,6 +43,6 @@ class HashIndex extends Index<RocksDbHashIndexStorage> { @Override RocksDbHashIndexStorage createStorage(int partitionId) { - return new RocksDbHashIndexStorage(descriptor, tableId, partitionId, indexColumnFamily().columnFamily(), indexMetaStorage); + return new RocksDbHashIndexStorage(descriptor, tableId(), partitionId, columnFamily(), indexMetaStorage); } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java index 58c23aefaa..0625ceaf09 100644 --- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.rocksdb.ColumnFamily; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage; -import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily; import org.apache.ignite.internal.storage.util.StorageState; import org.apache.ignite.internal.util.IgniteUtils; import org.jetbrains.annotations.Nullable; @@ -36,19 +35,30 @@ import org.rocksdb.WriteBatch; * Represents an index for all its partitions. */ abstract class Index<S extends AbstractRocksDbIndexStorage> { - final int tableId; + private final int tableId; - private final IndexColumnFamily indexColumnFamily; + private final int indexId; + + private final ColumnFamily columnFamily; private final ConcurrentMap<Integer, S> storageByPartitionId = new ConcurrentHashMap<>(); Index(int tableId, int indexId, ColumnFamily cf) { this.tableId = tableId; - this.indexColumnFamily = new IndexColumnFamily(indexId, cf); + this.indexId = indexId; + this.columnFamily = cf; + } + + int tableId() { + return tableId; + } + + int indexId() { + return indexId; } - IndexColumnFamily indexColumnFamily() { - return indexColumnFamily; + ColumnFamily columnFamily() { + return columnFamily; } /** @@ -76,7 +86,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> { try { IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> index::close)); } catch (Exception e) { - throw new StorageException("Failed to close index storages: " + indexColumnFamily.indexId(), e); + throw new StorageException("Failed to close index storages: " + indexId, e); } } @@ -87,7 +97,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> { try { 
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> index::transitionToDestroyedState)); } catch (Exception e) { - throw new StorageException("Failed to transition index storages to the DESTROYED state: " + indexColumnFamily.indexId(), e); + throw new StorageException("Failed to transition index storages to the DESTROYED state: " + indexId, e); } } @@ -113,6 +123,6 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> { void destroy(WriteBatch writeBatch) throws RocksDBException { transitionToDestroyedState(); - deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), indexPrefix(tableId, indexColumnFamily().indexId())); + deleteByPrefix(writeBatch, columnFamily, indexPrefix(tableId, indexId)); } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java index 7ff1b893a1..152d241e55 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java @@ -73,7 +73,7 @@ class RocksDbIndexes { } } - var indexCfsToDestroy = new ArrayList<IndexColumnFamily>(); + var indexCfsToDestroy = new ArrayList<ColumnFamily>(); for (IndexColumnFamily indexColumnFamily : rocksDb.sortedIndexes(tableId)) { int indexId = indexColumnFamily.indexId(); @@ -83,9 +83,11 @@ class RocksDbIndexes { var descriptor = (StorageSortedIndexDescriptor) indexDescriptorSupplier.get(indexId); if (descriptor == null) { + rocksDb.removeSortedIndex(indexId, cf); + deleteByPrefix(writeBatch, cf, indexPrefix(tableId, indexId)); - indexCfsToDestroy.add(indexColumnFamily); + indexCfsToDestroy.add(cf); } else { sortedIndices.put(indexId, SortedIndex.restoreExisting(tableId, cf, descriptor, rocksDb.meta)); } @@ -94,7 +96,7 @@ class RocksDbIndexes { 
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch); if (!indexCfsToDestroy.isEmpty()) { - rocksDb.scheduleIndexCfsDestroy(indexCfsToDestroy); + rocksDb.scheduleIndexCfsDestroyIfNeeded(indexCfsToDestroy); } } } @@ -174,6 +176,8 @@ class RocksDbIndexes { } if (sortedIdx != null) { + rocksDb.removeSortedIndex(indexId, sortedIdx.columnFamily()); + sortedIdx.destroy(writeBatch); } @@ -181,7 +185,7 @@ class RocksDbIndexes { } if (sortedIdx != null) { - rocksDb.scheduleIndexCfsDestroy(List.of(sortedIdx.indexColumnFamily())); + rocksDb.scheduleIndexCfsDestroyIfNeeded(List.of(sortedIdx.columnFamily())); } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java index c683d971c3..df97ab2c54 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java @@ -66,6 +66,6 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> { @Override RocksDbSortedIndexStorage createStorage(int partitionId) { - return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId, indexColumnFamily().columnFamily(), indexMetaStorage); + return new RocksDbSortedIndexStorage(descriptor, tableId(), partitionId, columnFamily(), indexMetaStorage); } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java index 8d31ae7477..26dad1528e 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java @@ -33,6 +33,7 @@ 
import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -77,10 +78,8 @@ public final class SharedRocksDbInstance { final Map<Integer, Integer> indexIdToTableId = new ConcurrentHashMap<>(); - SortedIndexColumnFamily(ColumnFamily columnFamily, int indexId, int tableId) { + SortedIndexColumnFamily(ColumnFamily columnFamily) { this.columnFamily = columnFamily; - - indexIdToTableId.put(indexId, tableId); } SortedIndexColumnFamily(ColumnFamily columnFamily, Map<Integer, Integer> indexIdToTableId) { @@ -272,12 +271,12 @@ public final class SharedRocksDbInstance { try { SortedIndexColumnFamily result = sortedIndexCfsByName.compute(new ByteArray(cfName), (unused, sortedIndexCf) -> { if (sortedIndexCf == null) { - return new SortedIndexColumnFamily(createSortedIndexCf(cfName), indexId, tableId); - } else { - sortedIndexCf.indexIdToTableId.put(indexId, tableId); - - return sortedIndexCf; + sortedIndexCf = new SortedIndexColumnFamily(createSortedIndexCf(cfName)); } + + sortedIndexCf.indexIdToTableId.put(indexId, tableId); + + return sortedIndexCf; }); return result.columnFamily; @@ -287,70 +286,95 @@ public final class SharedRocksDbInstance { } /** - * Schedules a drop of a column family after destroying an index, if it was the last index managed by that CF. + * Removes the given sorted index from this instance. This prevents this index to be returned by {@link #sortedIndexes} call. 
*/ - public CompletableFuture<Void> scheduleIndexCfsDestroy(List<IndexColumnFamily> indexColumnFamilies) { - assert !indexColumnFamilies.isEmpty(); + public void removeSortedIndex(int indexId, ColumnFamily cf) { + var cfNameBytes = new ByteArray(cf.nameBytes()); - return flusher.awaitFlush(false) - .thenRunAsync(() -> indexColumnFamilies.forEach(this::destroySortedIndexCfIfNeeded), engine.threadPool()); + sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) -> { + indexCf.indexIdToTableId.remove(indexId); + + return indexCf; + }); } - void destroySortedIndexCfIfNeeded(IndexColumnFamily indexColumnFamily) { - if (!busyLock.enterBusy()) { - throw new StorageClosedException(); - } + /** + * Schedules a drop of a column family after destroying an index, if it was the last index managed by that CF. + */ + public CompletableFuture<Void> scheduleIndexCfsDestroyIfNeeded(List<ColumnFamily> columnFamilies) { + assert !columnFamilies.isEmpty(); - var cfNameBytes = new ByteArray(indexColumnFamily.columnFamily().nameBytes()); + return flusher.awaitFlush(false) + .thenRunAsync(() -> { + if (!busyLock.enterBusy()) { + throw new StorageClosedException(); + } + + try { + columnFamilies.forEach(this::destroySortedIndexCfIfNeeded); + } finally { + busyLock.leaveBusy(); + } + }, engine.threadPool()); + } - try { - sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) -> { - indexCf.indexIdToTableId.remove(indexColumnFamily.indexId()); + void destroySortedIndexCfIfNeeded(ColumnFamily columnFamily) { + var cfNameBytes = new ByteArray(columnFamily.nameBytes()); - if (!indexCf.indexIdToTableId.isEmpty()) { - return indexCf; - } + sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) -> { + if (!indexCf.indexIdToTableId.isEmpty()) { + return indexCf; + } - destroyColumnFamily(indexCf.columnFamily); + destroyColumnFamily(indexCf.columnFamily); - return null; - }); - } finally { - busyLock.leaveBusy(); - } + return null; + }); } /** * Removes all 
data associated with the given table ID in this storage. */ - public void destroyTable(int tableId) { + public void destroyTable(int targetTableId) { try (WriteBatch writeBatch = new WriteBatch()) { byte[] tableIdBytes = ByteBuffer.allocate(Integer.BYTES) .order(KEY_BYTE_ORDER) - .putInt(tableId) + .putInt(targetTableId) .array(); deleteByPrefix(writeBatch, partitionCf, tableIdBytes); deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes); deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes); - List<IndexColumnFamily> sortedIndexCfs = sortedIndexes(tableId); - - for (IndexColumnFamily indexColumnFamily : sortedIndexCfs) { - deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), tableIdBytes); - } - deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_META_PREFIX, tableIdBytes)); deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes)); deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes)); + var cfsToRemove = new ArrayList<ColumnFamily>(); + + for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) { + Iterator<Integer> it = indexCf.indexIdToTableId.values().iterator(); + + while (it.hasNext()) { + int tableId = it.next(); + + if (targetTableId == tableId) { + it.remove(); + + deleteByPrefix(writeBatch, indexCf.columnFamily, tableIdBytes); + + cfsToRemove.add(indexCf.columnFamily); + } + } + } + db.write(DFLT_WRITE_OPTS, writeBatch); - if (!sortedIndexCfs.isEmpty()) { - scheduleIndexCfsDestroy(sortedIndexCfs); + if (!cfsToRemove.isEmpty()) { + scheduleIndexCfsDestroyIfNeeded(cfsToRemove); } } catch (RocksDBException e) { - throw new StorageException("Failed to destroy table data. [tableId={}]", e, tableId); + throw new StorageException("Failed to destroy table data. 
[tableId={}]", e, targetTableId); } } diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java index b753b9c452..a3ee3ba3de 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java @@ -126,19 +126,23 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest { assertThat(foo, is(not(sameInstance(bar)))); assertThat(quux, is((sameInstance(baz)))); - rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(1, foo)); + rocksDb.removeSortedIndex(1, foo); + rocksDb.destroySortedIndexCfIfNeeded(foo); assertTrue(cfExists(fooName)); - rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(2, bar)); + rocksDb.removeSortedIndex(2, bar); + rocksDb.destroySortedIndexCfIfNeeded(bar); assertFalse(cfExists(barName)); - rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(3, baz)); + rocksDb.removeSortedIndex(3, baz); + rocksDb.destroySortedIndexCfIfNeeded(baz); assertTrue(cfExists(fooName)); - rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(4, quux)); + rocksDb.removeSortedIndex(4, quux); + rocksDb.destroySortedIndexCfIfNeeded(quux); assertFalse(cfExists(fooName)); } @@ -277,6 +281,38 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest { assertThat(getIndexFuture.join().stream().map(IndexColumnFamily::indexId).collect(toList()), contains(0)); } + @Test + void testRemoveSortedIndex() { + int tableId = 0; + + int indexId = 0; + + byte[] fooName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, true, true) + )); + + ColumnFamily cf = rocksDb.getOrCreateSortedIndexCf(fooName, indexId, tableId); + + 
rocksDb.removeSortedIndex(indexId, cf); + + assertThat(rocksDb.sortedIndexes(tableId), is(empty())); + } + + @Test + void testTableDestroyRemovesSortedIndexes() { + int tableId = 0; + + byte[] fooName = sortedIndexCfName(List.of( + new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, true, true) + )); + + rocksDb.getOrCreateSortedIndexCf(fooName, 0, tableId); + + rocksDb.destroyTable(tableId); + + assertThat(rocksDb.sortedIndexes(tableId), is(empty())); + } + private boolean cfExists(byte[] cfName) { try { // Check Column Family existence by trying to create a new one with the same name.