tkalkirill commented on code in PR #1506: URL: https://github.com/apache/ignite-3/pull/1506#discussion_r1072431782
########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java: ########## @@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree( } @Override - public CompletableFuture<Void> startRebalancePartition(int partitionId) { - // TODO: IGNITE-18029 Implement - throw new UnsupportedOperationException(); - } + CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { + // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and + // compactor to remove the partition, and then simply delete the partition file and its delta files. + mvPartitionStorage.close(); - @Override - public CompletableFuture<Void> abortRebalancePartition(int partitionId) { - // TODO: IGNITE-18029 Implement - throw new UnsupportedOperationException(); + return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId())); } @Override - public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) { - // TODO: IGNITE-18029 Implement - throw new UnsupportedOperationException(); - } + CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { + GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId()); - @Override - CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { - int partitionId = mvPartitionStorage.partitionId(); + return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> { + TableView tableView = tableConfig.value(); - // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and - // compactor to remove the partition, and then simply delete the partition file and its delta files. + PersistentPageMemory pageMemory = dataRegion.pageMemory(); - mvPartitionStorage.close(); + int partitionId = groupPartitionId.getPartitionId(); + + PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId)); + + inCheckpointLock(() -> { + RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta); - int tableId = tableCfg.tableId().value(); + IndexColumnsFreeList indexColumnsFreeList + = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta); - GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId); + VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta); + IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta); + + ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance( + meta, + rowVersionFreeList, + indexColumnsFreeList, + versionChainTree, + indexMetaTree + ); + + return null; + }); + }); + } + + private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) { dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy(); - dataRegion.pageMemory().invalidate(tableId, partitionId); + dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId) .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId)) .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId)); } + + private GroupPartitionId createGroupPartitionId(int partitionId) { + return new GroupPartitionId(tableConfig.tableId().value(), partitionId); + } + + private <V> V inCheckpointLock(Supplier<V> supplier) { + CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock(); + + checkpointTimeoutLock.checkpointReadLock(); + + try { + return supplier.get(); + } finally { + checkpointTimeoutLock.checkpointReadUnlock(); + } + } + + private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) { + try { + PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore); + + dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta); + + filePageStore.pages(meta.pageCount()); + + filePageStore.setPageAllocationListener(pageIdx -> { + assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread(); + + meta.incrementPageCount(lastCheckpointId()); + }); + + return meta; + } catch (IgniteInternalCheckedException e) { + throw new StorageException( + IgniteStringFormatter.format( + "Error reading or creating partition meta information: [table={}, partitionId={}]", + getTableName(), + groupPartitionId.getPartitionId() + ), + e + ); + } + } + + private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) { + TableView tableView = tableConfig.value(); + + FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId); + + PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore); + + if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) { + try { + // Time is chosen randomly (long enough) so as not to call #join(). + destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS); Review Comment: Added TODO. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java: ########## @@ -405,42 +373,148 @@ private IndexMetaTree createIndexMetaTree( } @Override - public CompletableFuture<Void> startRebalancePartition(int partitionId) { - // TODO: IGNITE-18029 Implement - throw new UnsupportedOperationException(); - } + CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { + // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and + // compactor to remove the partition, and then simply delete the partition file and its delta files. + mvPartitionStorage.close(); - @Override - public CompletableFuture<Void> abortRebalancePartition(int partitionId) { - // TODO: IGNITE-18029 Implement - throw new UnsupportedOperationException(); + return destroyPartitionPhysically(createGroupPartitionId(mvPartitionStorage.partitionId())); } @Override - public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) { - // TODO: IGNITE-18029 Implement - throw new UnsupportedOperationException(); - } + CompletableFuture<Void> clearStorageAndUpdateDataStructures(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { + GroupPartitionId groupPartitionId = createGroupPartitionId(mvPartitionStorage.partitionId()); - @Override - CompletableFuture<Void> destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage mvPartitionStorage) { - int partitionId = mvPartitionStorage.partitionId(); + return destroyPartitionPhysically(groupPartitionId).thenAccept(unused -> { + TableView tableView = tableConfig.value(); - // It is enough for us to close the partition storage and its indexes (do not destroy). Prepare the data region, checkpointer, and - // compactor to remove the partition, and then simply delete the partition file and its delta files. + PersistentPageMemory pageMemory = dataRegion.pageMemory(); - mvPartitionStorage.close(); + int partitionId = groupPartitionId.getPartitionId(); + + PartitionMeta meta = getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId)); + + inCheckpointLock(() -> { + RowVersionFreeList rowVersionFreeList = createRowVersionFreeList(tableView, partitionId, pageMemory, meta); - int tableId = tableCfg.tableId().value(); + IndexColumnsFreeList indexColumnsFreeList + = createIndexColumnsFreeList(tableView, partitionId, rowVersionFreeList, pageMemory, meta); - GroupPartitionId groupPartitionId = new GroupPartitionId(tableId, partitionId); + VersionChainTree versionChainTree = createVersionChainTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta); + IndexMetaTree indexMetaTree = createIndexMetaTree(tableView, partitionId, rowVersionFreeList, pageMemory, meta); + + ((PersistentPageMemoryMvPartitionStorage) mvPartitionStorage).updateDataStructuresOnRebalance( + meta, + rowVersionFreeList, + indexColumnsFreeList, + versionChainTree, + indexMetaTree + ); + + return null; + }); + }); + } + + private CompletableFuture<Void> destroyPartitionPhysically(GroupPartitionId groupPartitionId) { dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy(); - dataRegion.pageMemory().invalidate(tableId, partitionId); + dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), groupPartitionId.getPartitionId()); return dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId) .thenAccept(unused -> dataRegion.partitionMetaManager().removeMeta(groupPartitionId)) .thenCompose(unused -> dataRegion.filePageStoreManager().destroyPartition(groupPartitionId)); } + + private GroupPartitionId createGroupPartitionId(int partitionId) { + return new GroupPartitionId(tableConfig.tableId().value(), partitionId); + } + + private <V> V inCheckpointLock(Supplier<V> supplier) { + CheckpointTimeoutLock checkpointTimeoutLock = dataRegion.checkpointManager().checkpointTimeoutLock(); + + checkpointTimeoutLock.checkpointReadLock(); + + try { + return supplier.get(); + } finally { + checkpointTimeoutLock.checkpointReadUnlock(); + } + } + + private PartitionMeta getOrCreatePartitionMeta(GroupPartitionId groupPartitionId, FilePageStore filePageStore) { + try { + PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(lastCheckpointId(), groupPartitionId, filePageStore); + + dataRegion.partitionMetaManager().addMeta(groupPartitionId, meta); + + filePageStore.pages(meta.pageCount()); + + filePageStore.setPageAllocationListener(pageIdx -> { + assert dataRegion.checkpointManager().checkpointTimeoutLock().checkpointLockIsHeldByThread(); + + meta.incrementPageCount(lastCheckpointId()); + }); + + return meta; + } catch (IgniteInternalCheckedException e) { + throw new StorageException( + IgniteStringFormatter.format( + "Error reading or creating partition meta information: [table={}, partitionId={}]", + getTableName(), + groupPartitionId.getPartitionId() + ), + e + ); + } + } + + private PartitionMeta getOrCreatePartitionMetaWithRecreatePartitionPageStoreIfRebalanceNotCompleted(GroupPartitionId groupPartitionId) { + TableView tableView = tableConfig.value(); + + FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, groupPartitionId); + + PartitionMeta partitionMeta = getOrCreatePartitionMeta(groupPartitionId, filePageStore); + + if (partitionMeta.lastAppliedIndex() == REBALANCE_IN_PROGRESS) { + try { + // Time is chosen randomly (long enough) so as not to call #join(). + destroyPartitionPhysically(groupPartitionId).get(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new StorageException( + IgniteStringFormatter.format( + "Error when physically destroying a partition: [table={}, partitionId={}]", + getTableName(), + groupPartitionId.getPartitionId() + ), + e + ); + } + + return getOrCreatePartitionMeta(groupPartitionId, ensurePartitionFilePageStore(tableView, groupPartitionId)); + } else { + return partitionMeta; + } + } + + private void waitPartitionToBeDestroyed(int partitionId) { + CompletableFuture<Void> partitionDestroyFuture = destroyFutureByPartitionId.get(partitionId); + + if (partitionDestroyFuture != null) { + try { + // Time is chosen randomly (long enough) so as not to call #join(). + partitionDestroyFuture.get(10, TimeUnit.SECONDS); Review Comment: Added TODO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org