This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26132 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 0bab240bb94a788df2c74a7c6c30e82ea33f7905 Author: Kirill Tkalenko <[email protected]> AuthorDate: Thu Aug 7 16:09:42 2025 +0300 IGNITE-26132 wip --- .../persistence/PersistentPageMemory.java | 54 +++++++++++++--------- .../persistence/checkpoint/CheckpointManager.java | 2 +- .../checkpoint/CheckpointPagesWriter.java | 37 ++++++++++----- .../checkpoint/CheckpointPagesWriterTest.java | 10 ++-- 4 files changed, 67 insertions(+), 36 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java index 0788f279a49..3765b351c97 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java @@ -1910,10 +1910,13 @@ public class PersistentPageMemory implements PageMemory { * @param absPtr Absolute page pointer. * @param fullId Full page ID. * @param buf Buffer for copy page content for future write via {@link PageStoreWriter}. + * @param tag Current partition generation tag. * @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be copied) in case * checkpoint temporary buffer is used. * @param pageStoreWriter Checkpoint page writer. * @param tracker Checkpoint metrics tracker. + * @param useTryWriteLockOnPage {@code true} to use a non-blocking <b>try write lock</b> on the page, {@code false} to use a + * blocking <b>write lock</b>.
*/ private void copyPageForCheckpoint( long absPtr, @@ -1922,31 +1925,32 @@ public class PersistentPageMemory implements PageMemory { int tag, boolean pageSingleAcquire, PageStoreWriter pageStoreWriter, - CheckpointMetricsTracker tracker + CheckpointMetricsTracker tracker, + boolean useTryWriteLockOnPage ) throws IgniteInternalCheckedException { - assert absPtr != 0; - assert isAcquired(absPtr) || !isInCheckpoint(fullId); + assert absPtr != 0 : hexLong(fullId.pageId()); + assert isAcquired(absPtr) || !isInCheckpoint(fullId) : hexLong(fullId.pageId()); - // Exception protection flag. - // No need to write if exception occurred. - boolean canWrite = false; + if (useTryWriteLockOnPage) { + if (!rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS)) { + // We release the page only once here because this page will be copied sometime later and + // will be released properly then. + if (!pageSingleAcquire) { + PageHeader.releasePage(absPtr); + } - boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS); + buf.clear(); - if (!locked) { - // We release the page only once here because this page will be copied sometime later and - // will be released properly then. - if (!pageSingleAcquire) { - PageHeader.releasePage(absPtr); - } - - buf.clear(); + if (isInCheckpoint(fullId)) { + pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG); + } - if (isInCheckpoint(fullId)) { - pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG); + return; } + } else { + boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS); - return; + assert locked : hexLong(fullId.pageId()); } if (!removeOnCheckpoint(fullId)) { @@ -1959,6 +1963,10 @@ public class PersistentPageMemory implements PageMemory { return; } + // Exception protection flag. + // No need to write if exception occurred.
+ boolean canWrite = false; + try { long tmpRelPtr = tempBufferPointer(absPtr); @@ -2010,20 +2018,24 @@ public class PersistentPageMemory implements PageMemory { } /** - * Prepare page for write during checkpoint. {@link PageStoreWriter} will be called when the page will be ready to write. + * Tries to copy a page from memory for checkpoint and then pass the contents to {@code pageStoreWriter} if it has not already been + * written or invalidated (due to partition destruction). {@link PageStoreWriter} will be called when the page will be ready to write. * * @param fullId Page ID to get byte buffer for. The page ID must be present in the collection returned by the {@link #beginCheckpoint} * method call. * @param buf Temporary buffer to write changes into. * @param pageStoreWriter Checkpoint page write context. * @param tracker Checkpoint metrics tracker. + * @param useTryWriteLockOnPage {@code true} to use a non-blocking <b>try write lock</b> on the page, {@code false} to use a + * blocking <b>write lock</b>. * @throws IgniteInternalCheckedException If failed to obtain page data.
*/ public void checkpointWritePage( FullPageId fullId, ByteBuffer buf, PageStoreWriter pageStoreWriter, - CheckpointMetricsTracker tracker + CheckpointMetricsTracker tracker, + boolean useTryWriteLockOnPage ) throws IgniteInternalCheckedException { assert buf.remaining() == pageSize() : buf.remaining(); @@ -2094,7 +2106,7 @@ public class PersistentPageMemory implements PageMemory { } } - copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, tracker); + copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, tracker, useTryWriteLockOnPage); } /** diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index ee8f7730901..52ce770185b 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -328,7 +328,7 @@ public class CheckpointManager { pageId.partitionId() ); - assert partitionView != null : String.format("Unable to find view for dirty pages: [patitionId=%s, pageMemory=%s]", + assert partitionView != null : String.format("Unable to find view for dirty pages: [partitionId=%s, pageMemory=%s]", GroupPartitionId.convert(pageId), pageMemory); return pageIndexesForDeltaFilePageStore(partitionView); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java index c010d1f1580..a024ee5b4c3 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java @@ -61,6 +61,12 @@ public class CheckpointPagesWriter implements Runnable { /** Logger. */ private static final IgniteLogger LOG = Loggers.forClass(CheckpointPagesWriter.class); + /** + * Number of retry attempts that use the non-blocking page try write lock, after which the blocking page write lock is used to + * write the remaining pages. The value is chosen arbitrarily. + */ + private static final int MAX_ATTEMPT_WRITE_RETRY_DIRTY_PAGES = 32; + /** * Size of a batch of pages that we drain from a single checkpoint buffer at the same time. The value of {@code 10} is chosen * arbitrarily. We may reconsider it in <a href="https://issues.apache.org/jira/browse/IGNITE-23106">IGNITE-23106</a> if necessary. @@ -172,10 +178,12 @@ public class CheckpointPagesWriter implements Runnable { writeDirtyPages(pageMemory, queueResult.getValue(), tmpWriteBuf, pageStoreWriter); } + int attemptWriteRetryDirtyPages = 0; + while (!shutdownNow.getAsBoolean() && !pageIdsToRetry.isEmpty()) { updateHeartbeat.run(); - pageIdsToRetry = writeRetryDirtyPages(pageIdsToRetry, tmpWriteBuf); + pageIdsToRetry = writeRetryDirtyPages(pageIdsToRetry, tmpWriteBuf, attemptWriteRetryDirtyPages++); } doneFut.complete(null); @@ -209,7 +217,7 @@ public class CheckpointPagesWriter implements Runnable { continue; } - writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageStoreWriter); + writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageStoreWriter, true); } } finally { checkpointProgress.unblockPartitionDestruction(partitionId); @@ -220,26 +228,33 @@ public class CheckpointPagesWriter implements Runnable { PersistentPageMemory pageMemory, FullPageId pageId, ByteBuffer tmpWriteBuf, - PageStoreWriter pageStoreWriter + PageStoreWriter pageStoreWriter, + boolean useTryWriteLockOnPage ) throws IgniteInternalCheckedException { // Should also be done for partitions that will be destroyed to remove their pages from the
data region. - pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), pageStoreWriter, tracker); + pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), pageStoreWriter, tracker, useTryWriteLockOnPage); drainCheckpointBuffers(tmpWriteBuf); } private Map<PersistentPageMemory, List<FullPageId>> writeRetryDirtyPages( Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry, - ByteBuffer tmpWriteBuf + ByteBuffer tmpWriteBuf, + int attempt ) throws IgniteInternalCheckedException { + boolean useTryWriteLockOnPage = attempt < MAX_ATTEMPT_WRITE_RETRY_DIRTY_PAGES; + if (LOG.isInfoEnabled()) { int pageCount = pageIdsToRetry.values().stream().mapToInt(List::size).sum(); - LOG.info("Checkpoint pages were not written yet due to " - + "unsuccessful page write lock acquisition and will be retried [pageCount={}]", pageCount); + LOG.info( + "Checkpoint pages were not written yet due to unsuccessful page write lock acquisition and will be retried: " + + "[pageCount={}, attempt={}, useTryWriteLockOnPage={}]", + pageCount, attempt, useTryWriteLockOnPage + ); } - var newPageIdsToRetry = new HashMap<PersistentPageMemory, List<FullPageId>>(); + var newPageIdsToRetry = useTryWriteLockOnPage ?
new HashMap<PersistentPageMemory, List<FullPageId>>() : null; for (Entry<PersistentPageMemory, List<FullPageId>> entry : pageIdsToRetry.entrySet()) { PersistentPageMemory pageMemory = entry.getKey(); @@ -266,7 +281,7 @@ public class CheckpointPagesWriter implements Runnable { checkpointProgress.blockPartitionDestruction(partitionId); } - writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageStoreWriter); + writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageStoreWriter, useTryWriteLockOnPage); } } finally { if (partitionId != null) { @@ -321,7 +336,7 @@ public class CheckpointPagesWriter implements Runnable { writePartitionMeta(pageMemory, partitionId, tmpWriteBuf.rewind()); } - pageMemory.checkpointWritePage(cpPageId, tmpWriteBuf.rewind(), pageStoreWriter, tracker); + pageMemory.checkpointWritePage(cpPageId, tmpWriteBuf.rewind(), pageStoreWriter, tracker, true); } finally { checkpointProgress.unblockPartitionDestruction(partitionId); } @@ -417,7 +432,7 @@ public class CheckpointPagesWriter implements Runnable { partitionId.getPartitionId() ); - assert partitionView != null : String.format("Unable to find view for dirty pages: [patitionId=%s, pageMemory=%s]", partitionId, + assert partitionView != null : String.format("Unable to find view for dirty pages: [partitionId=%s, pageMemory=%s]", partitionId, pageMemory); return partitionView; diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java index 318f9932ab7..105bcafd702 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java @@ -36,6 +36,7 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -199,7 +200,8 @@ public class CheckpointPagesWriterTest extends BaseIgniteAbstractTest { any(FullPageId.class), any(ByteBuffer.class), any(PageStoreWriter.class), - any(CheckpointMetricsTracker.class) + any(CheckpointMetricsTracker.class), + anyBoolean() ); GroupPartitionId groupPartId = groupPartId(0, 0); @@ -251,7 +253,8 @@ public class CheckpointPagesWriterTest extends BaseIgniteAbstractTest { any(FullPageId.class), any(ByteBuffer.class), any(PageStoreWriter.class), - any(CheckpointMetricsTracker.class) + any(CheckpointMetricsTracker.class), + anyBoolean() ); CheckpointDirtyPages checkpointDirtyPages = new CheckpointDirtyPages(List.of( @@ -322,7 +325,8 @@ public class CheckpointPagesWriterTest extends BaseIgniteAbstractTest { any(FullPageId.class), any(ByteBuffer.class), any(PageStoreWriter.class), - any(CheckpointMetricsTracker.class) + any(CheckpointMetricsTracker.class), + anyBoolean() ); return pageMemory;
