This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a29862a834 IGNITE-21574 Implement index destruction for RocksDB engine 
(#3247)
a29862a834 is described below

commit a29862a834b1119aef8de768f310a7bbdfed7260
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Thu Feb 22 16:49:40 2024 +0200

    IGNITE-21574 Implement index destruction for RocksDB engine (#3247)
---
 .../storage/AbstractMvTableStorageTest.java        |   3 +
 .../index/AbstractSortedIndexStorageTest.java      |  35 +++++
 .../AbstractPageMemorySortedIndexStorageTest.java  |   8 ++
 .../ignite/internal/storage/rocksdb/HashIndex.java |  12 +-
 .../storage/rocksdb/RocksDbMetaStorage.java        |  13 +-
 .../storage/rocksdb/RocksDbTableStorage.java       | 152 +++++++++------------
 .../internal/storage/rocksdb/SortedIndex.java      |  23 +---
 .../rocksdb/index/AbstractRocksDbIndexStorage.java |   2 +-
 .../rocksdb/index/RocksDbHashIndexStorage.java     |   7 +-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   8 --
 .../rocksdb/index/RocksDbHashIndexStorageTest.java |   9 --
 11 files changed, 129 insertions(+), 143 deletions(-)

diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 652b63905c..b16e3d2b89 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -267,6 +267,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         assertThat(partitionStorage.flush(), willCompleteSuccessfully());
         assertThat(destroySortedIndexFuture, willCompleteSuccessfully());
         assertThat(destroyHashIndexFuture, willCompleteSuccessfully());
+
+        assertThat(tableStorage.getIndex(PARTITION_ID, sortedIdx.id()), 
is(nullValue()));
+        assertThat(tableStorage.getIndex(PARTITION_ID, hashIdx.id()), 
is(nullValue()));
     }
 
     @Test
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 34f00f9236..3b961138ad 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -28,6 +28,7 @@ import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATE
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
 import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -35,6 +36,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -51,6 +53,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.IntStream;
@@ -70,6 +73,7 @@ import 
org.apache.ignite.internal.storage.index.impl.TestIndexRow;
 import org.apache.ignite.internal.testframework.VariableSource;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.sql.ColumnType;
+import org.hamcrest.Matchers;
 import org.intellij.lang.annotations.MagicConstant;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.RepeatedTest;
@@ -1341,6 +1345,37 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         assertThrows(NoSuchElementException.class, scan::next);
     }
 
+    @Test
+    public void testDestroy() {
+        SortedIndexStorage index = createIndexStorage(INDEX_NAME, 
ColumnType.INT32, ColumnType.STRING);
+
+        int indexId = index.indexDescriptor().id();
+
+        assertThat(tableStorage.getIndex(TEST_PARTITION, indexId), 
is(sameInstance(index)));
+
+        var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
+
+        IndexRow row1 = serializer.serializeRow(new Object[]{ 1, "foo" }, new 
RowId(TEST_PARTITION));
+        IndexRow row2 = serializer.serializeRow(new Object[]{ 1, "foo" }, new 
RowId(TEST_PARTITION));
+        IndexRow row3 = serializer.serializeRow(new Object[]{ 2, "bar" }, new 
RowId(TEST_PARTITION));
+
+        put(index, row1);
+        put(index, row2);
+        put(index, row3);
+
+        CompletableFuture<Void> destroyFuture = 
tableStorage.destroyIndex(index.indexDescriptor().id());
+
+        assertThat(destroyFuture, willCompleteSuccessfully());
+
+        assertThat(tableStorage.getIndex(TEST_PARTITION, indexId), 
is(Matchers.nullValue()));
+
+        index = createIndexStorage(INDEX_NAME, ColumnType.INT32, 
ColumnType.STRING);
+
+        assertThat(getAll(index, row1), is(empty()));
+        assertThat(getAll(index, row2), is(empty()));
+        assertThat(getAll(index, row3), is(empty()));
+    }
+
     private List<ColumnParams> shuffledRandomColumnParams() {
         return shuffledColumnParams(d -> random.nextBoolean());
     }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java
