This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26132
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 0bab240bb94a788df2c74a7c6c30e82ea33f7905
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Aug 7 16:09:42 2025 +0300

    IGNITE-26132 wip
---
 .../persistence/PersistentPageMemory.java          | 54 +++++++++++++---------
 .../persistence/checkpoint/CheckpointManager.java  |  2 +-
 .../checkpoint/CheckpointPagesWriter.java          | 37 ++++++++++-----
 .../checkpoint/CheckpointPagesWriterTest.java      | 10 ++--
 4 files changed, 67 insertions(+), 36 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 0788f279a49..3765b351c97 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -1910,10 +1910,13 @@ public class PersistentPageMemory implements PageMemory 
{
      * @param absPtr Absolute page pointer.
      * @param fullId Full page ID.
      * @param buf Buffer for copy page content for future write via {@link 
PageStoreWriter}.
+     * @param tag Current partition generation tag.
      * @param pageSingleAcquire Page is acquired only once. We don't pin the 
page second time (until page will not be copied) in case
      *      checkpoint temporary buffer is used.
      * @param pageStoreWriter Checkpoint page writer.
      * @param tracker Checkpoint metrics tracker.
+     * @param useTryWriteLockLockOnPage {@code True} if need to use the <b>try 
write lock</b> on page, {@code false} for blocking
+     *      <b>write lock</b> on page.
      */
     private void copyPageForCheckpoint(
             long absPtr,
@@ -1922,31 +1925,32 @@ public class PersistentPageMemory implements PageMemory 
{
             int tag,
             boolean pageSingleAcquire,
             PageStoreWriter pageStoreWriter,
-            CheckpointMetricsTracker tracker
+            CheckpointMetricsTracker tracker,
+            boolean useTryWriteLockLockOnPage
     ) throws IgniteInternalCheckedException {
-        assert absPtr != 0;
-        assert isAcquired(absPtr) || !isInCheckpoint(fullId);
+        assert absPtr != 0 : hexLong(fullId.pageId());
+        assert isAcquired(absPtr) || !isInCheckpoint(fullId) : 
hexLong(fullId.pageId());
 
-        // Exception protection flag.
-        // No need to write if exception occurred.
-        boolean canWrite = false;
+        if (useTryWriteLockLockOnPage) {
+            if (!rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS)) {
+                // We release the page only once here because this page will 
be copied sometime later and
+                // will be released properly then.
+                if (!pageSingleAcquire) {
+                    PageHeader.releasePage(absPtr);
+                }
 
-        boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS);
+                buf.clear();
 
-        if (!locked) {
-            // We release the page only once here because this page will be 
copied sometime later and
-            // will be released properly then.
-            if (!pageSingleAcquire) {
-                PageHeader.releasePage(absPtr);
-            }
-
-            buf.clear();
+                if (isInCheckpoint(fullId)) {
+                    pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+                }
 
-            if (isInCheckpoint(fullId)) {
-                pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+                return;
             }
+        } else {
+            boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, 
TAG_LOCK_ALWAYS);
 
-            return;
+            assert locked : hexLong(fullId.pageId());
         }
 
         if (!removeOnCheckpoint(fullId)) {
@@ -1959,6 +1963,10 @@ public class PersistentPageMemory implements PageMemory {
             return;
         }
 
+        // Exception protection flag.
+        // No need to write if exception occurred.
+        boolean canWrite = false;
+
         try {
             long tmpRelPtr = tempBufferPointer(absPtr);
 
@@ -2010,20 +2018,24 @@ public class PersistentPageMemory implements PageMemory 
{
     }
 
     /**
-     * Prepare page for write during checkpoint. {@link PageStoreWriter} will 
be called when the page will be ready to write.
+     * Tries to copy a page from memory for checkpoint and then pass the 
contents to {@code pageStoreWriter} if it has not already been
+     * written or invalidated (due to partition destruction). {@link 
PageStoreWriter} will be called when the page will be ready to write.
      *
      * @param fullId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by the {@link #beginCheckpoint}
      *      method call.
      * @param buf Temporary buffer to write changes into.
      * @param pageStoreWriter Checkpoint page write context.
      * @param tracker Checkpoint metrics tracker.
+     * @param useTryWriteLockLockOnPage {@code True} if need to use the <b>try 
write lock</b> on page, {@code false} for blocking
+     *      <b>write lock</b> on page.
      * @throws IgniteInternalCheckedException If failed to obtain page data.
      */
     public void checkpointWritePage(
             FullPageId fullId,
             ByteBuffer buf,
             PageStoreWriter pageStoreWriter,
-            CheckpointMetricsTracker tracker
+            CheckpointMetricsTracker tracker,
+            boolean useTryWriteLockLockOnPage
     ) throws IgniteInternalCheckedException {
         assert buf.remaining() == pageSize() : buf.remaining();
 
@@ -2094,7 +2106,7 @@ public class PersistentPageMemory implements PageMemory {
             }
         }
 
-        copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, 
pageStoreWriter, tracker);
+        copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, 
pageStoreWriter, tracker, useTryWriteLockLockOnPage);
     }
 
     /**
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index ee8f7730901..52ce770185b 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -328,7 +328,7 @@ public class CheckpointManager {
                             pageId.partitionId()
                     );
 
-                    assert partitionView != null : String.format("Unable to 
find view for dirty pages: [patitionId=%s, pageMemory=%s]",
+                    assert partitionView != null : String.format("Unable to 
find view for dirty pages: [partitionId=%s, pageMemory=%s]",
                             GroupPartitionId.convert(pageId), pageMemory);
 
                     return pageIndexesForDeltaFilePageStore(partitionView);
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index c010d1f1580..a024ee5b4c3 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -61,6 +61,12 @@ public class CheckpointPagesWriter implements Runnable {
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(CheckpointPagesWriter.class);
 
+    /**
+     * Maximum number of attempts to write retry dirty pages after which the 
blocking write lock will be used to write pages. Value is taken
+     * speculatively.
+     */
+    private static final int MAX_ATTEMPT_WRITE_RETRY_DIRTY_PAGES = 32;
+
     /**
      * Size of a batch of pages that we drain from a single checkpoint buffer 
at the same time. The value of {@code 10} is chosen
      * arbitrarily. We may reconsider it in <a 
href="https://issues.apache.org/jira/browse/IGNITE-23106">IGNITE-23106</a> if 
necessary.
@@ -172,10 +178,12 @@ public class CheckpointPagesWriter implements Runnable {
                 writeDirtyPages(pageMemory, queueResult.getValue(), 
tmpWriteBuf, pageStoreWriter);
             }
 
+            int attemptWriteRetryDirtyPages = 0;
+
             while (!shutdownNow.getAsBoolean() && !pageIdsToRetry.isEmpty()) {
                 updateHeartbeat.run();
 
-                pageIdsToRetry = writeRetryDirtyPages(pageIdsToRetry, 
tmpWriteBuf);
+                pageIdsToRetry = writeRetryDirtyPages(pageIdsToRetry, 
tmpWriteBuf, attemptWriteRetryDirtyPages++);
             }
 
             doneFut.complete(null);
@@ -209,7 +217,7 @@ public class CheckpointPagesWriter implements Runnable {
                     continue;
                 }
 
-                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter);
+                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter, true);
             }
         } finally {
             checkpointProgress.unblockPartitionDestruction(partitionId);
@@ -220,26 +228,33 @@ public class CheckpointPagesWriter implements Runnable {
             PersistentPageMemory pageMemory,
             FullPageId pageId,
             ByteBuffer tmpWriteBuf,
-            PageStoreWriter pageStoreWriter
+            PageStoreWriter pageStoreWriter,
+            boolean useTryWriteLockLockOnPage
     ) throws IgniteInternalCheckedException {
         // Should also be done for partitions that will be destroyed to remove 
their pages from the data region.
-        pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), 
pageStoreWriter, tracker);
+        pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), 
pageStoreWriter, tracker, true);
 
         drainCheckpointBuffers(tmpWriteBuf);
     }
 
     private Map<PersistentPageMemory, List<FullPageId>> writeRetryDirtyPages(
             Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry,
-            ByteBuffer tmpWriteBuf
+            ByteBuffer tmpWriteBuf,
+            int attempt
     ) throws IgniteInternalCheckedException {
+        boolean useTryWriteLockOnPage = attempt < 
MAX_ATTEMPT_WRITE_RETRY_DIRTY_PAGES;
+
         if (LOG.isInfoEnabled()) {
             int pageCount = 
pageIdsToRetry.values().stream().mapToInt(List::size).sum();
 
-            LOG.info("Checkpoint pages were not written yet due to "
-                    + "unsuccessful page write lock acquisition and will be 
retried [pageCount={}]", pageCount);
+            LOG.info(
+                    "Checkpoint pages were not written yet due to unsuccessful 
page write lock acquisition and will be retried: "
+                            + "[pageCount={}, attempt={}, 
useTryWriteLockOnPage={}]",
+                    pageCount, attempt, useTryWriteLockOnPage
+            );
         }
 
-        var newPageIdsToRetry = new HashMap<PersistentPageMemory, 
List<FullPageId>>();
+        var newPageIdsToRetry = useTryWriteLockOnPage ? new 
HashMap<PersistentPageMemory, List<FullPageId>>() : null;
 
         for (Entry<PersistentPageMemory, List<FullPageId>> entry : 
pageIdsToRetry.entrySet()) {
             PersistentPageMemory pageMemory = entry.getKey();
@@ -266,7 +281,7 @@ public class CheckpointPagesWriter implements Runnable {
                         
checkpointProgress.blockPartitionDestruction(partitionId);
                     }
 
-                    writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter);
+                    writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter, useTryWriteLockOnPage);
                 }
             } finally {
                 if (partitionId != null) {
@@ -321,7 +336,7 @@ public class CheckpointPagesWriter implements Runnable {
                             writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
                         }
 
-                        pageMemory.checkpointWritePage(cpPageId, 
tmpWriteBuf.rewind(), pageStoreWriter, tracker);
+                        pageMemory.checkpointWritePage(cpPageId, 
tmpWriteBuf.rewind(), pageStoreWriter, tracker, true);
                     } finally {
                         
checkpointProgress.unblockPartitionDestruction(partitionId);
                     }
@@ -417,7 +432,7 @@ public class CheckpointPagesWriter implements Runnable {
                 partitionId.getPartitionId()
         );
 
-        assert partitionView != null : String.format("Unable to find view for 
dirty pages: [patitionId=%s, pageMemory=%s]", partitionId,
+        assert partitionView != null : String.format("Unable to find view for 
dirty pages: [partitionId=%s, pageMemory=%s]", partitionId,
                 pageMemory);
 
         return partitionView;
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index 318f9932ab7..105bcafd702 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -36,6 +36,7 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
@@ -199,7 +200,8 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                         any(FullPageId.class),
                         any(ByteBuffer.class),
                         any(PageStoreWriter.class),
-                        any(CheckpointMetricsTracker.class)
+                        any(CheckpointMetricsTracker.class),
+                        anyBoolean()
                 );
 
         GroupPartitionId groupPartId = groupPartId(0, 0);
@@ -251,7 +253,8 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                         any(FullPageId.class),
                         any(ByteBuffer.class),
                         any(PageStoreWriter.class),
-                        any(CheckpointMetricsTracker.class)
+                        any(CheckpointMetricsTracker.class),
+                        anyBoolean()
                 );
 
         CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(
@@ -322,7 +325,8 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                         any(FullPageId.class),
                         any(ByteBuffer.class),
                         any(PageStoreWriter.class),
-                        any(CheckpointMetricsTracker.class)
+                        any(CheckpointMetricsTracker.class),
+                        anyBoolean()
                 );
 
         return pageMemory;

Reply via email to