This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26037 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit e06b90d8a1ce062559306ace44a1685d2a9eb873 Author: Kirill Tkalenko <[email protected]> AuthorDate: Fri Aug 1 12:50:06 2025 +0300 IGNITE-26037 wip --- .../persistence/PersistentPageMemory.java | 103 ++++++++++++++------ .../persistence/checkpoint/CheckpointManager.java | 8 ++ .../persistence/checkpoint/Checkpointer.java | 10 +- .../internal/pagememory/util/PageHandler.java | 11 +++ .../PersistentPageMemoryMvTableStorageTest.java | 106 +++++++++++++++++++++ 5 files changed, 209 insertions(+), 29 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java index 0788f279a49..ca398440065 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java @@ -1115,25 +1115,64 @@ public class PersistentPageMemory implements PageMemory { private long postWriteLockPage(long absPtr, FullPageId fullId) { writeTimestamp(absPtr, coarseCurrentTimeMillis()); + copyPageIfInCheckpoint(absPtr, fullId); + + assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612 + + return absPtr + PAGE_OVERHEAD; + } + + private void copyPageIfInCheckpoint(long absPtr, FullPageId fullId) { + Segment seg = segment(fullId); + + if (!seg.isInCheckpoint(fullId) || tempBufferPointer(absPtr) != INVALID_REL_PTR) { + return; + } + // Create a buffer copy if the page is scheduled for a checkpoint. 
- if (isInCheckpoint(fullId) && tempBufferPointer(absPtr) == INVALID_REL_PTR) { - long tmpRelPtr; + long tmpRelPtr; - PagePool checkpointPool = this.checkpointPool; + PagePool checkpointPool = this.checkpointPool; - while (true) { - tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId())); + while (true) { + int tag = tag(fullId.pageId()); + tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(tag); - if (tmpRelPtr != INVALID_REL_PTR) { - break; - } + if (tmpRelPtr != INVALID_REL_PTR) { + break; + } - // TODO https://issues.apache.org/jira/browse/IGNITE-23106 Replace spin-wait with a proper wait. - try { - Thread.sleep(1); - } catch (InterruptedException ignore) { - // No-op. - } + // TODO https://issues.apache.org/jira/browse/IGNITE-23106 Replace spin-wait with a proper wait. + try { + Thread.sleep(1); + } catch (InterruptedException ignore) { + // No-op. + } + } + + // The partition could have been deleted in parallel and we could get an empty page. + seg.writeLock().lock(); + + try { + // Double check under the segment lock whether the page became outdated because its partition was deleted in parallel. + long relPtr = resolveRelativePointer(seg, fullId, generationTag(seg, fullId)); + + assert relPtr != INVALID_REL_PTR : "fullPageId=" + fullId + ", pageId=" + hexLong(fullId.pageId()); + + if (relPtr == OUTDATED_REL_PTR) { + relPtr = seg.refreshOutdatedPage( + fullId.groupId(), + fullId.effectivePageId(), + true + ); + + seg.pageReplacementPolicy.onRemove(relPtr); + + seg.pool.releaseFreePage(relPtr); + + checkpointPool.releaseFreePage(tmpRelPtr); + + return; } // Pin the page until checkpoint is not finished. @@ -1149,7 +1188,8 @@ public class PersistentPageMemory implements PageMemory { pageSize() ); - assert getType(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Type is 0!
pageId = " + hexLong(fullId.pageId()) + ", tmpAbsPtr=" + tmpAbsPtr; assert getVersion(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(fullId.pageId()); @@ -1158,13 +1198,11 @@ public class PersistentPageMemory implements PageMemory { // info for checkpoint buffer cleaner. fullPageId(tmpAbsPtr, fullId); - assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO GG-11480 - assert getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; // TODO GG-11480 + assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612 + assert getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612 + } finally { + seg.writeLock().unlock(); } - - assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612 - - return absPtr + PAGE_OVERHEAD; } private void writeUnlockPage( @@ -1305,6 +1343,10 @@ public class PersistentPageMemory implements PageMemory { return segments[idx]; } + private Segment segment(FullPageId fullPageId) { + return segment(fullPageId.groupId(), fullPageId.pageId()); + } + /** * Returns segment index. * @@ -1733,6 +1775,12 @@ public class PersistentPageMemory implements PageMemory { return tag == null ? INIT_PART_GENERATION : tag; } + private boolean isInCheckpoint(FullPageId fullPageId) { + CheckpointPages checkpointPages = this.checkpointPages; + + return checkpointPages != null && checkpointPages.contains(fullPageId); + } + /** * Gets loaded pages map. */ @@ -1877,16 +1925,14 @@ public class PersistentPageMemory implements PageMemory { } /** - * Returns {@code true} if it was added to the checkpoint list. + * Returns {@code true} if the page was added to the checkpoint list. * * @param pageId Page ID to check if it was added to the checkpoint list.
*/ - boolean isInCheckpoint(FullPageId pageId) { - Segment seg = segment(pageId.groupId(), pageId.pageId()); - - CheckpointPages pages0 = seg.checkpointPages; + private boolean isInCheckpoint(FullPageId pageId) { + Segment seg = segment(pageId); - return pages0 != null && pages0.contains(pageId); + return seg.isInCheckpoint(pageId); } /** @@ -1986,6 +2032,9 @@ public class PersistentPageMemory implements PageMemory { copyInBuffer(absPtr, buf); dirty(absPtr, false); + + // TODO: IGNITE-25861 Temporary solution needs to be done differently + return; } assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(fullId.pageId()); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index ee8f7730901..2e93df9d4a1 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStor import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Main class to abstract checkpoint-related processes and actions and hide them from higher-level components. @@ -377,4 +378,11 @@ public class CheckpointManager { compactor.prepareToDestroyPartition(groupPartitionId) ); } + + /** Returns compactor. 
*/ + @TestOnly + // TODO: IGNITE-25861 Maybe get rid of it + public Compactor compactor() { + return compactor; + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index d07308cc7d4..8e96d3facdd 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -854,7 +854,10 @@ public class Checkpointer extends IgniteWorker { try { CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = filePageStore.getNewDeltaFile(); - assert deltaFilePageStoreFuture != null; + // This can happen when the store is destroyed in parallel. + if (deltaFilePageStoreFuture == null) { + return; + } deltaFilePageStoreFuture.join().sync(); } finally { @@ -873,7 +876,10 @@ public class Checkpointer extends IgniteWorker { try { CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = filePageStore.getNewDeltaFile(); - assert deltaFilePageStoreFuture != null; + // This can happen when the store is destroyed in parallel. 
+ if (deltaFilePageStoreFuture == null) { + return; + } DeltaFilePageStoreIo deltaFilePageStoreIo = deltaFilePageStoreFuture.join(); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java index dba6d06464c..e1f5481f740 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java @@ -18,8 +18,11 @@ package org.apache.ignite.internal.pagememory.util; import static java.lang.Boolean.FALSE; +import static org.apache.ignite.internal.util.StringUtils.hexLong; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.pagememory.PageMemory; import org.apache.ignite.internal.pagememory.io.PageIo; import org.jetbrains.annotations.Nullable; @@ -31,6 +34,8 @@ import org.jetbrains.annotations.Nullable; * @param <R> Type of the result. */ public interface PageHandler<X, R> { + IgniteLogger LOG = Loggers.forClass(PageHandler.class); + /** No-op page handler. */ PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, arg, intArg) -> Boolean.TRUE; @@ -237,6 +242,12 @@ public interface PageHandler<X, R> { pageMem.writeUnlock(groupId, pageId, page, ok); } } + } catch (Throwable t) { + // This logging was added for the case when an exception thrown from "pageMem.writeLock" was not caught by the + // "catch" in the calling code of this method and thus seemed to disappear.
+ LOG.error("Error writing page: [grpId={}, pageId={}]", t, groupId, hexLong(pageId)); + + throw t; } finally { if (releaseAfterWrite) { pageMem.releasePage(groupId, pageId, page); diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java index a1a6032572e..cf6022f7f0a 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.pagememory.persistence.checkpoint.Check import static org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; @@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.mock; import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.apache.ignite.internal.components.LogSyncer; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; @@ -49,6 +51,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.configurations.StorageConfiguration; import
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration; +import org.apache.ignite.internal.storage.engine.MvPartitionMeta; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration; @@ -267,4 +270,107 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora return null; }); } + + @Test + void createMvPartitionStorageAndDoCheckpointInParallel() throws Exception { + stopCompactor(); + + for (int i = 0; i < 10; i++) { + runRace( + () -> getOrCreateMvPartition(PARTITION_ID), + () -> assertThat(forceCheckpointAsync(), willCompleteSuccessfully()) + ); + + assertThat(tableStorage.destroyPartition(PARTITION_ID), willCompleteSuccessfully()); + } + } + + @Test + void clearMvPartitionStorageAndDoCheckpointInParallel() throws Exception { + stopCompactor(); + + for (int i = 0; i < 10; i++) { + getOrCreateMvPartition(PARTITION_ID); + + runRace( + () -> assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully()), + () -> assertThat(forceCheckpointAsync(), willCompleteSuccessfully()) + ); + + assertThat(tableStorage.destroyPartition(PARTITION_ID), willCompleteSuccessfully()); + } + } + + @Test + void destroyMvPartitionStorageAndDoCheckpointInParallel() throws Exception { + stopCompactor(); + + for (int i = 0; i < 10; i++) { + getOrCreateMvPartition(PARTITION_ID); + + runRace( + () -> assertThat(tableStorage.destroyPartition(PARTITION_ID), willCompleteSuccessfully()), + () -> assertThat(forceCheckpointAsync(), willCompleteSuccessfully()) + ); + } + } + + @Test + void startRebalancePartitionAndDoCheckpointInParallel() throws Exception { + stopCompactor(); + + getOrCreateMvPartition(PARTITION_ID); + + for (int i = 0; i < 10; i++) { + runRace( + () -> assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully()), + () -> assertThat(forceCheckpointAsync(), willCompleteSuccessfully()) + ); + + assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), willCompleteSuccessfully()); + } + } + + @Test + void abortRebalancePartitionAndDoCheckpointInParallel() throws Exception { + stopCompactor(); + + getOrCreateMvPartition(PARTITION_ID); + + for (int i = 0; i < 10; i++) { + assertThat(tableStorage.startRebalancePartition(PARTITION_ID), willCompleteSuccessfully()); + + runRace( + () -> assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), willCompleteSuccessfully()), + () -> assertThat(forceCheckpointAsync(), willCompleteSuccessfully()) + ); + } + } + + @Test + void finishRebalancePartitionAndDoCheckpointInParallel() throws Exception { + stopCompactor(); + + getOrCreateMvPartition(PARTITION_ID); + + for (int i = 0; i < 10; i++) { + assertThat(tableStorage.startRebalancePartition(PARTITION_ID), willCompleteSuccessfully()); + + var meta = new MvPartitionMeta(1, 1, BYTE_EMPTY_ARRAY, null, BYTE_EMPTY_ARRAY); + + runRace( + () -> assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, meta), willCompleteSuccessfully()), + () -> assertThat(forceCheckpointAsync(), willCompleteSuccessfully()) + ); + } + } + + private CompletableFuture<Void> forceCheckpointAsync() { + return engine.checkpointManager().forceCheckpoint("test").futureFor(FINISHED); + } + + // TODO: IGNITE-25861 Get rid of it + private void stopCompactor() throws Exception { + engine.checkpointManager().compactor().stop(); + } }