index 7640686be4..b6ad8b9acd 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemorySortedIndexStorageTest.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.BinaryTupleRowSerializer;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.BasePageMemoryStorageEngineConfiguration;
 import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -100,6 +101,13 @@ abstract class AbstractPageMemorySortedIndexStorageTest 
extends AbstractSortedIn
         assertThat(get(index, indexRow3.indexColumns()), empty());
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21583")
+    @Test
+    @Override
+    public void testDestroy() {
+        super.testDestroy();
+    }
+
     private static IndexRow createIndexRow(BinaryTupleRowSerializer 
serializer, RowId rowId, Object... objects) {
         return serializer.serializeRow(objects, rowId);
     }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index d7bb4b7f2a..a7659e8c72 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
+import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -60,9 +64,11 @@ class HashIndex {
     /**
      * Removes all data associated with the index.
      */
-    void destroy() {
-        // TODO: implement, see 
https://issues.apache.org/jira/browse/IGNITE-21574.
-        throw new UnsupportedOperationException("Not implemented yet");
+    void destroy(WriteBatch writeBatch) throws RocksDBException {
+        close();
+
+        // Every index storage uses an "index ID" + "partition ID" prefix. We 
can remove everything by just using the index ID prefix.
+        deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, 
descriptor.id()));
     }
 
     /**
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index 86b36e2834..63d2506945 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -39,24 +39,19 @@ import org.rocksdb.RocksDBException;
 public class RocksDbMetaStorage {
     /**
      * Prefix to store partition meta information, such as last applied index 
and term.
-     * Key format is {@code [prexif, tableId, partitionId]} in BE.
+     * Key format is {@code [prefix, tableId, partitionId]} in BE.
      */
     public static final byte[] PARTITION_META_PREFIX = {0};
 
     /**
-     * Prefix to store partition configuration. Key format is {@code [prexif, 
tableId, partitionId]} in BE.
+     * Prefix to store partition configuration. Key format is {@code [prefix, 
tableId, partitionId]} in BE.
      */
     public static final byte[] PARTITION_CONF_PREFIX = {1};
 
     /**
-     * Prefix to store index column family name. Key format is {@code [prexif, 
indexId]} in BE.
+     * Prefix to store next row id to build in index. Key format is {@code 
[prefix, indexId, partitionId]} in BE.
      */
-    public static final byte[] INDEX_CF_PREFIX = {2};
-
-    /**
-     * Prefix to store next row id to build in index. Key format is {@code 
[prexif, indexId, partitionId]} in BE.
-     */
-    public static final byte[] INDEX_ROW_ID_PREFIX = {3};
+    public static final byte[] INDEX_ROW_ID_PREFIX = {2};
 
     private final ColumnFamily metaColumnFamily;
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 7427163a65..69cad3ca2a 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -17,26 +17,26 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
 import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
+import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
-import 
org.apache.ignite.internal.storage.rocksdb.index.RocksDbBinaryTupleComparator;
 import 
org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
 import 
org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
 import 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
