This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 952c97b2d5 IGNITE-21909 Fix race on getting and destroying an index in SharedRocksDbInstance (#3544)
952c97b2d5 is described below

commit 952c97b2d5ccf532fb8181c8a8cefaa783ea4a5d
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Thu Apr 4 09:57:51 2024 +0300

    IGNITE-21909 Fix race on getting and destroying an index in SharedRocksDbInstance (#3544)
---
 .../ignite/internal/storage/rocksdb/HashIndex.java |   2 +-
 .../ignite/internal/storage/rocksdb/Index.java     |  28 ++++--
 .../internal/storage/rocksdb/RocksDbIndexes.java   |  12 ++-
 .../internal/storage/rocksdb/SortedIndex.java      |   2 +-
 .../rocksdb/instance/SharedRocksDbInstance.java    | 106 +++++++++++++--------
 .../instance/SharedRocksDbInstanceTest.java        |  44 ++++++++-
 6 files changed, 134 insertions(+), 60 deletions(-)

diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index bd45c252f3..29b204014c 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -43,6 +43,6 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
 
     @Override
     RocksDbHashIndexStorage createStorage(int partitionId) {
-        return new RocksDbHashIndexStorage(descriptor, tableId, partitionId, 
indexColumnFamily().columnFamily(), indexMetaStorage);
+        return new RocksDbHashIndexStorage(descriptor, tableId(), partitionId, 
columnFamily(), indexMetaStorage);
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
index 58c23aefaa..0625ceaf09 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.StorageException;
 import 
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
-import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
 import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
@@ -36,19 +35,30 @@ import org.rocksdb.WriteBatch;
  * Represents an index for all its partitions.
  */
 abstract class Index<S extends AbstractRocksDbIndexStorage> {
-    final int tableId;
+    private final int tableId;
 
-    private final IndexColumnFamily indexColumnFamily;
+    private final int indexId;
+
+    private final ColumnFamily columnFamily;
 
     private final ConcurrentMap<Integer, S> storageByPartitionId = new 
ConcurrentHashMap<>();
 
     Index(int tableId, int indexId, ColumnFamily cf) {
         this.tableId = tableId;
-        this.indexColumnFamily = new IndexColumnFamily(indexId, cf);
+        this.indexId = indexId;
+        this.columnFamily = cf;
+    }
+
+    int tableId() {
+        return tableId;
+    }
+
+    int indexId() {
+        return indexId;
     }
 
-    IndexColumnFamily indexColumnFamily() {
-        return indexColumnFamily;
+    ColumnFamily columnFamily() {
+        return columnFamily;
     }
 
     /**
@@ -76,7 +86,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
         try {
             
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> 
index::close));
         } catch (Exception e) {
-            throw new StorageException("Failed to close index storages: " + 
indexColumnFamily.indexId(), e);
+            throw new StorageException("Failed to close index storages: " + 
indexId, e);
         }
     }
 
@@ -87,7 +97,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
         try {
             
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index -> 
index::transitionToDestroyedState));
         } catch (Exception e) {
-            throw new StorageException("Failed to transition index storages to 
the DESTROYED state: " + indexColumnFamily.indexId(), e);
+            throw new StorageException("Failed to transition index storages to 
the DESTROYED state: " + indexId, e);
         }
     }
 
@@ -113,6 +123,6 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> 
{
     void destroy(WriteBatch writeBatch) throws RocksDBException {
         transitionToDestroyedState();
 
-        deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), 
indexPrefix(tableId, indexColumnFamily().indexId()));
+        deleteByPrefix(writeBatch, columnFamily, indexPrefix(tableId, 
indexId));
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
index 7ff1b893a1..152d241e55 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
@@ -73,7 +73,7 @@ class RocksDbIndexes {
                 }
             }
 
-            var indexCfsToDestroy = new ArrayList<IndexColumnFamily>();
+            var indexCfsToDestroy = new ArrayList<ColumnFamily>();
 
             for (IndexColumnFamily indexColumnFamily : 
rocksDb.sortedIndexes(tableId)) {
                 int indexId = indexColumnFamily.indexId();
@@ -83,9 +83,11 @@ class RocksDbIndexes {
                 var descriptor = (StorageSortedIndexDescriptor) 
indexDescriptorSupplier.get(indexId);
 
                 if (descriptor == null) {
+                    rocksDb.removeSortedIndex(indexId, cf);
+
                     deleteByPrefix(writeBatch, cf, indexPrefix(tableId, 
indexId));
 
-                    indexCfsToDestroy.add(indexColumnFamily);
+                    indexCfsToDestroy.add(cf);
                 } else {
                     sortedIndices.put(indexId, 
SortedIndex.restoreExisting(tableId, cf, descriptor, rocksDb.meta));
                 }
@@ -94,7 +96,7 @@ class RocksDbIndexes {
             rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
             if (!indexCfsToDestroy.isEmpty()) {
-                rocksDb.scheduleIndexCfsDestroy(indexCfsToDestroy);
+                rocksDb.scheduleIndexCfsDestroyIfNeeded(indexCfsToDestroy);
             }
         }
     }
@@ -174,6 +176,8 @@ class RocksDbIndexes {
             }
 
             if (sortedIdx != null) {
+                rocksDb.removeSortedIndex(indexId, sortedIdx.columnFamily());
+
                 sortedIdx.destroy(writeBatch);
             }
 
@@ -181,7 +185,7 @@ class RocksDbIndexes {
         }
 
         if (sortedIdx != null) {
-            
rocksDb.scheduleIndexCfsDestroy(List.of(sortedIdx.indexColumnFamily()));
+            
rocksDb.scheduleIndexCfsDestroyIfNeeded(List.of(sortedIdx.columnFamily()));
         }
     }
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index c683d971c3..df97ab2c54 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -66,6 +66,6 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
 
     @Override
     RocksDbSortedIndexStorage createStorage(int partitionId) {
-        return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId, 
indexColumnFamily().columnFamily(), indexMetaStorage);
+        return new RocksDbSortedIndexStorage(descriptor, tableId(), 
partitionId, columnFamily(), indexMetaStorage);
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 8d31ae7477..26dad1528e 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -77,10 +78,8 @@ public final class SharedRocksDbInstance {
 
         final Map<Integer, Integer> indexIdToTableId = new 
ConcurrentHashMap<>();
 
-        SortedIndexColumnFamily(ColumnFamily columnFamily, int indexId, int 
tableId) {
+        SortedIndexColumnFamily(ColumnFamily columnFamily) {
             this.columnFamily = columnFamily;
-
-            indexIdToTableId.put(indexId, tableId);
         }
 
         SortedIndexColumnFamily(ColumnFamily columnFamily, Map<Integer, 
Integer> indexIdToTableId) {
@@ -272,12 +271,12 @@ public final class SharedRocksDbInstance {
         try {
             SortedIndexColumnFamily result = sortedIndexCfsByName.compute(new 
ByteArray(cfName), (unused, sortedIndexCf) -> {
                 if (sortedIndexCf == null) {
-                    return new 
SortedIndexColumnFamily(createSortedIndexCf(cfName), indexId, tableId);
-                } else {
-                    sortedIndexCf.indexIdToTableId.put(indexId, tableId);
-
-                    return sortedIndexCf;
+                    sortedIndexCf = new 
SortedIndexColumnFamily(createSortedIndexCf(cfName));
                 }
+
+                sortedIndexCf.indexIdToTableId.put(indexId, tableId);
+
+                return sortedIndexCf;
             });
 
             return result.columnFamily;
@@ -287,70 +286,95 @@ public final class SharedRocksDbInstance {
     }
 
     /**
-     * Schedules a drop of a column family after destroying an index, if it 
was the last index managed by that CF.
+     * Removes the given sorted index from this instance. This prevents this 
index to be returned by {@link #sortedIndexes} call.
      */
-    public CompletableFuture<Void> 
scheduleIndexCfsDestroy(List<IndexColumnFamily> indexColumnFamilies) {
-        assert !indexColumnFamilies.isEmpty();
+    public void removeSortedIndex(int indexId, ColumnFamily cf) {
+        var cfNameBytes = new ByteArray(cf.nameBytes());
 
-        return flusher.awaitFlush(false)
-                .thenRunAsync(() -> 
indexColumnFamilies.forEach(this::destroySortedIndexCfIfNeeded), 
engine.threadPool());
+        sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) 
-> {
+            indexCf.indexIdToTableId.remove(indexId);
+
+            return indexCf;
+        });
     }
 
-    void destroySortedIndexCfIfNeeded(IndexColumnFamily indexColumnFamily) {
-        if (!busyLock.enterBusy()) {
-            throw new StorageClosedException();
-        }
+    /**
+     * Schedules a drop of a column family after destroying an index, if it 
was the last index managed by that CF.
+     */
+    public CompletableFuture<Void> 
scheduleIndexCfsDestroyIfNeeded(List<ColumnFamily> columnFamilies) {
+        assert !columnFamilies.isEmpty();
 
-        var cfNameBytes = new 
ByteArray(indexColumnFamily.columnFamily().nameBytes());
+        return flusher.awaitFlush(false)
+                .thenRunAsync(() -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new StorageClosedException();
+                    }
+
+                    try {
+                        
columnFamilies.forEach(this::destroySortedIndexCfIfNeeded);
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                }, engine.threadPool());
+    }
 
-        try {
-            sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, 
indexCf) -> {
-                indexCf.indexIdToTableId.remove(indexColumnFamily.indexId());
+    void destroySortedIndexCfIfNeeded(ColumnFamily columnFamily) {
+        var cfNameBytes = new ByteArray(columnFamily.nameBytes());
 
-                if (!indexCf.indexIdToTableId.isEmpty()) {
-                    return indexCf;
-                }
+        sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) 
-> {
+            if (!indexCf.indexIdToTableId.isEmpty()) {
+                return indexCf;
+            }
 
-                destroyColumnFamily(indexCf.columnFamily);
+            destroyColumnFamily(indexCf.columnFamily);
 
-                return null;
-            });
-        } finally {
-            busyLock.leaveBusy();
-        }
+            return null;
+        });
     }
 
     /**
      * Removes all data associated with the given table ID in this storage.
      */
