tkalkirill commented on code in PR #1506:
URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072415322


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree(
     }
 
     @Override
-    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) {
+        // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and
+        // compactor to remove the partition, and then simply delete the partition file and its delta files.
+        mvPartitionStorage.close();
 
-    @Override
-    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
+        return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId()));
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
-        // TODO: IGNITE-18029 Implement
-        throw new UnsupportedOperationException();
-    }
+    CompletableFuture<Void> 
clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
+        GroupPartitionId groupPartitionId = 
createGroupPartitionId(mvPartitionStorage.partitionId());
 
-    @Override
-    CompletableFuture<Void> 
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage 
mvPartitionStorage) {
-        int partitionId = mvPartitionStorage.partitionId();
+        return destroyPartitionPhysically(groupPartitionId).thenAccept(unused 
-> {
+            TableView tableView = tableConfig.value();
 
-        // It is enough for us to close the partition storage and its indexes 
(do not destroy). Prepare the data region, checkpointer, and
-        // compactor to remove the partition, and then simply delete the 
partition file and its delta files.
+            PersistentPageMemory pageMemory = dataRegion.pageMemory();
 
-        mvPartitionStorage.close();
+            int partitionId = groupPartitionId.getPartitionId();
+
+            PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, 
ensurePartitionFilePageStore(tableView, groupPartitionId));
+
+            inCheckpointLock(() -> {
+                RowVersionFreeList rowVersionFreeList = 
createRowVersionFreeList(tableView, partitionId, pageMemory, meta);
 
-        int tableId = tableCfg.tableId().value();
+                IndexColumnsFreeList indexColumnsFreeList
+                        = createIndexColumnsFreeList(tableView, partitionId, 
rowVersionFreeList, pageMemory, meta);
 
-        GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, 
partitionId);
+                VersionChainTree versionChainTree = 
createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, 
meta);
 
+                IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, 
partitionId, rowVersionFreeList, pageMemory, meta);
+
+                ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance(
+                        meta,
+                        rowVersionFreeList,
+                        indexColumnsFreeList,
+                        versionChainTree,
+                        indexMetaTree
+                );
+
+                return null;
+            });
+        });
+    }
+
+    private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(tableId, partitionId);
+        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());
 
         return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
                 .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
                 .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
     }
+
+    private GroupPartitionId createGroupPartitionId(int partitionId) {
+        return new GroupPartitionId(tableConfig.tableId().value(), 
partitionId);
+    }
+
+    private <V> V inCheckpointLock(Supplier<V> supplier) {
+        CheckpointTimeoutLock checkpointTimeoutLock = 
dataRegion.checkpointManager().checkpointTimeoutLock();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        try {
+            return supplier.get();
+        } finally {
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId 
groupPartitionId, FilePageStore filePageStore) {
+        try {
+            PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore);
+
+            dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta);
+
+            filePageStore.pages(meta.pageCount());
+
+            filePageStore.setPageAllocationListener(pageIdx -> {
+                assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                meta.incrementPageCount(lastCheckpointId());
+            });
+
+            return meta;
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException(
+                    IgniteStringFormatter.format(
+                            "Error reading or creating partition meta information: [table={}, partitionId={}]",
+                            getTableName(),
+                            groupPartitionId.getPartitionId()
+                    ),
+                    e
+            );
+        }
+    }
+
+    private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) {
+        TableView tableView = tableConfig.value();
+
+        FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, 
groupPartitionId);

Review Comment:
   Try to fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to