@@ -56,9 +55,7 @@ import 
org.apache.ignite.internal.storage.util.MvPartitionStorages;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
-import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -71,7 +68,7 @@ public class RocksDbTableStorage implements MvTableStorage {
     private final SharedRocksDbInstance rocksDb;
 
     /** Partition storages. */
-    private volatile MvPartitionStorages<RocksDbMvPartitionStorage> 
mvPartitionStorages;
+    private final MvPartitionStorages<RocksDbMvPartitionStorage> 
mvPartitionStorages;
 
     /** Hash Index storages by Index IDs. */
     private final ConcurrentMap<Integer, HashIndex> hashIndices = new 
ConcurrentHashMap<>();
@@ -99,8 +96,8 @@ public class RocksDbTableStorage implements MvTableStorage {
             StorageTableDescriptor tableDescriptor
     ) {
         this.rocksDb = rocksDb;
-
         this.tableDescriptor = tableDescriptor;
+        this.mvPartitionStorages = new 
MvPartitionStorages<>(tableDescriptor.getId(), tableDescriptor.getPartitions());
     }
 
     /**
@@ -139,13 +136,7 @@ public class RocksDbTableStorage implements MvTableStorage 
{
     }
 
     @Override
-    public void start() throws StorageException {
-        inBusyLock(busyLock, () -> {
-            MvPartitionStorages<RocksDbMvPartitionStorage> mvPartitionStorages 
=
-                    new MvPartitionStorages<>(tableDescriptor.getId(), 
tableDescriptor.getPartitions());
-
-            this.mvPartitionStorages = mvPartitionStorages;
-        });
+    public void start() {
     }
 
     /**
@@ -158,62 +149,52 @@ public class RocksDbTableStorage implements 
MvTableStorage {
         return inBusyLock(busyLock, () -> 
rocksDb.flusher.awaitFlush(schedule));
     }
 
-    private void stop(boolean destroy) {
+    private CompletableFuture<Void> stop(boolean destroy) {
         if (!stopGuard.compareAndSet(false, true)) {
-            return;
+            return nullCompletedFuture();
         }
 
         busyLock.block();
 
-        if (destroy) {
-            destroyTableData();
-        }
+        return mvPartitionStorages.getAllForCloseOrDestroy()
+                .thenAccept(partitionStorages -> {
+                    var resources = new ArrayList<AutoCloseable>();
 
-        List<AutoCloseable> resources = new ArrayList<>();
+                    for (HashIndex index : hashIndices.values()) {
+                        resources.add(index::close);
+                    }
 
-        resources.addAll(
-                sortedIndices.values().stream()
-                        .map(index -> (AutoCloseable) index::close)
-                        .collect(toList())
-        );
+                    for (SortedIndex index : sortedIndices.values()) {
+                        resources.add(index::close);
+                    }
 
-        try {
-            mvPartitionStorages
-                    .getAllForCloseOrDestroy()
-                    // 10 seconds is taken by analogy with shutdown of thread 
pool, in general this should be fairly fast.
-                    .get(10, TimeUnit.SECONDS)
-                    .forEach(mvPartitionStorage -> 
resources.add(mvPartitionStorage::close));
-
-            for (HashIndex index : hashIndices.values()) {
-                resources.add(index::close);
-            }
-
-            for (SortedIndex index : sortedIndices.values()) {
-                resources.add(index::close);
-            }
+                    partitionStorages.forEach(mvPartitionStorage -> 
resources.add(mvPartitionStorage::close));
 
-            Collections.reverse(resources);
+                    try {
+                        IgniteUtils.closeAll(resources);
+                    } catch (Exception e) {
+                        throw new StorageException("Failed to stop RocksDB 
table storage: " + getTableId(), e);
+                    }
 
-            IgniteUtils.closeAll(resources);
-        } catch (Exception e) {
-            throw new StorageException("Failed to stop RocksDB table storage: 
" + getTableId(), e);
-        }
+                    if (destroy) {
+                        destroyTableData();
+                    }
+                });
     }
 
     @Override
     public void close() throws StorageException {
-        stop(false);
+        // 10 seconds is taken by analogy with shutdown of thread pool, in 
general this should be fairly fast.
+        try {
+            stop(false).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            throw new StorageException("Failed to stop RocksDB table storage: 
" + getTableId(), e);
+        }
     }
 
     @Override
     public CompletableFuture<Void> destroy() {
-        try {
-            stop(true);
-
-            return nullCompletedFuture();
-        } catch (Throwable t) {
-            return failedFuture(new StorageException("Failed to destroy 
RocksDB table storage: " + getTableId(), t));
-        }
+        return stop(true);
     }
 
     private void destroyTableData() {
@@ -222,24 +203,24 @@ public class RocksDbTableStorage implements 
MvTableStorage {
 
             byte[] tablePrefix = createKey(BYTE_EMPTY_ARRAY, tableId);
 
-            SharedRocksDbInstance.deleteByPrefix(writeBatch, 
rocksDb.partitionCf, tablePrefix);
-            SharedRocksDbInstance.deleteByPrefix(writeBatch, 
rocksDb.gcQueueCf, tablePrefix);
+            deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix);
+            deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix);
+
+            for (HashIndex hashIndex : hashIndices.values()) {
+                hashIndex.destroy(writeBatch);
+            }
 
-            for (int indexId : hashIndices.keySet()) {
-                SharedRocksDbInstance.deleteByPrefix(writeBatch, 
rocksDb.hashIndexCf, createKey(BYTE_EMPTY_ARRAY, indexId));
+            for (SortedIndex sortedIndex : sortedIndices.values()) {
+                sortedIndex.destroy(writeBatch);
             }
 
-            SharedRocksDbInstance.deleteByPrefix(writeBatch, 
rocksDb.meta.columnFamily(), createKey(PARTITION_META_PREFIX, tableId));
-            SharedRocksDbInstance.deleteByPrefix(writeBatch, 
rocksDb.meta.columnFamily(), createKey(PARTITION_CONF_PREFIX, tableId));
+            deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), 
createKey(PARTITION_META_PREFIX, tableId));
+            deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(), 
createKey(PARTITION_CONF_PREFIX, tableId));
 
             rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
         } catch (RocksDBException e) {
             throw new StorageException("Failed to destroy table data. 
[tableId={}]", e, getTableId());
         }
-
-        for (SortedIndex sortedIndex : sortedIndices.values()) {
-            sortedIndex.destroy();
-        }
     }
 
     @Override
@@ -272,7 +253,7 @@ public class RocksDbTableStorage implements MvTableStorage {
                 mvPartitionStorage.close();
 
                 // Operation to delete partition data should be fast, since we 
will write only the range of keys for deletion, and the
-                // RocksDB itself will then destroy the data on flash.
+                // RocksDB itself will then destroy the data on flush.
                 mvPartitionStorage.destroyData(writeBatch);
 
                 for (HashIndex hashIndex : hashIndices.values()) {
@@ -285,7 +266,7 @@ public class RocksDbTableStorage implements MvTableStorage {
 
                 rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
-                return awaitFlush(true);
+                return nullCompletedFuture();
             } catch (RocksDBException e) {
                 throw new StorageException("Error when destroying storage: 
[{}]", e, mvPartitionStorages.createStorageInfo(partitionId));
             }
@@ -337,22 +318,26 @@ public class RocksDbTableStorage implements 
MvTableStorage {
         return inBusyLock(busyLock, () -> {
             HashIndex hashIdx = hashIndices.remove(indexId);
 
-            if (hashIdx != null) {
-                hashIdx.destroy();
-            }
-
-            // Sorted Indexes have a separate Column Family per index, so we 
simply destroy it immediately after a flush completes
-            // in order to avoid concurrent access to the CF.
             SortedIndex sortedIdx = sortedIndices.remove(indexId);
 
-            if (sortedIdx != null) {
-                sortedIdx.destroy();
+            if (hashIdx == null && sortedIdx == null) {
+                return nullCompletedFuture();
             }
 
-            if (hashIdx == null) {
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                if (hashIdx != null) {
+                    hashIdx.destroy(writeBatch);
+                }
+
+                if (sortedIdx != null) {
+                    sortedIdx.destroy(writeBatch);
+                }
+
+                rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
+
                 return nullCompletedFuture();
-            } else {
-                return awaitFlush(false);
+            } catch (RocksDBException e) {
+                throw new StorageException("Error when destroying index: {}", 
e, indexId);
             }
         });
     }
@@ -362,17 +347,6 @@ public class RocksDbTableStorage implements MvTableStorage 
{
         return false;
     }
 
-    /**
-     * Creates a Column Family descriptor for a Sorted Index.
-     */
-    private static ColumnFamilyDescriptor sortedIndexCfDescriptor(String 
cfName, StorageSortedIndexDescriptor descriptor) {
-        var comparator = new 
RocksDbBinaryTupleComparator(descriptor.columns());
-
-        ColumnFamilyOptions options = new 
ColumnFamilyOptions().setComparator(comparator);
-
-        return new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), options);
-    }
-
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
         return inBusyLock(busyLock, () -> 
mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> {
@@ -401,8 +375,8 @@ public class RocksDbTableStorage implements MvTableStorage {
             try (WriteBatch writeBatch = new WriteBatch()) {
                 mvPartitionStorage.abortRebalance(writeBatch);
 
-                getHashIndexStorages(partitionId).forEach(index -> 
index.abortReblance(writeBatch));
-                getSortedIndexStorages(partitionId).forEach(index -> 
index.abortReblance(writeBatch));
+                getHashIndexStorages(partitionId).forEach(index -> 
index.abortRebalance(writeBatch));
+                getSortedIndexStorages(partitionId).forEach(index -> 
index.abortRebalance(writeBatch));
 
                 rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index 3e2144ab87..88d408381d 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -18,11 +18,6 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
-import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.INDEX_CF_PREFIX;
-import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.createKey;
-import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
-import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -79,18 +74,10 @@ class SortedIndex implements ManuallyCloseable {
     /**
      * Removes all data associated with the index.
      */
-    void destroy() {
-        var indexId = descriptor.id();
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            deleteByPrefix(writeBatch, indexCf, createKey(BYTE_EMPTY_ARRAY, 
indexId));
-            deleteByPrefix(writeBatch, indexMetaStorage.columnFamily(), 
createKey(INDEX_CF_PREFIX, indexId));
-
-            rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
-        } catch (RocksDBException e) {
-            throw new StorageException("Unable to destroy index " + indexId, 
e);
-        }
+    void destroy(WriteBatch writeBatch) {
+        close();
 
-        rocksDb.dropCfOnIndexDestroy(indexCf.nameBytes(), indexId);
+        rocksDb.dropCfOnIndexDestroy(indexCf.nameBytes(), descriptor.id());
     }
 
     /**
@@ -112,9 +99,7 @@ class SortedIndex implements ManuallyCloseable {
     @Override
     public void close() {
         try {
-            IgniteUtils.closeAll(
-                    storages.values().stream().map(index -> index::close)
-            );
+            IgniteUtils.closeAll(storages.values().stream().map(index -> 
index::close));
         } catch (Exception e) {
             throw new StorageException("Failed to close index storages: " + 
descriptor.id(), e);
         }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
index 1a8271ef44..18b1ee358b 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
@@ -144,7 +144,7 @@ abstract class AbstractRocksDbIndexStorage implements 
IndexStorage {
      *
      * @throws StorageRebalanceException If there was an error when aborting 
the rebalance.
      */
-    public void abortReblance(WriteBatch writeBatch) {
+    public void abortRebalance(WriteBatch writeBatch) {
         if (!state.compareAndSet(StorageState.REBALANCE, 
StorageState.RUNNABLE)) {
             throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
         }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index 07d2159dca..e461981fc9 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.IND
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PARTITION_ID_SIZE;
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 
@@ -177,10 +178,6 @@ public class RocksDbHashIndexStorage extends 
AbstractRocksDbIndexStorage impleme
 
     @Override
     public void destroyData(WriteBatch writeBatch) throws RocksDBException {
-        byte[] rangeEnd = incrementPrefix(constantPrefix);
-
-        assert rangeEnd != null;
-
-        writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd);
+        deleteByPrefix(writeBatch, indexCf, constantPrefix);
     }
 }
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 3ace0b7325..85dbd5bbd4 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -42,7 +42,6 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -162,11 +161,4 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
     void storageAdvertisesItIsPersistent() {
         assertThat(tableStorage.isVolatile(), is(false));
     }
-
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21574")
-    @Test
-    @Override
-    public void testDestroyIndex() {
-        super.testDestroyIndex();
-    }
 }
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
index 2862434319..89c082c3f4 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
@@ -33,8 +33,6 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
@@ -72,11 +70,4 @@ public class RocksDbHashIndexStorageTest extends 
AbstractHashIndexStorageTest {
                 engine == null ? null : engine::stop
         );
     }
-
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21574")
-    @Test
-    @Override
-    public void testDestroy() {
-        super.testDestroy();
-    }
 }

Reply via email to