-    public void destroyTable(int tableId) {
+    public void destroyTable(int targetTableId) {
         try (WriteBatch writeBatch = new WriteBatch()) {
             byte[] tableIdBytes = ByteBuffer.allocate(Integer.BYTES)
                     .order(KEY_BYTE_ORDER)
-                    .putInt(tableId)
+                    .putInt(targetTableId)
                     .array();
 
             deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
             deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
             deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);
 
-            List<IndexColumnFamily> sortedIndexCfs = sortedIndexes(tableId);
-
-            for (IndexColumnFamily indexColumnFamily : sortedIndexCfs) {
-                deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(), 
tableIdBytes);
-            }
-
             deleteByPrefix(writeBatch, meta.columnFamily(), 
metaPrefix(PARTITION_META_PREFIX, tableIdBytes));
             deleteByPrefix(writeBatch, meta.columnFamily(), 
metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes));
             deleteByPrefix(writeBatch, meta.columnFamily(), 
metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes));
 
+            var cfsToRemove = new ArrayList<ColumnFamily>();
+
+            for (SortedIndexColumnFamily indexCf : 
sortedIndexCfsByName.values()) {
+                Iterator<Integer> it = 
indexCf.indexIdToTableId.values().iterator();
+
+                while (it.hasNext()) {
+                    int tableId = it.next();
+
+                    if (targetTableId == tableId) {
+                        it.remove();
+
+                        deleteByPrefix(writeBatch, indexCf.columnFamily, 
tableIdBytes);
+
+                        cfsToRemove.add(indexCf.columnFamily);
+                    }
+                }
+            }
+
             db.write(DFLT_WRITE_OPTS, writeBatch);
 
