This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26037
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit e06b90d8a1ce062559306ace44a1685d2a9eb873
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Aug 1 12:50:06 2025 +0300

    IGNITE-26037 wip
---
 .../persistence/PersistentPageMemory.java          | 103 ++++++++++++++------
 .../persistence/checkpoint/CheckpointManager.java  |   8 ++
 .../persistence/checkpoint/Checkpointer.java       |  10 +-
 .../internal/pagememory/util/PageHandler.java      |  11 +++
 .../PersistentPageMemoryMvTableStorageTest.java    | 106 +++++++++++++++++++++
 5 files changed, 209 insertions(+), 29 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 0788f279a49..ca398440065 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -1115,25 +1115,64 @@ public class PersistentPageMemory implements PageMemory 
{
     private long postWriteLockPage(long absPtr, FullPageId fullId) {
         writeTimestamp(absPtr, coarseCurrentTimeMillis());
 
+        copyPageIfInCheckpoint(absPtr, fullId);
+
+        assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612
+
+        return absPtr + PAGE_OVERHEAD;
+    }
+
+    private void copyPageIfInCheckpoint(long absPtr, FullPageId fullId) {
+        Segment seg = segment(fullId);
+
+        if (!seg.isInCheckpoint(fullId) || tempBufferPointer(absPtr) != 
INVALID_REL_PTR) {
+            return;
+        }
+
         // Create a buffer copy if the page is scheduled for a checkpoint.
-        if (isInCheckpoint(fullId) && tempBufferPointer(absPtr) == 
INVALID_REL_PTR) {
-            long tmpRelPtr;
+        long tmpRelPtr;
 
-            PagePool checkpointPool = this.checkpointPool;
+        PagePool checkpointPool = this.checkpointPool;
 
-            while (true) {
-                tmpRelPtr = 
checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId()));
+        while (true) {
+            int tag = tag(fullId.pageId());
+            tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(tag);
 
-                if (tmpRelPtr != INVALID_REL_PTR) {
-                    break;
-                }
+            if (tmpRelPtr != INVALID_REL_PTR) {
+                break;
+            }
 
-                // TODO https://issues.apache.org/jira/browse/IGNITE-23106 
Replace spin-wait with a proper wait.
-                try {
-                    Thread.sleep(1);
-                } catch (InterruptedException ignore) {
-                    // No-op.
-                }
+            // TODO https://issues.apache.org/jira/browse/IGNITE-23106 Replace 
spin-wait with a proper wait.
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException ignore) {
+                // No-op.
+            }
+        }
+
+        // The partition could have been deleted in parallel and we could get 
an empty page.
+        seg.writeLock().lock();
+
+        try {
+            // Double check to see if we need to clear the page because its 
partition has already been deleted.
+            long relPtr = resolveRelativePointer(seg, fullId, 
generationTag(seg, fullId));
+
+            assert relPtr != INVALID_REL_PTR : "fullPageId=" + fullId + ", 
pageId=" + hexLong(fullId.pageId());
+
+            if (relPtr == OUTDATED_REL_PTR) {
+                relPtr = seg.refreshOutdatedPage(
+                        fullId.groupId(),
+                        fullId.effectivePageId(),
+                        true
+                );
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+
+                seg.pool.releaseFreePage(relPtr);
+
+                checkpointPool.releaseFreePage(tmpRelPtr);
+
+                return;
             }
 
             // Pin the page until checkpoint is not finished.
@@ -1149,7 +1188,8 @@ public class PersistentPageMemory implements PageMemory {
                     pageSize()
             );
 
-            assert getType(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. 
Type is 0! pageId = " + hexLong(fullId.pageId());
+            assert getType(tmpAbsPtr + PAGE_OVERHEAD) != 0 :
+                    "Invalid state. Type is 0! pageId = " + 
hexLong(fullId.pageId()) + ", cmpAbsPtr=" + tmpAbsPtr;
             assert getVersion(tmpAbsPtr + PAGE_OVERHEAD) != 0 :
                     "Invalid state. Version is 0! pageId = " + 
hexLong(fullId.pageId());
 
@@ -1158,13 +1198,11 @@ public class PersistentPageMemory implements PageMemory 
{
             // info for checkpoint buffer cleaner.
             fullPageId(tmpAbsPtr, fullId);
 
-            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO GG-11480
-            assert getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; // TODO GG-11480
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612
+            assert getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612
+        } finally {
+            seg.writeLock().unlock();
         }
-
-        assert getCrc(absPtr + PAGE_OVERHEAD) == 0; // TODO IGNITE-16612
-
-        return absPtr + PAGE_OVERHEAD;
     }
 
     private void writeUnlockPage(
@@ -1305,6 +1343,10 @@ public class PersistentPageMemory implements PageMemory {
         return segments[idx];
     }
 
+    private Segment segment(FullPageId fullPageId) {
+        return segment(fullPageId.groupId(), fullPageId.pageId());
+    }
+
     /**
      * Returns segment index.
      *
@@ -1733,6 +1775,12 @@ public class PersistentPageMemory implements PageMemory {
             return tag == null ? INIT_PART_GENERATION : tag;
         }
 
+        private boolean isInCheckpoint(FullPageId fullPageId) {
+            CheckpointPages checkpointPages = this.checkpointPages;
+
+            return checkpointPages != null && 
checkpointPages.contains(fullPageId);
+        }
+
         /**
          * Gets loaded pages map.
          */
@@ -1877,16 +1925,14 @@ public class PersistentPageMemory implements PageMemory 
{
     }
 
     /**
-     * Returns {@code true} if it was added to the checkpoint list.
+     * Returns {@code true} if page was added to the checkpoint list.
      *
      * @param pageId Page ID to check if it was added to the checkpoint list.
      */
-    boolean isInCheckpoint(FullPageId pageId) {
-        Segment seg = segment(pageId.groupId(), pageId.pageId());
-
-        CheckpointPages pages0 = seg.checkpointPages;
+    private boolean isInCheckpoint(FullPageId pageId) {
+        Segment seg = segment(pageId);
 
-        return pages0 != null && pages0.contains(pageId);
+        return seg.isInCheckpoint(pageId);
     }
 
     /**
@@ -1986,6 +2032,9 @@ public class PersistentPageMemory implements PageMemory {
                 copyInBuffer(absPtr, buf);
 
                 dirty(absPtr, false);
+
+                // TODO: IGNITE-25861 This is a temporary solution; it needs to be 
done differently.
+                return;
             }
 
             assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + 
hexLong(fullId.pageId());
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index ee8f7730901..2e93df9d4a1 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStor
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
@@ -377,4 +378,11 @@ public class CheckpointManager {
                 compactor.prepareToDestroyPartition(groupPartitionId)
         );
     }
+
+    /** Returns compactor. */
+    @TestOnly
+    // TODO: IGNITE-25861 Maybe get rid of it
+    public Compactor compactor() {
+        return compactor;
+    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index d07308cc7d4..8e96d3facdd 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -854,7 +854,10 @@ public class Checkpointer extends IgniteWorker {
         try {
             CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = 
filePageStore.getNewDeltaFile();
 
-            assert deltaFilePageStoreFuture != null;
+            // This can happen when the store is destroyed in parallel.
+            if (deltaFilePageStoreFuture == null) {
+                return;
+            }
 
             deltaFilePageStoreFuture.join().sync();
         } finally {
@@ -873,7 +876,10 @@ public class Checkpointer extends IgniteWorker {
         try {
             CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = 
filePageStore.getNewDeltaFile();
 
-            assert deltaFilePageStoreFuture != null;
+            // This can happen when the store is destroyed in parallel.
+            if (deltaFilePageStoreFuture == null) {
+                return;
+            }
 
             DeltaFilePageStoreIo deltaFilePageStoreIo = 
deltaFilePageStoreFuture.join();
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java
index dba6d06464c..e1f5481f740 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java
@@ -18,8 +18,11 @@
 package org.apache.ignite.internal.pagememory.util;
 
 import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.internal.util.StringUtils.hexLong;
 
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.io.PageIo;
 import org.jetbrains.annotations.Nullable;
@@ -31,6 +34,8 @@ import org.jetbrains.annotations.Nullable;
  * @param <R> Type of the result.
  */
 public interface PageHandler<X, R> {
+    IgniteLogger LOG = Loggers.forClass(PageHandler.class);
+
     /** No-op page handler. */
     PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, 
arg, intArg) -> Boolean.TRUE;
 
@@ -237,6 +242,12 @@ public interface PageHandler<X, R> {
                     pageMem.writeUnlock(groupId, pageId, page, ok);
                 }
             }
+        } catch (Throwable t) {
+            // This logging was added because an exception thrown from 
"pageMem.writeLock" was not caught by any "catch" in the
+            // calling code of this method, so the exception seemed to 
disappear.
+            LOG.error("Error writing page: [grpId={}, pageId={}]", t, groupId, 
hexLong(page));
+
+            throw t;
         } finally {
             if (releaseAfterWrite) {
                 pageMem.releasePage(groupId, pageId, page);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index a1a6032572e..cf6022f7f0a 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.Check
 import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
@@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.components.LogSyncer;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -49,6 +51,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import 
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
@@ -267,4 +270,107 @@ public class PersistentPageMemoryMvTableStorageTest 
extends AbstractMvTableStora
             return null;
         });
     }
+
+    @Test
+    void createMvPartitionStorageAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        for (int i = 0; i < 10; i++) {
+            runRace(
+                    () -> getOrCreateMvPartition(PARTITION_ID),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+
+            assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully());
+        }
+    }
+
+    @Test
+    void clearMvPartitionStorageAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        for (int i = 0; i < 10; i++) {
+            getOrCreateMvPartition(PARTITION_ID);
+
+            runRace(
+                    () -> 
assertThat(tableStorage.clearPartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+
+            assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully());
+        }
+    }
+
+    @Test
+    void destroyMvPartitionStorageAndDoCheckpointInParallel() throws Exception 
{
+        stopCompactor();
+
+        for (int i = 0; i < 10; i++) {
+            getOrCreateMvPartition(PARTITION_ID);
+
+            runRace(
+                    () -> 
assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+        }
+    }
+
+    @Test
+    void startRebalancePartitionAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        getOrCreateMvPartition(PARTITION_ID);
+
+        for (int i = 0; i < 10; i++) {
+            runRace(
+                    () -> 
assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+
+            assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+        }
+    }
+
+    @Test
+    void abortRebalancePartitionAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        getOrCreateMvPartition(PARTITION_ID);
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+
+            runRace(
+                    () -> 
assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+        }
+    }
+
+    @Test
+    void finishRebalancePartitionAndDoCheckpointInParallel() throws Exception {
+        stopCompactor();
+
+        getOrCreateMvPartition(PARTITION_ID);
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+
+            var meta = new MvPartitionMeta(1, 1, BYTE_EMPTY_ARRAY, null, 
BYTE_EMPTY_ARRAY);
+
+            runRace(
+                    () -> 
assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, meta), 
willCompleteSuccessfully()),
+                    () -> assertThat(forceCheckpointAsync(), 
willCompleteSuccessfully())
+            );
+        }
+    }
+
+    private CompletableFuture<Void> forceCheckpointAsync() {
+        return 
engine.checkpointManager().forceCheckpoint("test").futureFor(FINISHED);
+    }
+
+    // TODO: IGNITE-25861 Get rid of it
+    private void stopCompactor() throws Exception {
+        engine.checkpointManager().compactor().stop();
+    }
 }

Reply via email to