ibessonov commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1071208048


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -548,6 +573,51 @@ public void testDestroyTableStorage() throws Exception {
         assertThat(tableStorage.destroy(), willCompleteSuccessfully());
     }
 
+    /**
+     * Checks that if we restart the storages after a crash in the middle of a 
rebalance, the storages will be empty.
+     */
+    @Test
+    public void testRestartStoragesAfterFailOnMiddleOfRebalance() {

Review Comment:
   ```suggestion
       public void testRestartStoragesAfterFailDuringRebalance() {
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -18,61 +18,89 @@
 package org.apache.ignite.internal.storage.pagememory;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Function;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.freelist.FreeList;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 public abstract class AbstractPageMemoryTableStorage implements MvTableStorage 
{
-    protected final TableConfiguration tableCfg;
+    protected static final VarHandle CLOSED;
 
-    protected TablesConfiguration tablesConfiguration;
+    static {
+        try {
+            CLOSED = 
MethodHandles.lookup().findVarHandle(AbstractPageMemoryTableStorage.class, 
"closed", boolean.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    protected final TableConfiguration tableConfig;
 
-    protected volatile boolean started;
+    protected final TablesConfiguration tablesConfig;
 
     protected volatile 
AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
 
-    protected final ConcurrentMap<Integer, CompletableFuture<Void>> 
partitionIdDestroyFutureMap = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<Integer, CompletableFuture<Void>> 
destroyFutureByPartitionId = new ConcurrentHashMap<>();
+
+    protected final ConcurrentMap<Integer, CompletableFuture<Void>> 
rebalanceFutureByPartitionId = new ConcurrentHashMap<>();
+
+    /** Busy lock to stop synchronously. */
+    protected final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** To avoid double closure. */
+    @SuppressWarnings("unused")
+    protected volatile boolean closed;

Review Comment:
   Almost everyone else calls it `stopGuard`. Why do you break the pattern?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -268,12 +377,52 @@ private void checkPartitionId(int partitionId) {
         int partitions = mvPartitions.length();
 
         if (partitionId < 0 || partitionId >= partitions) {
-            throw new IllegalArgumentException(S.toString(
-                    "Unable to access partition with id outside of configured 
range",
-                    "table", tableCfg.value().name(), false,
-                    "partitionId", partitionId, false,
-                    "partitions", partitions, false
+            throw new IllegalArgumentException(IgniteStringFormatter.format(
+                    "Unable to access partition with id outside of configured 
range: [table={}, partitionId={}, partitions={}]",
+                    getTableName(),
+                    partitionId,
+                    partitions
             ));
         }
     }
+
+    /**
+     * Returns multi-versioned partition storage without using {@link 
#busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @return {@code Null} if there is no storage.
+     */
+    @Nullable
+    AbstractPageMemoryMvPartitionStorage 
getMvPartitionStorageWithoutBusyLock(int partitionId) {
+        checkPartitionId(partitionId);
+
+        return mvPartitions.get(partitionId);
+    }
+
+    /**
+     * Returns multi-versioned partition storage, if it doesn't exist it will 
throw an exception from the
+     * {@code missingStorageExceptionFunction}, without using {@link 
#busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @param missingStorageExceptionFunction Function to create an exception 
if the store is missing.
+     */
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(

Review Comment:
   Can it simply return null? Why do you need to provide an exception 
generation function?
   Another option is to use `Optional` with its `orElseThrow`, for example, if 
you want to put everything into a single statement.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -268,12 +377,52 @@ private void checkPartitionId(int partitionId) {
         int partitions = mvPartitions.length();
 
         if (partitionId < 0 || partitionId >= partitions) {
-            throw new IllegalArgumentException(S.toString(
-                    "Unable to access partition with id outside of configured 
range",
-                    "table", tableCfg.value().name(), false,
-                    "partitionId", partitionId, false,
-                    "partitions", partitions, false
+            throw new IllegalArgumentException(IgniteStringFormatter.format(
+                    "Unable to access partition with id outside of configured 
range: [table={}, partitionId={}, partitions={}]",
+                    getTableName(),
+                    partitionId,
+                    partitions
             ));
         }
     }
+
+    /**
+     * Returns multi-versioned partition storage without using {@link 
#busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @return {@code Null} if there is no storage.
+     */
+    @Nullable
+    AbstractPageMemoryMvPartitionStorage 
getMvPartitionStorageWithoutBusyLock(int partitionId) {
+        checkPartitionId(partitionId);
+
+        return mvPartitions.get(partitionId);
+    }
+
+    /**
+     * Returns multi-versioned partition storage, if it doesn't exist it will 
throw an exception from the
+     * {@code missingStorageExceptionFunction}, without using {@link 
#busyLock}.
+     *
+     * @param partitionId Partition ID.
+     * @param missingStorageExceptionFunction Function to create an exception 
if the store is missing.
+     */
+    AbstractPageMemoryMvPartitionStorage getMvPartitionStorageWithoutBusyLock(
+            int partitionId,
+            Function<String, ? extends StorageException> 
missingStorageExceptionFunction
+    ) {
+        AbstractPageMemoryMvPartitionStorage mvPartitionStorage = 
getMvPartitionStorageWithoutBusyLock(partitionId);
+
+        if (mvPartitionStorage == null) {
+            throw 
missingStorageExceptionFunction.apply(IgniteStringFormatter.format("Partition 
ID {} does not exist", partitionId));
+        }
+
+        return mvPartitionStorage;
+    }
+
+    /**
+     * Returns table name.
+     */
+    public String getTableName() {

Review Comment:
   Off-topic. Given that tables can be renamed, we should figure out a way to 
print logs "consistently". But that's for the future.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -167,86 +203,79 @@ public CompletableFuture<Void> destroy() {
 
     @Override
     public AbstractPageMemoryMvPartitionStorage getOrCreateMvPartition(int 
partitionId) throws StorageException {
-        AbstractPageMemoryMvPartitionStorage partition = 
getMvPartition(partitionId);
+        return inBusyLock(busyLock, () -> {
+            AbstractPageMemoryMvPartitionStorage partition = 
getMvPartitionStorageWithoutBusyLock(partitionId);
 
-        if (partition != null) {
-            return partition;
-        }
+            if (partition != null) {
+                return partition;
+            }
 
-        partition = createMvPartitionStorage(partitionId);
+            partition = createMvPartitionStorage(partitionId);
 
-        partition.start();
+            partition.start();
 
-        mvPartitions.set(partitionId, partition);
+            mvPartitions.set(partitionId, partition);
 
-        return partition;
+            return partition;
+        });
     }
 
     @Override
     public @Nullable AbstractPageMemoryMvPartitionStorage getMvPartition(int 
partitionId) {
-        assert started : "Storage has not started yet";
-
-        checkPartitionId(partitionId);
-
-        return mvPartitions.get(partitionId);
+        return inBusyLock(busyLock, () -> 
getMvPartitionStorageWithoutBusyLock(partitionId));
     }
 
     @Override
     public CompletableFuture<Void> destroyPartition(int partitionId) {
-        assert started : "Storage has not started yet";
+        return inBusyLock(busyLock, () -> {

Review Comment:
   I would recommend extracting such big closures into methods like 
"destroyPartitionBusy"; this would make the PR smaller and simplify the review. 
The indentation will become smaller as well, which is always a good thing. 
Right now it's hard to tell whether you just re-formatted the code or changed 
something in it while doing it.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return 
destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));

Review Comment:
   So, it's a copy-paste after all. Why?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -96,49 +98,22 @@ public boolean isVolatile() {
 
     @Override
     protected void finishDestruction() {
-        dataRegion.pageMemory().onGroupDestroyed(tableCfg.tableId().value());
+        
dataRegion.pageMemory().onGroupDestroyed(tableConfig.tableId().value());
     }
 
     @Override
     public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int 
partitionId) {
-        CompletableFuture<Void> partitionDestroyFuture = 
partitionIdDestroyFutureMap.get(partitionId);
+        waitPartitionToBeDestroyed(partitionId);
 
-        if (partitionDestroyFuture != null) {
-            try {
-                // Time is chosen randomly (long enough) so as not to call 
#join().
-                partitionDestroyFuture.get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                throw new StorageException("Error waiting for the destruction 
of the previous version of the partition: " + partitionId, e);
-            }
-        }
+        TableView tableView = tableConfig.value();
 
-        TableView tableView = tableCfg.value();
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(partitionId);
 
-        GroupPartitionId groupPartitionId = new 
GroupPartitionId(tableView.tableId(), partitionId);
+        PartitionMeta meta = 
getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(groupPartitionId);

Review Comment:
   Why did you separate it into a different checkpoint lock section? Won't 
that break things?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -210,7 +177,8 @@ private FilePageStore 
ensurePartitionFilePageStore(TableView tableView, GroupPar
     /**
      * Returns id of the last started checkpoint, or {@code null} if no 
checkpoints were started yet.
      */
-    public @Nullable UUID lastCheckpointId() {
+    @Nullable
+    private UUID lastCheckpointId() {

Review Comment:
   Ok, now I'm confused! You always applied the `@Nullable` annotation to 
types, not methods, and now you change it back. Why? I don't get it. Is this 
stated in the code style guidelines, so that you just have to change it? I 
don't think so.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return 
destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = 
createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, 
rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, 
partitionId);
+                VersionChainTree versionChainTree = 
createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, 
meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, 
partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) 
mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> 
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());
 
         return 
dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> 
dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> 
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), 
partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = 
dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId 
groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = 
dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), 
groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert 
dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta 
information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta 
getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId
 groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, 
groupPartitionId);
+
+        PartitionMeta partitionMeta = 
getOrCreatePartitionMeta(groupPartitionId, filePageStore);
+
+        if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call 
#join().
+                destroyPartitionPhysically(groupPartitionId).get(10, 
TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new StorageException(
+                        IgniteStringFormatter.format(
+                                "Error when physically destroying a partition: 
[table={}, partitionId={}]",
+                                getTableName(),
+                                groupPartitionId.getPartitionId()
+                        ),
+                        e
+                );
+            }
+
+            return getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));
+        } else {
+            return partitionMeta;
+        }
+    }
+
+    private void waitPartitionToBeDestroyed(int partitionId) {
+        CompletableFuture<Void> partitionDestroyFuture = 
destroyFutureByPartitionId.get(partitionId);
+
+        if (partitionDestroyFuture != null) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call 
#join().
+                partitionDestroyFuture.get(10, TimeUnit.SECONDS);

Review Comment:
   Same thing here.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, 
key.byteBuffer());
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, 
key.byteBuffer());
 
-            HashIndexRow lowerBound = new HashIndexRow(indexColumns, 
lowestRowId);
-            HashIndexRow upperBound = new HashIndexRow(indexColumns, 
highestRowId);
+                HashIndexRow lowerBound = new HashIndexRow(indexColumns, 
lowestRowId);
+                HashIndexRow upperBound = new HashIndexRow(indexColumns, 
highestRowId);
 
-            Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, 
upperBound);
+                Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, 
upperBound);
 
-            return new Cursor<>() {
-                @Override
-                public void close() {
-                    cursor.close();
-                }
-
-                @Override
-                public boolean hasNext() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                return new Cursor<RowId>() {
+                    @Override
+                    public void close() {
+                        cursor.close();
                     }
 
-                    try {
-                        return cursor.hasNext();
-                    } finally {
-                        closeBusyLock.leaveBusy();
-                    }
-                }
+                    @Override
+                    public boolean hasNext() {
+                        return busy(() -> {
+                            
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemoryHashIndexStorage.this::createStorageInfo);
 
-                @Override
-                public RowId next() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                            return cursor.hasNext();
+                        });
                     }
 
-                    try {
-                        return cursor.next().rowId();
-                    } finally {
-                        closeBusyLock.leaveBusy();
+                    @Override
+                    public RowId next() {
+                        return busy(() -> {
+                            
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemoryHashIndexStorage.this::createStorageInfo);
+
+                            return cursor.next().rowId();
+                        });
                     }
-                }
-            };
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                };
+            } catch (Throwable e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -203,19 +187,68 @@ public void destroy() throws StorageException {
      * Closes the hash index storage.
      */
     public void close() {
-        if (!CLOSED.compareAndSet(this, false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
-        closeBusyLock.block();
+        busyLock.block();
 
         hashIndexTree.close();
     }
 
     /**
-     * Throws an exception that the storage is already closed.
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting 
the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.

Review Comment:
   This comment is not enough



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, 
key.byteBuffer());
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, 
key.byteBuffer());
 
-            HashIndexRow lowerBound = new HashIndexRow(indexColumns, 
lowestRowId);
-            HashIndexRow upperBound = new HashIndexRow(indexColumns, 
highestRowId);
+                HashIndexRow lowerBound = new HashIndexRow(indexColumns, 
lowestRowId);
+                HashIndexRow upperBound = new HashIndexRow(indexColumns, 
highestRowId);
 
-            Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, 
upperBound);
+                Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, 
upperBound);
 
-            return new Cursor<>() {
-                @Override
-                public void close() {
-                    cursor.close();
-                }
-
-                @Override
-                public boolean hasNext() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                return new Cursor<RowId>() {
+                    @Override
+                    public void close() {
+                        cursor.close();
                     }
 
-                    try {
-                        return cursor.hasNext();
-                    } finally {
-                        closeBusyLock.leaveBusy();
-                    }
-                }
+                    @Override
+                    public boolean hasNext() {
+                        return busy(() -> {
+                            
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemoryHashIndexStorage.this::createStorageInfo);
 
-                @Override
-                public RowId next() {
-                    if (!closeBusyLock.enterBusy()) {
-                        throwStorageClosedException();
+                            return cursor.hasNext();
+                        });
                     }
 
-                    try {
-                        return cursor.next().rowId();
-                    } finally {
-                        closeBusyLock.leaveBusy();
+                    @Override
+                    public RowId next() {
+                        return busy(() -> {
+                            
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemoryHashIndexStorage.this::createStorageInfo);
+
+                            return cursor.next().rowId();
+                        });
                     }
-                }
-            };
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                };
+            } catch (Throwable e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try {
+                IndexColumns indexColumns = new IndexColumns(partitionId, 
row.indexColumns().byteBuffer());
 
-        try {
-            IndexColumns indexColumns = new IndexColumns(partitionId, 
row.indexColumns().byteBuffer());
+                HashIndexRow hashIndexRow = new HashIndexRow(indexColumns, 
row.rowId());
 
-            HashIndexRow hashIndexRow = new HashIndexRow(indexColumns, 
row.rowId());
+                var insert = new InsertHashIndexRowInvokeClosure(hashIndexRow, 
freeList, hashIndexTree.inlineSize());
 
-            var insert = new InsertHashIndexRowInvokeClosure(hashIndexRow, 
freeList, hashIndexTree.inlineSize());
+                hashIndexTree.invoke(hashIndexRow, null, insert);
 
-            hashIndexTree.invoke(hashIndexRow, null, insert);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to put value into index", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return null;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to put value into index", 
e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        try {
-            SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
+            try {
+                SortedIndexRowKey lowerBound = toSortedIndexRow(key, 
lowestRowId);
 
-            SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
+                SortedIndexRowKey upperBound = toSortedIndexRow(key, 
highestRowId);
 
-            return convertCursor(sortedIndexTree.find(lowerBound, upperBound), 
SortedIndexRow::rowId);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return convertCursor(sortedIndexTree.find(lowerBound, 
upperBound), SortedIndexRow::rowId);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java:
##########
@@ -18,61 +18,89 @@
 package org.apache.ignite.internal.storage.pagememory;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Function;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.freelist.FreeList;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 public abstract class AbstractPageMemoryTableStorage implements MvTableStorage 
{
-    protected final TableConfiguration tableCfg;
+    protected static final VarHandle CLOSED;

Review Comment:
   AtomicBoolean maybe? This is not a hot place and the memory footprint is 
not that big; we have other places to worry about.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.util.function.Supplier;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
+
+/**
+ * Helper class for {@link PageMemory}-based storages.
+ */
+public class PageMemoryStorageUtils {
+    /**
+     * Runs a function under a busyLock, if it was not possible to 
acquire(busy) busyLock throws an exception depending on
+     * {@link StorageState}.
+     *
+     * @param <V> Type of the returned value.
+     * @param busyLock Busy lock.
+     * @param supplier Function.
+     * @param storageInfoSupplier Storage state supplier.
+     * @param storageStateSupplier Storage information supplier, for example 
in the format "table=user, partitionId=1".
+     * @return Value.
+     * @throws StorageClosedException If the storage is closed.
+     * @throws StorageRebalanceException If storage is in the process of 
rebalancing.
+     * @throws StorageException For other {@link StorageState}.
+     */
+    public static <V> V inBusyLock(
+            IgniteSpinBusyLock busyLock,
+            Supplier<V> supplier,
+            Supplier<StorageState> storageStateSupplier,

Review Comment:
   Why can't we pass a state itself?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -811,8 +885,8 @@ private void checkForPresenceRows(
     ) {
         for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
             assertThat(
-                    
getAll(mvPartitionStorage.scanVersions(row.get1())).stream().map(ReadResult::binaryRow).collect(toList()),
-                    containsInAnyOrder(row.get2())
+                    
toListOfByteArrays(mvPartitionStorage.scanVersions(row.get1())),
+                    containsInAnyOrder(row.get2().bytes())

Review Comment:
   I believe that you may have a bug here. `byte[]` cannot be compared with 
`equals`, so this assertion is very suspicious



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -88,25 +92,46 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     /** Partition id for 1 storage. */
     protected static final int PARTITION_ID_1 = 1 << 8;
 
-    private MvTableStorage tableStorage;
+    protected MvTableStorage tableStorage;
 
     private TableIndexView sortedIdx;
 
     private TableIndexView hashIdx;
 
+    private StorageEngine storageEngine;
+
     /**
      * Initializes the internal structures needed for tests.
      *
      * <p>This method *MUST* always be called in either subclass' constructor 
or setUp method.
      */
-    protected final void initialize(MvTableStorage tableStorage, 
TablesConfiguration tablesCfg) {
-        createTestTable(tableStorage.configuration());
-        createTestIndexes(tablesCfg);
+    protected final void initialize(StorageEngine storageEngine, 
TablesConfiguration tablesConfig) {
+        createTestTable(getTableConfig(tablesConfig));
+        createTestIndexes(tablesConfig);
+
+        this.storageEngine = storageEngine;

Review Comment:
   Do we have a start/stop for storage engines? I think we do, you should call 
them then.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });

Review Comment:
   Is this code really easier to read than the previous one?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {

Review Comment:
   What's the point of all these changes?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return 
destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {

Review Comment:
   Maybe you should make that method protected if it's expected to be overridden?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                return hasNext;
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", 
e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return hasNext;
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the 
cursor", e);
+                }
+            });
         }
 
         @Override
         public IndexRow next() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                boolean hasNext = this.hasNext;
+                    boolean hasNext = this.hasNext;
 
-                if (!hasNext) {
-                    throw new NoSuchElementException();
-                }
+                    if (!hasNext) {
+                        throw new NoSuchElementException();
+                    }
 
-                this.hasNext = null;
+                    this.hasNext = null;
 
-                return toIndexRowImpl(treeRow);
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", 
e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return toIndexRowImpl(treeRow);
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the 
cursor", e);
+                }
+            });
         }
 
         @Override
         public @Nullable IndexRow peek() {
-            if (hasNext != null) {
-                if (hasNext) {
-                    return toIndexRowImpl(treeRow);
+            return busy(() -> {

Review Comment:
   I can repeat it once again if you wish



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -247,28 +233,20 @@ public void close() {
 
             @Override
             public boolean hasNext() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemorySortedIndexStorage.this::createStorageInfo);
 
-                try {
                     return cursor.hasNext();
-                } finally {
-                    closeBusyLock.leaveBusy();
-                }
+                });
             }
 
             @Override
             public R next() {
-                if (!closeBusyLock.enterBusy()) {
-                    throwStorageClosedException();
-                }
+                return busy(() -> {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemorySortedIndexStorage.this::createStorageInfo);

Review Comment:
   Same question about this code



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -157,35 +141,31 @@ protected AbstractPageMemoryMvPartitionStorage(
      * Starts a partition by initializing its internal structures.
      */
     public void start() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -298,76 +276,74 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {
+                try {
+                    advanceIfNeeded();
 
-                return hasNext;
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Error while advancing the cursor", 
e);
-            } finally {
-                closeBusyLock.leaveBusy();
-            }
+                    return hasNext;
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the 
cursor", e);
+                }
+            });
         }
 
         @Override
         public IndexRow next() {
-            if (!closeBusyLock.enterBusy()) {
-                throwStorageClosedException();
-            }
-
-            try {
-                advanceIfNeeded();
+            return busy(() -> {

Review Comment:
   Let me repeat my question, what's the point?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return 
destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = 
createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, 
rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, 
partitionId);
+                VersionChainTree versionChainTree = 
createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, 
meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, 
partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) 
mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> 
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());

Review Comment:
   So, if we create a partition while the previous one is still not destroyed, 
we would have to wait for the checkpoint without triggering it. Maybe we 
should fix it in the future. I remember that we had an idea to return a Future 
from create/getOrCreatePartition.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -396,4 +372,56 @@ private int compareRows(SortedIndexRowKey key1, 
SortedIndexRowKey key2) {
             );
         }
     }
+
+    /**
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting 
the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.

Review Comment:
   I don't think that it properly reflects what you're doing. Basically, you 
propagate the state that you just set to all further storage operations, while 
also waiting until all operations that saw the previous state have 
completed.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -157,35 +141,31 @@ protected AbstractPageMemoryMvPartitionStorage(
      * Starts a partition by initializing its internal structures.
      */
     public void start() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
+                NamedListView<TableIndexView> indexesCfgView = 
tableStorage.tablesConfiguration().indexes().value();
 
-        try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
-            NamedListView<TableIndexView> indexesCfgView = 
tablesConfiguration.indexes().value();
+                while (cursor.hasNext()) {
+                    IndexMeta indexMeta = cursor.next();
 
-            while (cursor.hasNext()) {
-                IndexMeta indexMeta = cursor.next();
+                    TableIndexView indexCfgView = 
getByInternalId(indexesCfgView, indexMeta.id());
 
-                TableIndexView indexCfgView = getByInternalId(indexesCfgView, 
indexMeta.id());
+                    if (indexCfgView instanceof HashIndexView) {
+                        hashIndexes.put(indexCfgView.id(), 
createOrRestoreHashIndex(indexMeta));

Review Comment:
   Thank you for the fix!



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -396,4 +372,56 @@ private int compareRows(SortedIndexRowKey key1, 
SortedIndexRowKey key2) {
             );
         }
     }
+
+    /**
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting 
the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.
+        busyLock.block();
+        busyLock.unblock();
+    }
+
+    /**
+     * Completes the rebalancing of the storage.
+     *
+     * @throws StorageRebalanceException If there is an error while completing 
the storage rebalance.
+     */
+    public void completeRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, 
StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+    }
+
+    /**
+     * Updates the internal data structures of the storage on rebalance.
+     *
+     * @param freeList Free list to store index columns.
+     * @param sortedIndexTree Sorted index tree instance.
+     * @throws StorageRebalanceException If the storage is not in the process 
of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, 
SortedIndexTree sortedIndexTree) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+        this.freeList = freeList;
+
+        this.sortedIndexTree.close();

Review Comment:
   Same question here. Looks like a huge flaw in the design.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return 
destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = 
createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, 
rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, 
partitionId);
+                VersionChainTree versionChainTree = 
createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, 
meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, 
partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) 
mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> 
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());
 
         return 
dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> 
dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> 
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), 
partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = 
dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId 
groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = 
dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), 
groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert 
dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta 
information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta 
getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId
 groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, 
groupPartitionId);

Review Comment:
   Just a nitpick, but what exactly do you ensure? You ensure that _partition 
file page store ..._ what?... Exists? Initialized? Started? Last word is missing



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -800,67 +732,65 @@ public PartitionTimestampCursor scan(HybridTimestamp 
timestamp) throws StorageEx
             } else {
                 return new TimestampCursor(treeCursor, timestamp);
             }
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+        });
     }
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws 
StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        try (Cursor<VersionChain> cursor = versionChainTree.find(new 
VersionChainKey(lowerBound), null)) {
-            return cursor.hasNext() ? cursor.next().rowId() : null;
-        } catch (Exception e) {
-            throw new StorageException("Error occurred while trying to read a 
row id", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+            try (Cursor<VersionChain> cursor = versionChainTree.find(new 
VersionChainKey(lowerBound), null)) {
+                return cursor.hasNext() ? cursor.next().rowId() : null;
+            } catch (Exception e) {
+                throw new StorageException("Error occurred while trying to 
read a row id", e);
+            }
+        });
     }
 
     @Override
     public long rowsCount() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        try {
-            return versionChainTree.size();
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error occurred while fetching the 
size.", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+            try {
+                return versionChainTree.size();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error occurred while fetching the 
size", e);
+            }
+        });
     }
 
     private abstract class BasePartitionTimestampCursor implements 
PartitionTimestampCursor {
-        protected final Cursor<VersionChain> treeCursor;
+        final Cursor<VersionChain> treeCursor;

Review Comment:
   OK, why change the visibility here?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return 
destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = 
createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, 
rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, 
partitionId);
+                VersionChainTree versionChainTree = 
createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, 
meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, 
partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) 
mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> 
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());
 
         return 
dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> 
dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> 
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), 
partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = 
dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId 
groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = 
dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), 
groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert 
dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta 
information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta 
getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId
 groupPartitionId) {

Review Comment:
   Dude, can you please come up with a shorter name? Sometimes you should write 
documentation instead of creating super-long method names. By the way, I see no 
comments in your code. Is it so simple that it doesn't require any?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -987,24 +909,174 @@ public boolean hasNext() {
                     ReadResult result = findLatestRowVersion(chain);
 
                     if (result.isEmpty() && !result.isWriteIntent()) {
-                        continue;
+                        return null;
                     }
 
                     nextRead = result;
                     currentChain = chain;
 
                     return true;
-                } finally {
-                    closeBusyLock.leaveBusy();
+                });
+
+                if (hasNext != null) {
+                    return hasNext;
                 }
             }
         }
     }
 
+    private class ScanVersionsCursor implements Cursor<ReadResult> {
+        final RowId rowId;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private VersionChain versionChain;
+
+        @Nullable
+        private RowVersion rowVersion;
+
+        private ScanVersionsCursor(RowId rowId) {
+            this.rowId = rowId;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(() -> {
+                advanceIfNeeded();
+
+                return hasNext;
+            });
+        }
+
+        @Override
+        public ReadResult next() {
+            return busy(() -> {
+                advanceIfNeeded();
+
+                if (!hasNext) {
+                    throw new NoSuchElementException();
+                }
+
+                hasNext = null;
+
+                return 
rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion);
+            });
+        }
+
+        private void advanceIfNeeded() {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractPageMemoryMvPartitionStorage.this::createStorageInfo);
+
+            if (hasNext != null) {
+                return;
+            }
+
+            if (versionChain == null) {
+                try {
+                    versionChain = versionChainTree.findOne(new 
VersionChainKey(rowId));
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException(e);
+                }
+
+                rowVersion = versionChain == null ? null : 
readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
+            } else {
+                rowVersion = !rowVersion.hasNextLink() ? null : 
readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE);
+            }
+
+            hasNext = rowVersion != null;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
+            return;
+        }
+
+        busyLock.block();
+
+        versionChainTree.close();

Review Comment:
   Can we close all resources in one big "closeAll" call, like we do in every other 
place?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -216,51 +197,41 @@ public void lastApplied(long lastAppliedIndex, long 
lastAppliedTerm) throws Stor
 
     @Override
     public long persistedIndex() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
-
-        try {
-            return persistedIndex;
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+        return busy(() -> persistedIndex);
     }
 
     @Override
     @Nullable
     public RaftGroupConfiguration committedGroupConfiguration() {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
-
-        try {
-            replicationProtocolGroupConfigReadWriteLock.readLock().lock();
-
+        return busy(() -> {

Review Comment:
   What has changed in this method? Why did you have to refactor it?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -96,49 +98,22 @@ public boolean isVolatile() {
 
     @Override
     protected void finishDestruction() {
-        dataRegion.pageMemory().onGroupDestroyed(tableCfg.tableId().value());
+        
dataRegion.pageMemory().onGroupDestroyed(tableConfig.tableId().value());
     }
 
     @Override
     public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int 
partitionId) {
-        CompletableFuture<Void> partitionDestroyFuture = 
partitionIdDestroyFutureMap.get(partitionId);
+        waitPartitionToBeDestroyed(partitionId);
 
-        if (partitionDestroyFuture != null) {
-            try {
-                // Time is chosen randomly (long enough) so as not to call 
#join().
-                partitionDestroyFuture.get(10, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                throw new StorageException("Error waiting for the destruction 
of the previous version of the partition: " + partitionId, e);
-            }
-        }
+        TableView tableView = tableConfig.value();
 
-        TableView tableView = tableCfg.value();
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(partitionId);
 
-        GroupPartitionId groupPartitionId = new 
GroupPartitionId(tableView.tableId(), partitionId);
+        PartitionMeta meta = 
getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(groupPartitionId);

Review Comment:
   What's the point of refactoring this code anyway?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -365,4 +315,70 @@ private void syncMetadataOnCheckpoint(@Nullable Executor 
executor) throws Ignite
             });
         }
     }
+
+    @Override
+    public void lastAppliedOnRebalance(long lastAppliedIndex, long 
lastAppliedTerm) throws StorageException {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+        lastApplied0(lastAppliedIndex, lastAppliedTerm);
+
+        persistedIndex = lastAppliedIndex;
+    }
+
+    /**
+     * Updates the internal data structures of the storage and its indexes on 
rebalance.
+     *
+     * @param meta Partition meta.
+     * @param rowVersionFreeList Free list for {@link RowVersion}.
+     * @param indexFreeList Free list fot {@link IndexColumns}.
+     * @param versionChainTree Table tree for {@link VersionChain}.
+     * @param indexMetaTree Tree that contains SQL indexes' metadata.
+     * @throws StorageRebalanceException If the storage is not in the process 
of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(
+            PartitionMeta meta,
+            RowVersionFreeList rowVersionFreeList,
+            IndexColumnsFreeList indexFreeList,
+            VersionChainTree versionChainTree,
+            IndexMetaTree indexMetaTree
+    ) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+        this.meta = meta;
+
+        this.rowVersionFreeList.close();

Review Comment:
   Ok, same question here. Why are these structures not already closed?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -528,20 +516,16 @@ private static byte[] rowBytes(@Nullable BinaryRow row) {
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, 
UUID txId, UUID commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
-        assert rowId.partitionId() == partitionId : rowId;
-
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            assert rowId.partitionId() == partitionId : rowId;
 
-        try {
             VersionChain currentChain = findVersionChain(rowId);
 
             if (currentChain == null) {
                 RowVersion newVersion = insertRowVersion(row, NULL_LINK);
 
-                VersionChain versionChain = 
VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, 
newVersion.link(),
-                        NULL_LINK);
+                VersionChain versionChain = 
VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId,
+                        newVersion.link(), NULL_LINK);

Review Comment:
   What's the idea behind this change? Old formatting was bad, new one is 
equally bad. Why bother?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -203,19 +187,68 @@ public void destroy() throws StorageException {
      * Closes the hash index storage.
      */
     public void close() {
-        if (!CLOSED.compareAndSet(this, false, true)) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+            StorageState state = this.state.get();
+
+            assert state == StorageState.CLOSED : state;
+
             return;
         }
 
-        closeBusyLock.block();
+        busyLock.block();
 
         hashIndexTree.close();
     }
 
     /**
-     * Throws an exception that the storage is already closed.
+     * Prepares storage for rebalancing.
+     *
+     * <p>Stops ongoing index operations.
+     *
+     * @throws StorageRebalanceException If there was an error when starting 
the rebalance.
+     */
+    public void startRebalance() {
+        if (!state.compareAndSet(StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+
+        // Stops ongoing operations on the storage.
+        busyLock.block();
+        busyLock.unblock();
+    }
+
+    /**
+     * Completes the rebalancing of the storage.
+     *
+     * @throws StorageRebalanceException If there is an error while completing 
the storage rebalance.
      */
-    private void throwStorageClosedException() {
-        throw new StorageClosedException();
+    public void completeRebalance() {
+        if (!state.compareAndSet(StorageState.REBALANCE, 
StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+    }
+
+    /**
+     * Updates the internal data structures of the storage on rebalance.
+     *
+     * @param freeList Free list to store index columns.
+     * @param hashIndexTree Hash index tree instance.
+     * @throws StorageRebalanceException If the storage is not in the process 
of rebalancing.
+     */
+    public void updateDataStructuresOnRebalance(IndexColumnsFreeList freeList, 
HashIndexTree hashIndexTree) {
+        throwExceptionIfStorageNotInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+        this.freeList = freeList;
+
+        this.hashIndexTree.close();

Review Comment:
   This is interesting. Why wasn't this object closed before? Something's wrong 
here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java:
##########
@@ -105,71 +102,65 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        try {
-            SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
+            try {
+                SortedIndexRowKey lowerBound = toSortedIndexRow(key, 
lowestRowId);
 
-            SortedIndexRowKey upperBound = toSortedIndexRow(key, highestRowId);
+                SortedIndexRowKey upperBound = toSortedIndexRow(key, 
highestRowId);
 
-            return convertCursor(sortedIndexTree.find(lowerBound, upperBound), 
SortedIndexRow::rowId);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return convertCursor(sortedIndexTree.find(lowerBound, 
upperBound), SortedIndexRow::rowId);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to create scan cursor", e);
+            }
+        });
     }
 
     @Override
     public void put(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {
+            try {
+                SortedIndexRow sortedIndexRow = 
toSortedIndexRow(row.indexColumns(), row.rowId());
 
-        try {
-            SortedIndexRow sortedIndexRow = 
toSortedIndexRow(row.indexColumns(), row.rowId());
+                var insert = new 
InsertSortedIndexRowInvokeClosure(sortedIndexRow, freeList, 
sortedIndexTree.inlineSize());
 
-            var insert = new InsertSortedIndexRowInvokeClosure(sortedIndexRow, 
freeList, sortedIndexTree.inlineSize());
+                sortedIndexTree.invoke(sortedIndexRow, null, insert);
 
-            sortedIndexTree.invoke(sortedIndexRow, null, insert);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to put value into index", e);
-        } finally {
-            closeBusyLock.leaveBusy();
-        }
+                return null;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to put value into index", 
e);
+            }
+        });
     }
 
     @Override
     public void remove(IndexRow row) {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        busy(() -> {

Review Comment:
   Same here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java:
##########
@@ -97,100 +93,88 @@ public HashIndexDescriptor indexDescriptor() {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        if (!closeBusyLock.enterBusy()) {
-            throwStorageClosedException();
-        }
+        return busy(() -> {

Review Comment:
   Why did you have to do this? These are changes for the sake of changes. Did 
you want to artificially inflate your PR? Well, you succeeded in that case. 
Please roll it back or at least create a new "get*" method that would reduce 
the amount of unnecessary changes.
   I feel like half of all changes in this PR are unjustified. I don't like it. 
Reviewing PRs like this one is a nightmare



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return 
destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = 
createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, 
rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, 
partitionId);
+                VersionChainTree versionChainTree = 
createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, 
meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, 
partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) 
mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> 
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());
 
         return 
dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> 
dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> 
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), 
partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = 
dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId 
groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = 
dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), 
groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert 
dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta 
information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta 
getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId
 groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, 
groupPartitionId);
+
+        PartitionMeta partitionMeta = 
getOrCreatePartitionMeta(groupPartitionId, filePageStore);
+
+        if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call 
#join().
+                destroyPartitionPhysically(groupPartitionId).get(10, 
TimeUnit.SECONDS);

Review Comment:
   I don't see a TODO; I asked about it last time. This code stinks. `Time is 
chosen randomly (long enough)` - if it's long enough, it's not random. If it's 
random, it can't be long enough. I don't like this solution, please create a 
JIRA to fix all such places



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to