-            if (!sortedIndexCfs.isEmpty()) {
-                scheduleIndexCfsDestroy(sortedIndexCfs);
+            if (!cfsToRemove.isEmpty()) {
+                scheduleIndexCfsDestroyIfNeeded(cfsToRemove);
             }
         } catch (RocksDBException e) {
-            throw new StorageException("Failed to destroy table data. 
[tableId={}]", e, tableId);
+            throw new StorageException("Failed to destroy table data. 
[tableId={}]", e, targetTableId);
         }
     }
 
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
index b753b9c452..a3ee3ba3de 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -126,19 +126,23 @@ class SharedRocksDbInstanceTest extends 
IgniteAbstractTest {
         assertThat(foo, is(not(sameInstance(bar))));
         assertThat(quux, is((sameInstance(baz))));
 
-        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(1, foo));
+        rocksDb.removeSortedIndex(1, foo);
+        rocksDb.destroySortedIndexCfIfNeeded(foo);
 
         assertTrue(cfExists(fooName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(2, bar));
+        rocksDb.removeSortedIndex(2, bar);
+        rocksDb.destroySortedIndexCfIfNeeded(bar);
 
         assertFalse(cfExists(barName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(3, baz));
+        rocksDb.removeSortedIndex(3, baz);
+        rocksDb.destroySortedIndexCfIfNeeded(baz);
 
         assertTrue(cfExists(fooName));
 
-        rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(4, quux));
+        rocksDb.removeSortedIndex(4, quux);
+        rocksDb.destroySortedIndexCfIfNeeded(quux);
 
         assertFalse(cfExists(fooName));
     }
@@ -277,6 +281,38 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest 
{
         
assertThat(getIndexFuture.join().stream().map(IndexColumnFamily::indexId).collect(toList()),
 contains(0));
     }
 
+    @Test
+    void testRemoveSortedIndex() {
+        int tableId = 0;
+
+        int indexId = 0;
+
+        byte[] fooName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, 
true, true)
+        ));
+
+        ColumnFamily cf = rocksDb.getOrCreateSortedIndexCf(fooName, indexId, 
tableId);
+
+        rocksDb.removeSortedIndex(indexId, cf);
+
+        assertThat(rocksDb.sortedIndexes(tableId), is(empty()));
+    }
+
+    @Test
+    void testTableDestroyRemovesSortedIndexes() {
+        int tableId = 0;
+
+        byte[] fooName = sortedIndexCfName(List.of(
+                new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, 
true, true)
+        ));
+
+        rocksDb.getOrCreateSortedIndexCf(fooName, 0, tableId);
+
+        rocksDb.destroyTable(tableId);
+
+        assertThat(rocksDb.sortedIndexes(tableId), is(empty()));
+    }
+
     private boolean cfExists(byte[] cfName) {
         try {
             // Check Column Family existence by trying to create a new one 
with the same name.

Reply via email to