This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b74522ac677 IGNITE-26315 Fix race between increasing partition 
generation and performing write operations to it (#6502)
b74522ac677 is described below

commit b74522ac6778fa3e74e964327ad8a015124bc152
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Aug 29 14:52:07 2025 +0300

    IGNITE-26315 Fix race between increasing partition generation and 
performing write operations to it (#6502)
---
 .../ItBplusTreePersistentPageMemoryTest.java       |   4 +-
 ...BplusTreeReuseListPersistentPageMemoryTest.java |   4 +-
 .../PartitionDestructionLockManager.java           |  44 +++++++++
 .../persistence/PartitionMetaManager.java          |   2 +-
 .../persistence/PartitionProcessingCounterMap.java |  84 -----------------
 .../persistence/PersistentPageMemory.java          |  12 ++-
 .../persistence/checkpoint/CheckpointManager.java  |  30 +++---
 .../persistence/checkpoint/CheckpointPages.java    |  50 ----------
 .../checkpoint/CheckpointPagesWriter.java          |  35 +++++--
 .../checkpoint/CheckpointPagesWriterFactory.java   |  11 ++-
 .../checkpoint/CheckpointProgressImpl.java         |  71 --------------
 .../persistence/checkpoint/Checkpointer.java       |  54 ++++-------
 .../persistence/compaction/Compactor.java          |  35 +++----
 .../replacement/DelayedDirtyPageWrite.java         |  15 ++-
 .../replacement/DelayedPageReplacementTracker.java |  10 +-
 .../PartitionProcessingCounterMapTest.java         |  79 ----------------
 .../checkpoint/CheckpointPagesWriterTest.java      |  10 +-
 .../checkpoint/CheckpointProgressImplTest.java     |  48 ----------
 .../persistence/checkpoint/CheckpointerTest.java   | 102 ++++++++-------------
 .../persistence/compaction/CompactorTest.java      |  66 +++++--------
 .../replacement/AbstractPageReplacementTest.java   |   3 +-
 .../throttling/PageMemoryThrottlingTest.java       |   3 +-
 .../pagememory/PersistentPageMemoryDataRegion.java |   3 +-
 .../PersistentPageMemoryTableStorage.java          |  20 +++-
 .../pagememory/PersistentPageMemoryNoLoadTest.java |   6 +-
 25 files changed, 256 insertions(+), 545 deletions(-)

diff --git 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
index 7bb5761eb51..01f6be69a70 100644
--- 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
+++ 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
 import 
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
 import org.apache.ignite.internal.pagememory.persistence.PageHeader;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
 import 
org.apache.ignite.internal.pagememory.persistence.TestPageReadWriteManager;
@@ -75,7 +76,8 @@ public class ItBplusTreePersistentPageMemoryTest extends 
AbstractBplusTreePageMe
                 (fullPageId, buf, tag) -> {
                 },
                 mockCheckpointTimeoutLock(true),
-                wrapLock(offheapReadWriteLock)
+                wrapLock(offheapReadWriteLock),
+                new PartitionDestructionLockManager()
         );
     }
 
diff --git 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
index cc3f96239b6..f27fe2e0059 100644
--- 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
+++ 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
 import 
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
 import org.apache.ignite.internal.pagememory.persistence.PageHeader;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
 import 
org.apache.ignite.internal.pagememory.persistence.TestPageReadWriteManager;
@@ -61,7 +62,8 @@ public class ItBplusTreeReuseListPersistentPageMemoryTest 
extends AbstractBplusT
                 (fullPageId, buf, tag) -> {
                 },
                 mockCheckpointTimeoutLock(true),
-                wrapLock(new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL))
+                wrapLock(new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)),
+                new PartitionDestructionLockManager()
         );
     }
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionDestructionLockManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionDestructionLockManager.java
new file mode 100644
index 00000000000..79fc2858bd5
--- /dev/null
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionDestructionLockManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Partition Destruction Lock Manager. */
+// TODO: IGNITE-26339 Make partition generation increase non-blocking
+public class PartitionDestructionLockManager {
+    private final Map<GroupPartitionId, ReentrantReadWriteLock> 
lockByPartitionId = new ConcurrentHashMap<>();
+
+    /**
+     * Returns a lock to synchronize between partition destruction and write 
operations to it.
+     *
+     * <p>For partition write operations you need to use {@link 
ReadWriteLock#readLock()} and for partition destruction
+     * {@link ReadWriteLock#writeLock()}.</p>
+     */
+    public ReadWriteLock destructionLock(GroupPartitionId groupPartitionId) {
+        return lockByPartitionId.computeIfAbsent(groupPartitionId, unused -> 
new ReentrantReadWriteLock());
+    }
+
+    /** Removes all locks for a group. */
+    public void removeLockForGroup(int groupId) {
+        lockByPartitionId.entrySet().removeIf(e -> e.getKey().getGroupId() == 
groupId);
+    }
+}
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
index 3db2224454a..5cdb733af75 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
@@ -65,7 +65,7 @@ public class PartitionMetaManager {
     }
 
     /**
-     * Returns the partition's meta information.
+     * Returns the partition's meta information, {@code null} if not yet added 
or was removed when the partition was destroyed.
      *
      * @param groupPartitionId Partition of the group.
      */
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
deleted file mode 100644
index d0369054794..00000000000
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Helper class for thread-safe work with {@link PartitionProcessingCounter} 
for any partition of any group.
- */
-public class PartitionProcessingCounterMap {
-    private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter> 
processedPartitions = new ConcurrentHashMap<>();
-
-    /**
-     * Atomically increments the partition processing counter.
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     */
-    public void incrementPartitionProcessingCounter(GroupPartitionId 
groupPartitionId) {
-        processedPartitions.compute(groupPartitionId, (id, 
partitionProcessingCounter) -> {
-            if (partitionProcessingCounter == null) {
-                PartitionProcessingCounter counter = new 
PartitionProcessingCounter();
-
-                counter.incrementPartitionProcessingCounter();
-
-                return counter;
-            }
-
-            partitionProcessingCounter.incrementPartitionProcessingCounter();
-
-            return partitionProcessingCounter;
-        });
-    }
-
-    /**
-     * Atomically decrements the partition processing counter.
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     */
-    public void decrementPartitionProcessingCounter(GroupPartitionId 
groupPartitionId) {
-        processedPartitions.compute(groupPartitionId, (id, 
partitionProcessingCounter) -> {
-            assert partitionProcessingCounter != null : id;
-            assert !partitionProcessingCounter.future().isDone() : id;
-
-            partitionProcessingCounter.decrementPartitionProcessingCounter();
-
-            return partitionProcessingCounter.future().isDone() ? null : 
partitionProcessingCounter;
-        });
-    }
-
-    /**
-     * Returns the future if the partition according to the given parameters 
is currently being processed, for example, dirty pages are
-     * being written or fsync is being done, {@code null} if the partition is 
not currently being processed.
-     *
-     * <p>Future will be added on {@link 
#incrementPartitionProcessingCounter(GroupPartitionId)} call and completed on
-     * {@link #incrementPartitionProcessingCounter(GroupPartitionId)} call 
(equal to the number of
-     * {@link #decrementPartitionProcessingCounter(GroupPartitionId)} calls).
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     */
-    @Nullable
-    public CompletableFuture<Void> 
getProcessedPartitionFuture(GroupPartitionId groupPartitionId) {
-        PartitionProcessingCounter partitionProcessingCounter = 
processedPartitions.get(groupPartitionId);
-
-        return partitionProcessingCounter == null ? null : 
partitionProcessingCounter.future();
-    }
-}
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index fe0a6b8b1da..80741dc7d0f 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -225,6 +225,7 @@ public class PersistentPageMemory implements PageMemory {
      * @param flushDirtyPageForReplacement Write callback invoked when a dirty 
page is removed for replacement.
      * @param checkpointTimeoutLock Checkpoint timeout lock.
      * @param rwLock Read-write lock for pages.
+     * @param partitionDestructionLockManager Partition Destruction Lock 
Manager.
      */
     public PersistentPageMemory(
             PersistentDataRegionConfiguration dataRegionConfiguration,
@@ -235,7 +236,8 @@ public class PersistentPageMemory implements PageMemory {
             PageReadWriteManager pageStoreManager,
             WriteDirtyPage flushDirtyPageForReplacement,
             CheckpointTimeoutLock checkpointTimeoutLock,
-            OffheapReadWriteLock rwLock
+            OffheapReadWriteLock rwLock,
+            PartitionDestructionLockManager partitionDestructionLockManager
     ) {
         this.dataRegionConfiguration = dataRegionConfiguration;
         this.metricSource = metricSource;
@@ -272,7 +274,13 @@ public class PersistentPageMemory implements PageMemory {
                 throw new IgniteInternalException("Unexpected page replacement 
mode: " + replacementMode);
         }
 
-        delayedPageReplacementTracker = new 
DelayedPageReplacementTracker(pageSize, flushDirtyPageForReplacement, LOG, 
sizes.length - 1);
+        delayedPageReplacementTracker = new DelayedPageReplacementTracker(
+                pageSize,
+                flushDirtyPageForReplacement,
+                LOG,
+                sizes.length - 1,
+                partitionDestructionLockManager
+        );
 
         this.writeThrottle = null;
     }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index fe973622617..acbe84a2f13 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
@@ -84,6 +85,8 @@ public class CheckpointManager {
     /** Delta file compactor. */
     private final Compactor compactor;
 
+    private final PartitionDestructionLockManager 
partitionDestructionLockManager;
+
     /**
      * Constructor.
      *
@@ -133,11 +136,14 @@ public class CheckpointManager {
                 checkpointConfig.checkpointThreads()
         );
 
+        partitionDestructionLockManager = new 
PartitionDestructionLockManager();
+
         checkpointPagesWriterFactory = new CheckpointPagesWriterFactory(
                 this::writePageToFilePageStore,
                 ioRegistry,
                 partitionMetaManager,
-                pageSize
+                pageSize,
+                partitionDestructionLockManager
         );
 
         compactor = new Compactor(
@@ -146,7 +152,8 @@ public class CheckpointManager {
                 checkpointConfig.compactionThreads(),
                 filePageStoreManager,
                 pageSize,
-                failureManager
+                failureManager,
+                partitionDestructionLockManager
         );
 
         checkpointer = new Checkpointer(
@@ -160,7 +167,8 @@ public class CheckpointManager {
                 compactor,
                 pageSize,
                 checkpointConfig,
-                logSyncer
+                logSyncer,
+                partitionDestructionLockManager
         );
 
         checkpointTimeoutLock = new CheckpointTimeoutLock(
@@ -395,18 +403,8 @@ public class CheckpointManager {
         compactor.triggerCompaction();
     }
 
-    /**
-     * Callback on destruction of the partition of the corresponding group.
-     *
-     * <p>Prepares the checkpointer and compactor for partition destruction.
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     * @return Future that will complete when the callback completes.
-     */
-    public CompletableFuture<Void> onPartitionDestruction(GroupPartitionId 
groupPartitionId) {
-        return CompletableFuture.allOf(
-                checkpointer.prepareToDestroyPartition(groupPartitionId),
-                compactor.prepareToDestroyPartition(groupPartitionId)
-        );
+    /** Partition Destruction Lock Manager. */
+    public PartitionDestructionLockManager partitionDestructionLockManager() {
+        return partitionDestructionLockManager;
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
index 01c2e69ad9e..60be59913dc 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -25,10 +25,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
-import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -148,51 +145,4 @@ public class CheckpointPages {
     public void unblockFsyncOnPageReplacement(DirtyFullPageId pageId, 
@Nullable Throwable error) {
         checkpointProgress.unblockFsyncOnPageReplacement(pageId, error);
     }
-
-    /**
-     * Blocks physical destruction of partition.
-     *
-     * <p>When the intention to destroy partition appears, {@link 
FilePageStore#isMarkedToDestroy()} is set to {@code == true} and
-     * {@link PersistentPageMemory#invalidate(int, int)} invoked at the 
beginning. And if there is a block, it waits for unblocking.
-     * Then it destroys the partition, {@link 
FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
-     *
-     * <p>It is recommended to use where physical destruction of the partition 
may have an impact, for example when writing dirty pages and
-     * executing a fsync.</p>
-     *
-     * <p>To make sure that we can physically do something with the partition 
during a block, we will need to use approximately the
-     * following code:</p>
-     * <pre><code>
-     *     checkpointProgress.blockPartitionDestruction(partitionId);
-     *
-     *     try {
-     *         FilePageStore pageStore = 
FilePageStoreManager#getStore(partitionId);
-     *
-     *         if (pageStore == null || pageStore.isMarkedToDestroy()) {
-     *             return;
-     *         }
-     *
-     *         someAction(pageStore);
-     *     } finally {
-     *         checkpointProgress.unblockPartitionDestruction(partitionId);
-     *     }
-     * </code></pre>
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     * @see #unblockPartitionDestruction(GroupPartitionId)
-     */
-    public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
-        checkpointProgress.blockPartitionDestruction(groupPartitionId);
-    }
-
-    /**
-     * Unblocks physical destruction of partition.
-     *
-     * <p>As soon as the last thread makes an unlock, the physical destruction 
of the partition can immediately begin.</p>
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     * @see #blockPartitionDestruction(GroupPartitionId)
-     */
-    public void unblockPartitionDestruction(GroupPartitionId groupPartitionId) 
{
-        checkpointProgress.unblockPartitionDestruction(groupPartitionId);
-    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index e0746d46c18..ebfdeb867f5 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -44,6 +45,7 @@ import 
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
 import 
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.PartitionMetaSnapshot;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
@@ -118,6 +120,8 @@ public class CheckpointPagesWriter implements Runnable {
     /** Shutdown now. */
     private final BooleanSupplier shutdownNow;
 
+    private final PartitionDestructionLockManager 
partitionDestructionLockManager;
+
     /**
      * Creates task for write pages.
      *
@@ -133,6 +137,7 @@ public class CheckpointPagesWriter implements Runnable {
      * @param ioRegistry Page IO registry.
      * @param partitionMetaManager Partition meta information manager.
      * @param shutdownNow Shutdown supplier.
+     * @param partitionDestructionLockManager Partition Destruction Lock 
Manager.
      */
     CheckpointPagesWriter(
             CheckpointMetricsTracker tracker,
@@ -146,7 +151,8 @@ public class CheckpointPagesWriter implements Runnable {
             WriteDirtyPage pageWriter,
             PageIoRegistry ioRegistry,
             PartitionMetaManager partitionMetaManager,
-            BooleanSupplier shutdownNow
+            BooleanSupplier shutdownNow,
+            PartitionDestructionLockManager partitionDestructionLockManager
     ) {
         this.tracker = tracker;
         this.dirtyPartitionQueue = dirtyPartitionQueue;
@@ -160,6 +166,7 @@ public class CheckpointPagesWriter implements Runnable {
         this.ioRegistry = ioRegistry;
         this.partitionMetaManager = partitionMetaManager;
         this.shutdownNow = shutdownNow;
+        this.partitionDestructionLockManager = partitionDestructionLockManager;
     }
 
     @Override
@@ -203,7 +210,9 @@ public class CheckpointPagesWriter implements Runnable {
     ) throws IgniteInternalCheckedException {
         CheckpointDirtyPagesView checkpointDirtyPagesView = 
checkpointDirtyPagesView(pageMemory, partitionId);
 
-        checkpointProgress.blockPartitionDestruction(partitionId);
+        Lock partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(partitionId).readLock();
+
+        partitionDestructionLock.lock();
 
         try {
             addUpdatePartitionCounterIfAbsent(partitionId);
@@ -224,7 +233,7 @@ public class CheckpointPagesWriter implements Runnable {
                 writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter, true);
             }
         } finally {
-            checkpointProgress.unblockPartitionDestruction(partitionId);
+            partitionDestructionLock.unlock();
         }
     }
 
@@ -267,6 +276,8 @@ public class CheckpointPagesWriter implements Runnable {
 
             GroupPartitionId partitionId = null;
 
+            Lock partitionDestructionLock = null;
+
             try {
                 for (DirtyFullPageId pageId : entry.getValue()) {
                     if (shutdownNow.getAsBoolean()) {
@@ -276,20 +287,22 @@ public class CheckpointPagesWriter implements Runnable {
                     updateHeartbeat.run();
 
                     if (partitionIdChanged(partitionId, pageId)) {
-                        if (partitionId != null) {
-                            
checkpointProgress.unblockPartitionDestruction(partitionId);
+                        if (partitionDestructionLock != null) {
+                            partitionDestructionLock.unlock();
                         }
 
                         partitionId = GroupPartitionId.convert(pageId);
 
-                        
checkpointProgress.blockPartitionDestruction(partitionId);
+                        partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(partitionId).readLock();
+
+                        partitionDestructionLock.lock();
                     }
 
                     writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageStoreWriter, useTryWriteLockOnPage);
                 }
             } finally {
-                if (partitionId != null) {
-                    
checkpointProgress.unblockPartitionDestruction(partitionId);
+                if (partitionDestructionLock != null) {
+                    partitionDestructionLock.unlock();
                 }
             }
         }
@@ -333,14 +346,16 @@ public class CheckpointPagesWriter implements Runnable {
 
                     GroupPartitionId partitionId = 
GroupPartitionId.convert(cpPageId);
 
-                    checkpointProgress.blockPartitionDestruction(partitionId);
+                    Lock partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(partitionId).readLock();
+
+                    partitionDestructionLock.lock();
 
                     try {
                         addUpdatePartitionCounterIfAbsent(partitionId);
 
                         pageMemory.checkpointWritePage(cpPageId, 
tmpWriteBuf.rewind(), pageStoreWriter, tracker, true);
                     } finally {
-                        
checkpointProgress.unblockPartitionDestruction(partitionId);
+                        partitionDestructionLock.unlock();
                     }
                 }
             }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index c0c05e5ca16..d003ab8ff87 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
@@ -49,6 +50,8 @@ public class CheckpointPagesWriterFactory {
     /** Partition meta information manager. */
     private final PartitionMetaManager partitionMetaManager;
 
+    private final PartitionDestructionLockManager 
partitionDestructionLockManager;
+
     /**
      * Constructor.
      *
@@ -56,17 +59,20 @@ public class CheckpointPagesWriterFactory {
      * @param ioRegistry Page IO registry.
      * @param partitionMetaManager Partition meta information manager.
      * @param pageSize Page size in bytes.
+     * @param partitionDestructionLockManager Partition Destruction Lock 
Manager.
      */
     CheckpointPagesWriterFactory(
             WriteDirtyPage dirtyPageWriter,
             PageIoRegistry ioRegistry,
             PartitionMetaManager partitionMetaManager,
             // TODO: IGNITE-17017 Move to common config
-            int pageSize
+            int pageSize,
+            PartitionDestructionLockManager partitionDestructionLockManager
     ) {
         this.dirtyPageWriter = dirtyPageWriter;
         this.ioRegistry = ioRegistry;
         this.partitionMetaManager = partitionMetaManager;
+        this.partitionDestructionLockManager = partitionDestructionLockManager;
 
         threadBuf = ThreadLocal.withInitial(() -> {
             ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize);
@@ -112,7 +118,8 @@ public class CheckpointPagesWriterFactory {
                 dirtyPageWriter,
                 ioRegistry,
                 partitionMetaManager,
-                shutdownNow
+                shutdownNow,
+                partitionDestructionLockManager
         );
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index ea77ba217fe..fdd160b2b17 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -29,11 +29,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
-import 
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
-import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -75,9 +70,6 @@ public class CheckpointProgressImpl implements 
CheckpointProgress {
     /** Sorted dirty pages to be written on the checkpoint. */
     private volatile @Nullable CheckpointDirtyPages pageToWrite;
 
-    /** Partitions currently being processed, for example, writing dirty pages 
or doing fsync. */
-    private final PartitionProcessingCounterMap processedPartitionMap = new 
PartitionProcessingCounterMap();
-
     /** Assistant for synchronizing page replacement and fsync phase. */
     private final CheckpointPageReplacement checkpointPageReplacement = new 
CheckpointPageReplacement();
 
@@ -305,69 +297,6 @@ public class CheckpointProgressImpl implements 
CheckpointProgress {
         this.pageToWrite = pageToWrite;
     }
 
-    /**
-     * Blocks physical destruction of partition.
-     *
-     * <p>When the intention to destroy partition appears, {@link 
FilePageStore#isMarkedToDestroy()} is set to {@code == true} and
-     * {@link PersistentPageMemory#invalidate(int, int)} invoked at the 
beginning. And if there is a block, it waits for unblocking.
-     * Then it destroys the partition, {@link 
FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
-     *
-     * <p>It is recommended to use where physical destruction of the partition 
may have an impact, for example when writing dirty pages and
-     * executing a fsync.</p>
-     *
-     * <p>To make sure that we can physically do something with the partition 
during a block, we will need to use approximately the
-     * following code:</p>
-     * <pre><code>
-     *     checkpointProgress.blockPartitionDestruction(partitionId);
-     *
-     *     try {
-     *         FilePageStore pageStore = 
FilePageStoreManager#getStore(partitionId);
-     *
-     *         if (pageStore == null || pageStore.isMarkedToDestroy()) {
-     *             return;
-     *         }
-     *
-     *         someAction(pageStore);
-     *     } finally {
-     *         checkpointProgress.unblockPartitionDestruction(partitionId);
-     *     }
-     * </code></pre>
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     * @see #unblockPartitionDestruction(GroupPartitionId)
-     * @see #getUnblockPartitionDestructionFuture(GroupPartitionId)
-     */
-    public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
-        
processedPartitionMap.incrementPartitionProcessingCounter(groupPartitionId);
-    }
-
-    /**
-     * Unblocks physical destruction of partition.
-     *
-     * <p>As soon as the last thread makes an unlock, the physical destruction 
of the partition can immediately begin.</p>
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     * @see #blockPartitionDestruction(GroupPartitionId)
-     * @see #getUnblockPartitionDestructionFuture(GroupPartitionId)
-     */
-    public void unblockPartitionDestruction(GroupPartitionId groupPartitionId) 
{
-        
processedPartitionMap.decrementPartitionProcessingCounter(groupPartitionId);
-    }
-
-    /**
-     * Returns the future if the partition according to the given parameters 
is currently being blocked, for example, dirty pages are
-     * being written or fsync is being done, {@code null} if the partition is 
not currently being blocked.
-     *
-     * <p>Future will be added on {@link 
#blockPartitionDestruction(GroupPartitionId)} call and completed on
-     * {@link #unblockPartitionDestruction(GroupPartitionId)} call (equal to 
the number of
-     * {@link #unblockPartitionDestruction(GroupPartitionId)} calls).
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     */
-    public @Nullable CompletableFuture<Void> 
getUnblockPartitionDestructionFuture(GroupPartitionId groupPartitionId) {
-        return 
processedPartitionMap.getProcessedPartitionFuture(groupPartitionId);
-    }
-
     /**
      * Block the start of the fsync phase at a checkpoint before replacing the 
page.
      *
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 463866fd26d..df0d9062170 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -28,7 +28,6 @@ import static 
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMI
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SNAPSHOT_TAKEN;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -46,6 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import 
org.apache.ignite.internal.pagememory.configuration.CheckpointConfiguration;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -183,6 +184,8 @@ public class Checkpointer extends IgniteWorker {
 
     private final LogSyncer logSyncer;
 
+    private final PartitionDestructionLockManager 
partitionDestructionLockManager;
+
     /**
      * Constructor.
      *
@@ -196,6 +199,7 @@ public class Checkpointer extends IgniteWorker {
      * @param pageSize Page size.
      * @param checkpointConfig Checkpoint configuration.
      * @param logSyncer Write-ahead log synchronizer.
+     * @param partitionDestructionLockManager Partition Destruction Lock 
Manager.
      */
     Checkpointer(
             String igniteInstanceName,
@@ -208,7 +212,8 @@ public class Checkpointer extends IgniteWorker {
             Compactor compactor,
             int pageSize,
             CheckpointConfiguration checkpointConfig,
-            LogSyncer logSyncer
+            LogSyncer logSyncer,
+            PartitionDestructionLockManager partitionDestructionLockManager
     ) {
         super(LOG, igniteInstanceName, "checkpoint-thread");
 
@@ -222,6 +227,7 @@ public class Checkpointer extends IgniteWorker {
         this.failureManager = failureManager;
         this.logSyncer = logSyncer;
         this.partitionMetaManager = partitionMetaManager;
+        this.partitionDestructionLockManager = partitionDestructionLockManager;
 
         scheduledCheckpointProgress = new 
CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
 
@@ -612,27 +618,29 @@ public class Checkpointer extends IgniteWorker {
             return;
         }
 
-        currentCheckpointProgress.blockPartitionDestruction(partitionId);
-
-        try {
-            fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore);
-
-            fsyncFilePageStoreOnCheckpointThread(filePageStore);
+        Lock partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(partitionId).readLock();
 
-            renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
+        partitionDestructionLock.lock();
 
-            // TODO: IGNITE-26315 Deal with partition deletion blocking on 
checkpoint
+        try {
             PartitionMeta meta = partitionMetaManager.getMeta(partitionId);
 
+            // If this happens, then the partition is destroyed.
             if (meta == null) {
                 return;
             }
 
+            fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore);
+
+            fsyncFilePageStoreOnCheckpointThread(filePageStore);
+
+            renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
+
             
filePageStore.checkpointedPageCount(meta.metaSnapshot(currentCheckpointProgress.id()).pageCount());
 
             
currentCheckpointProgress.syncedPagesCounter().addAndGet(pagesWritten.intValue());
         } finally {
-            currentCheckpointProgress.unblockPartitionDestruction(partitionId);
+            partitionDestructionLock.unlock();
         }
     }
 
@@ -929,30 +937,6 @@ public class Checkpointer extends IgniteWorker {
         afterReleaseWriteLockCheckpointProgress = currentCheckpointProgress;
     }
 
-    /**
-     * Prepares the checkpointer to destroy a partition.
-     *
-     * <p>If the checkpoint is in progress, then wait until it finishes 
processing the partition that we are going to destroy, in order to
-     * prevent the situation when we want to destroy the partition file along 
with its delta files, and at this time the checkpoint performs
-     * I/O operations on them.
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     * @return Future that will end when the checkpoint is ready to destroy 
the partition.
-     */
-    CompletableFuture<Void> prepareToDestroyPartition(GroupPartitionId 
groupPartitionId) {
-        CheckpointProgressImpl currentCheckpointProgress = 
this.currentCheckpointProgress;
-
-        // If the checkpoint starts after this line, then the data region will 
already know that we want to destroy the partition, and when
-        // reading the page for writing to the delta file, we will receive an 
"outdated" page that we will not write to disk.
-        if (currentCheckpointProgress == null || 
!currentCheckpointProgress.inProgress()) {
-            return nullCompletedFuture();
-        }
-
-        CompletableFuture<Void> processedPartitionFuture = 
currentCheckpointProgress.getUnblockPartitionDestructionFuture(groupPartitionId);
-
-        return processedPartitionFuture == null ? nullCompletedFuture() : 
processedPartitionFuture;
-    }
-
     private void replicatorLogSync(CheckpointMetricsTracker tracker) throws 
IgniteInternalCheckedException {
         try {
             tracker.onReplicatorLogSyncStart();
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index e708f55bad6..773d7901d08 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -22,7 +22,6 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toCollection;
 import static 
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -34,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -41,7 +41,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.io.PageIo;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
-import 
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.WriteSpeedFormatter;
 import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
@@ -84,15 +84,14 @@ public class Compactor extends IgniteWorker {
     /** Thread local with buffers for the compaction threads. */
     private static final ThreadLocal<ByteBuffer> THREAD_BUF = new 
ThreadLocal<>();
 
-    /** Partitions for which delta files are currently compacted. */
-    private final PartitionProcessingCounterMap 
partitionCompactionInProgressMap = new PartitionProcessingCounterMap();
-
     /** Page size in bytes. */
     private final int pageSize;
 
     /** Failure processor. */
     private final FailureManager failureManager;
 
+    private final PartitionDestructionLockManager 
partitionDestructionLockManager;
+
     /**
      * Creates new ignite worker with given parameters.
      *
@@ -102,6 +101,7 @@ public class Compactor extends IgniteWorker {
      * @param filePageStoreManager File page store manager.
      * @param pageSize Page size in bytes.
      * @param failureManager Failure processor that is used to handle critical 
errors.
+     * @param partitionDestructionLockManager Partition Destruction Lock 
Manager.
      */
     public Compactor(
             IgniteLogger log,
@@ -109,12 +109,14 @@ public class Compactor extends IgniteWorker {
             int threads,
             FilePageStoreManager filePageStoreManager,
             int pageSize,
-            FailureManager failureManager
+            FailureManager failureManager,
+            PartitionDestructionLockManager partitionDestructionLockManager
     ) {
         super(log, igniteInstanceName, "compaction-thread");
 
         this.filePageStoreManager = filePageStoreManager;
         this.failureManager = failureManager;
+        this.partitionDestructionLockManager = partitionDestructionLockManager;
 
         if (threads > 1) {
             threadPoolExecutor = new ThreadPoolExecutor(
@@ -250,7 +252,9 @@ public class Compactor extends IgniteWorker {
 
                             GroupPartitionId groupPartitionId = 
toMerge.groupPartitionFilePageStore.groupPartitionId();
 
-                            
partitionCompactionInProgressMap.incrementPartitionProcessingCounter(groupPartitionId);
+                            Lock partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(groupPartitionId).readLock();
+
+                            partitionDestructionLock.lock();
 
                             try {
                                 mergeDeltaFileToMainFile(
@@ -259,7 +263,7 @@ public class Compactor extends IgniteWorker {
                                         tracker
                                 );
                             } finally {
-                                
partitionCompactionInProgressMap.decrementPartitionProcessingCounter(groupPartitionId);
+                                partitionDestructionLock.unlock();
                             }
                         }
                     } catch (Throwable ex) {
@@ -451,21 +455,6 @@ public class Compactor extends IgniteWorker {
         assert removed : filePageStore.filePath();
     }
 
-    /**
-     * Prepares the compactor to destroy a partition.
-     *
-     * <p>If the partition compaction is in progress, then we will wait until 
it is completed so that there are no errors when we want to
-     * destroy the partition file and its delta file, and at this time its 
compaction occurs.
-     *
-     * @param groupPartitionId Pair of group ID with partition ID.
-     * @return Future at the complete of which we can delete the partition 
file and its delta files.
-     */
-    public CompletableFuture<Void> prepareToDestroyPartition(GroupPartitionId 
groupPartitionId) {
-        CompletableFuture<Void> partitionProcessingFuture = 
partitionCompactionInProgressMap.getProcessedPartitionFuture(groupPartitionId);
-
-        return partitionProcessingFuture == null ? nullCompletedFuture() : 
partitionProcessingFuture;
-    }
-
     private static ByteBuffer getThreadLocalBuffer(int pageSize) {
         ByteBuffer buffer = THREAD_BUF.get();
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
index 243ace83dbf..6ea44830d2c 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
@@ -21,9 +21,11 @@ import static 
org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
 import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
@@ -55,6 +57,8 @@ public class DelayedDirtyPageWrite {
     /** Replacing pages tracker, used to register & unregister pages being 
written. */
     private final DelayedPageReplacementTracker tracker;
 
+    private final PartitionDestructionLockManager 
partitionDestructionLockManager;
+
     /** Full page id to be written on {@link #flushCopiedPageIfExists}, {@code 
null} if nothing to write. */
     private @Nullable DirtyFullPageId fullPageId;
 
@@ -73,18 +77,21 @@ public class DelayedDirtyPageWrite {
      * @param byteBufThreadLoc Thread local buffers to use for pages copying.
      * @param pageSize Page size in bytes.
      * @param tracker Tracker to lock/unlock page reads.
+     * @param partitionDestructionLockManager Partition Destruction Lock 
Manager.
      */
     DelayedDirtyPageWrite(
             WriteDirtyPage flushDirtyPage,
             ThreadLocal<ByteBuffer> byteBufThreadLoc,
             // TODO: IGNITE-17017 Move to common config
             int pageSize,
-            DelayedPageReplacementTracker tracker
+            DelayedPageReplacementTracker tracker,
+            PartitionDestructionLockManager partitionDestructionLockManager
     ) {
         this.flushDirtyPage = flushDirtyPage;
         this.pageSize = pageSize;
         this.byteBufThreadLoc = byteBufThreadLoc;
         this.tracker = tracker;
+        this.partitionDestructionLockManager = partitionDestructionLockManager;
     }
 
     /**
@@ -134,7 +141,9 @@ public class DelayedDirtyPageWrite {
 
         Throwable errorOnWrite = null;
 
-        
checkpointPages.blockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+        Lock partitionDestructionLock = 
partitionDestructionLockManager.destructionLock(GroupPartitionId.convert(fullPageId)).readLock();
+
+        partitionDestructionLock.lock();
 
         try {
             flushDirtyPage.write(pageMemory, fullPageId, 
byteBufThreadLoc.get());
@@ -143,7 +152,7 @@ public class DelayedDirtyPageWrite {
 
             throw t;
         } finally {
-            
checkpointPages.unblockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+            partitionDestructionLock.unlock();
 
             checkpointPages.unblockFsyncOnPageReplacement(fullPageId, 
errorOnWrite);
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
index d5a3c14d392..63e1dec3560 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
 
 /**
@@ -55,6 +56,8 @@ public class DelayedPageReplacementTracker {
      */
     private final Map<Long, DelayedDirtyPageWrite> 
delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
 
+    private final PartitionDestructionLockManager 
partitionDestructionLockManager;
+
     /**
      * Constructor.
      *
@@ -62,17 +65,20 @@ public class DelayedPageReplacementTracker {
      * @param flushDirtyPage Flush dirty page.
      * @param log Logger.
      * @param segmentCnt Segments count.
+     * @param partitionDestructionLockManager Partition Destruction Lock 
Manager.
      */
     public DelayedPageReplacementTracker(
             // TODO: IGNITE-17017 Move to common config
             int pageSize,
             WriteDirtyPage flushDirtyPage,
             IgniteLogger log,
-            int segmentCnt
+            int segmentCnt,
+            PartitionDestructionLockManager partitionDestructionLockManager
     ) {
         this.pageSize = pageSize;
         this.flushDirtyPage = flushDirtyPage;
         this.log = log;
+        this.partitionDestructionLockManager = partitionDestructionLockManager;
 
         stripes = new Stripe[segmentCnt];
 
@@ -94,7 +100,7 @@ public class DelayedPageReplacementTracker {
      */
     public DelayedDirtyPageWrite delayedPageWrite() {
         return 
delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
-                id -> new DelayedDirtyPageWrite(flushDirtyPage, 
byteBufThreadLoc, pageSize, this));
+                id -> new DelayedDirtyPageWrite(flushDirtyPage, 
byteBufThreadLoc, pageSize, this, partitionDestructionLockManager));
     }
 
     /**
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
deleted file mode 100644
index af5bf41158a..00000000000
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.CompletableFuture;
-import org.junit.jupiter.api.Test;
-
-/**
- * For {@link PartitionProcessingCounterMap} testing.
- */
-public class PartitionProcessingCounterMapTest {
-    @Test
-    void test() {
-        PartitionProcessingCounterMap processingCounterMap = new 
PartitionProcessingCounterMap();
-
-        GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
-        
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
-
-        
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
-        CompletableFuture<Void> processedPartitionFuture0 = 
processingCounterMap.getProcessedPartitionFuture(groupPartitionId);
-
-        assertNotNull(processedPartitionFuture0);
-        assertFalse(processedPartitionFuture0.isDone());
-
-        
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
-        assertSame(processedPartitionFuture0, 
processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
-        assertFalse(processedPartitionFuture0.isDone());
-
-        
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
-        assertSame(processedPartitionFuture0, 
processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
-        assertFalse(processedPartitionFuture0.isDone());
-
-        
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
-        
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
-        assertTrue(processedPartitionFuture0.isDone());
-
-        // Let's check the reprocessing of the partition.
-
-        
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
-        CompletableFuture<Void> processedPartitionFuture1 = 
processingCounterMap.getProcessedPartitionFuture(groupPartitionId);
-
-        assertNotNull(processedPartitionFuture1);
-        assertFalse(processedPartitionFuture1.isDone());
-        assertNotSame(processedPartitionFuture0, processedPartitionFuture1);
-
-        
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
-        
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
-        assertTrue(processedPartitionFuture1.isDone());
-    }
-}
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index b187a1d8950..ec4639c6641 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
@@ -152,7 +153,8 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                 pageWriter,
                 ioRegistry,
                 createPartitionMetaManager(Map.of(groupPartId0, 
partitionMeta0, groupPartId1, partitionMeta1)),
-                () -> false
+                () -> false,
+                new PartitionDestructionLockManager()
         );
 
         pagesWriter.run();
@@ -231,7 +233,8 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                 createDirtyPageWriter(null),
                 ioRegistry,
                 createPartitionMetaManager(Map.of(groupPartId, 
mock(PartitionMeta.class))),
-                () -> false
+                () -> false,
+                new PartitionDestructionLockManager()
         );
 
         pagesWriter.run();
@@ -292,7 +295,8 @@ public class CheckpointPagesWriterTest extends 
BaseIgniteAbstractTest {
                 createDirtyPageWriter(null),
                 ioRegistry,
                 createPartitionMetaManager(Map.of(groupPartId, partitionMeta)),
-                () -> checkpointWritePageCount.get() > 0
+                () -> checkpointWritePageCount.get() > 0,
+                new PartitionDestructionLockManager()
         );
 
         pagesWriter.run();
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
index c1c96cd8d3c..8454c71e511 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
@@ -30,7 +30,6 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -39,7 +38,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -367,50 +365,4 @@ public class CheckpointProgressImplTest {
 
         assertNull(progressImpl.pagesToWrite());
     }
-
-    @Test
-    void testProcessedPartition() {
-        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
-
-        GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
-        
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
-
-        progressImpl.blockPartitionDestruction(groupPartitionId);
-
-        CompletableFuture<Void> processedPartitionFuture0 = 
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId);
-
-        assertNotNull(processedPartitionFuture0);
-        assertFalse(processedPartitionFuture0.isDone());
-
-        progressImpl.blockPartitionDestruction(groupPartitionId);
-
-        assertSame(processedPartitionFuture0, 
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
-        assertFalse(processedPartitionFuture0.isDone());
-
-        progressImpl.unblockPartitionDestruction(groupPartitionId);
-
-        assertSame(processedPartitionFuture0, 
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
-        assertFalse(processedPartitionFuture0.isDone());
-
-        progressImpl.unblockPartitionDestruction(groupPartitionId);
-
-        
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
-        assertTrue(processedPartitionFuture0.isDone());
-
-        // Let's check the reprocessing of the partition.
-
-        progressImpl.blockPartitionDestruction(groupPartitionId);
-
-        CompletableFuture<Void> processedPartitionFuture1 = 
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId);
-
-        assertNotNull(processedPartitionFuture1);
-        assertFalse(processedPartitionFuture1.isDone());
-        assertNotSame(processedPartitionFuture0, processedPartitionFuture1);
-
-        progressImpl.unblockPartitionDestruction(groupPartitionId);
-
-        
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
-        assertTrue(processedPartitionFuture1.isDone());
-    }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index f8c66d94c68..789ac8c17e4 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -20,11 +20,9 @@ package 
org.apache.ignite.internal.pagememory.persistence.checkpoint;
 import static java.lang.System.nanoTime;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FACTORY;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
@@ -73,6 +71,7 @@ import 
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
 import org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
@@ -121,18 +120,21 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
     void testStartAndStop() throws Exception {
         PartitionMetaManager mockParititonMetaManager = 
mock(PartitionMetaManager.class);
 
-        Checkpointer checkpointer = new Checkpointer(
+        var partitionDestructionLockManager = new 
PartitionDestructionLockManager();
+
+        var checkpointer = new Checkpointer(
                 "test",
                 null,
                 mock(FailureManager.class),
                 createCheckpointWorkflow(EMPTY),
-                createCheckpointPagesWriterFactory(mockParititonMetaManager),
+                createCheckpointPagesWriterFactory(mockParititonMetaManager, 
partitionDestructionLockManager),
                 mock(FilePageStoreManager.class),
                 mockParititonMetaManager,
                 mock(Compactor.class),
                 PAGE_SIZE,
                 checkpointConfig,
-                mock(LogSyncer.class)
+                mock(LogSyncer.class),
+                partitionDestructionLockManager
         );
 
         assertNull(checkpointer.runner());
@@ -167,7 +169,8 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
                 mock(Compactor.class),
                 PAGE_SIZE,
                 checkpointConfig,
-                mock(LogSyncer.class)
+                mock(LogSyncer.class),
+                new PartitionDestructionLockManager()
         ));
 
         assertNull(checkpointer.lastCheckpointProgress());
@@ -273,7 +276,8 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
                 mock(Compactor.class),
                 PAGE_SIZE,
                 checkpointConfig,
-                mock(LogSyncer.class)
+                mock(LogSyncer.class),
+                new PartitionDestructionLockManager()
         );
 
         CompletableFuture<?> waitCheckpointEventFuture = 
runAsync(checkpointer::waitCheckpointEvent);
@@ -293,18 +297,21 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
     void testCheckpointBody() throws Exception {
         intervalMillis.set(100L);
 
-        Checkpointer checkpointer = spy(new Checkpointer(
+        var partitionDestructionLockManager = new 
PartitionDestructionLockManager();
+
+        var checkpointer = spy(new Checkpointer(
                 "test",
                 null,
                 mock(FailureManager.class),
                 createCheckpointWorkflow(EMPTY),
-                
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
+                
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class), 
partitionDestructionLockManager),
                 mock(FilePageStoreManager.class),
                 mock(PartitionMetaManager.class),
                 mock(Compactor.class),
                 PAGE_SIZE,
                 checkpointConfig,
-                mock(LogSyncer.class)
+                mock(LogSyncer.class),
+                partitionDestructionLockManager
         ));
 
         checkpointer.scheduledProgress()
@@ -392,18 +399,21 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
 
         when(mock.getMeta(any())).thenReturn(meta);
 
-        Checkpointer checkpointer = spy(new Checkpointer(
+        var partitionDestructionLockManager = new 
PartitionDestructionLockManager();
+
+        var checkpointer = spy(new Checkpointer(
                 "test",
                 null,
                 mock(FailureManager.class),
                 createCheckpointWorkflow(dirtyPages),
-                createCheckpointPagesWriterFactory(partitionMetaManager),
+                createCheckpointPagesWriterFactory(partitionMetaManager, 
partitionDestructionLockManager),
                 createFilePageStoreManager(Map.of(new GroupPartitionId(0, 0), 
filePageStore)),
                 mock,
                 compactor,
                 PAGE_SIZE,
                 checkpointConfig,
-                mockLogSyncer
+                mockLogSyncer,
+                partitionDestructionLockManager
         ));
 
         assertDoesNotThrow(checkpointer::doCheckpoint);
@@ -424,18 +434,24 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
 
         Compactor compactor = mock(Compactor.class);
 
-        Checkpointer checkpointer = spy(new Checkpointer(
+        var partitionDestructionLockManager = new 
PartitionDestructionLockManager();
+
+        var checkpointer = spy(new Checkpointer(
                 "test",
                 null,
                 mock(FailureManager.class),
                 createCheckpointWorkflow(dirtyPages),
-                createCheckpointPagesWriterFactory(new 
PartitionMetaManager(ioRegistry, PAGE_SIZE, FACTORY)),
+                createCheckpointPagesWriterFactory(
+                        new PartitionMetaManager(ioRegistry, PAGE_SIZE, 
FACTORY),
+                        partitionDestructionLockManager
+                ),
                 createFilePageStoreManager(Map.of()),
                 mock(PartitionMetaManager.class),
                 compactor,
                 PAGE_SIZE,
                 checkpointConfig,
-                mock(LogSyncer.class)
+                mock(LogSyncer.class),
+                partitionDestructionLockManager
         ));
 
         assertDoesNotThrow(checkpointer::doCheckpoint);
@@ -462,7 +478,8 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
                 mock(Compactor.class),
                 PAGE_SIZE,
                 checkpointConfig,
-                mock(LogSyncer.class)
+                mock(LogSyncer.class),
+                new PartitionDestructionLockManager()
         );
 
         // Checks case 0 deviation.
@@ -491,49 +508,6 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
         );
     }
 
-    @Test
-    void testPrepareToDestroyPartition() throws Exception {
-        Checkpointer checkpointer = new Checkpointer(
-                "test",
-                null,
-                mock(FailureManager.class),
-                mock(CheckpointWorkflow.class),
-                mock(CheckpointPagesWriterFactory.class),
-                mock(FilePageStoreManager.class),
-                mock(PartitionMetaManager.class),
-                mock(Compactor.class),
-                PAGE_SIZE,
-                checkpointConfig,
-                mock(LogSyncer.class)
-        );
-
-        GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
-        // Everything should be fine as there is no current running checkpoint.
-        checkpointer.prepareToDestroyPartition(groupPartitionId).get(1, 
SECONDS);
-
-        CheckpointProgressImpl checkpointProgress = (CheckpointProgressImpl) 
checkpointer.scheduledProgress();
-
-        checkpointer.startCheckpointProgress();
-
-        checkpointer.prepareToDestroyPartition(groupPartitionId).get(1, 
SECONDS);
-
-        checkpointProgress.transitTo(LOCK_RELEASED);
-        assertTrue(checkpointProgress.inProgress());
-
-        // Everything should be fine so on a "working" checkpoint we don't 
process the partition anyhow.
-        checkpointer.prepareToDestroyPartition(groupPartitionId).get(1, 
SECONDS);
-
-        // Let's emulate that we are processing a partition and check that 
everything will be fine after processing is completed.
-        checkpointProgress.blockPartitionDestruction(groupPartitionId);
-
-        CompletableFuture<?> onPartitionDestructionFuture = 
checkpointer.prepareToDestroyPartition(groupPartitionId);
-
-        checkpointProgress.unblockPartitionDestruction(groupPartitionId);
-
-        onPartitionDestructionFuture.get(1, SECONDS);
-    }
-
     private static CheckpointDirtyPages dirtyPages(PersistentPageMemory 
pageMemory, DirtyFullPageId... pageIds) {
         return new 
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory, 
pageIds)));
     }
@@ -573,12 +547,16 @@ public class CheckpointerTest extends 
BaseIgniteAbstractTest {
         return mock;
     }
 
-    private CheckpointPagesWriterFactory 
createCheckpointPagesWriterFactory(PartitionMetaManager partitionMetaManager) {
+    private CheckpointPagesWriterFactory createCheckpointPagesWriterFactory(
+            PartitionMetaManager partitionMetaManager,
+            PartitionDestructionLockManager partitionDestructionLockManager
+    ) {
         return new CheckpointPagesWriterFactory(
                 mock(WriteDirtyPage.class),
                 ioRegistry,
                 partitionMetaManager,
-                PAGE_SIZE
+                PAGE_SIZE,
+                partitionDestructionLockManager
         );
     }
 
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
index fd2917c41bf..1af89b6e7c8 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.pagememory.io.PageIo;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -61,14 +62,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
     @Test
     void testStartAndStop() throws Exception {
-        var compactor = new Compactor(
-                log,
-                "test",
-                1,
-                mock(FilePageStoreManager.class),
-                PAGE_SIZE,
-                mock(FailureManager.class)
-        );
+        Compactor compactor = newCompactor();
 
         assertNull(compactor.runner());
 
@@ -95,13 +89,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
     @Test
     void testMergeDeltaFileToMainFile() throws Throwable {
-        Compactor compactor = new Compactor(
-                log,
-                "test",
-                1,
-                mock(FilePageStoreManager.class),
-                PAGE_SIZE,
-                mock(FailureManager.class));
+        Compactor compactor = newCompactor();
 
         FilePageStore filePageStore = mock(FilePageStore.class);
         DeltaFilePageStoreIo deltaFilePageStoreIo = 
mock(DeltaFilePageStoreIo.class);
@@ -147,13 +135,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
         when(filePageStoreManager.allPageStores()).then(answer -> 
groupPageStoresMap.getAll());
 
-        Compactor compactor = spy(new Compactor(
-                log,
-                "test",
-                1,
-                filePageStoreManager,
-                PAGE_SIZE,
-                mock(FailureManager.class)));
+        Compactor compactor = spy(newCompactor(filePageStoreManager));
 
         doAnswer(answer -> {
             assertSame(filePageStore, answer.getArgument(0));
@@ -179,13 +161,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
     @Test
     void testBody() throws Exception {
-        Compactor compactor = spy(new Compactor(
-                log,
-                "test",
-                1,
-                mock(FilePageStoreManager.class),
-                PAGE_SIZE,
-                mock(FailureManager.class)));
+        Compactor compactor = spy(newCompactor());
 
         doNothing().when(compactor).waitDeltaFiles();
 
@@ -204,13 +180,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
     @Test
     void testWaitDeltaFiles() throws Exception {
-        Compactor compactor = spy(new Compactor(
-                log,
-                "test",
-                1,
-                mock(FilePageStoreManager.class),
-                PAGE_SIZE,
-                mock(FailureManager.class)));
+        Compactor compactor = spy(newCompactor());
 
         CompletableFuture<?> waitDeltaFilesFuture = 
runAsync(compactor::waitDeltaFiles);
 
@@ -223,13 +193,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
     @Test
     void testCancel() throws Exception {
-        Compactor compactor = spy(new Compactor(
-                log,
-                "test",
-                1,
-                mock(FilePageStoreManager.class),
-                PAGE_SIZE,
-                mock(FailureManager.class)));
+        Compactor compactor = spy(newCompactor());
 
         assertFalse(compactor.isCancelled());
 
@@ -244,4 +208,20 @@ public class CompactorTest extends BaseIgniteAbstractTest {
 
         waitDeltaFilesFuture.get(100, MILLISECONDS);
     }
+
+    private Compactor newCompactor() {
+        return newCompactor(mock(FilePageStoreManager.class));
+    }
+
+    private Compactor newCompactor(FilePageStoreManager filePageStoreManager) {
+        return new Compactor(
+                log,
+                "test",
+                1,
+                filePageStoreManager,
+                PAGE_SIZE,
+                mock(FailureManager.class),
+                new PartitionDestructionLockManager()
+        );
+    }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
index ba2a750a27f..13f8a2f0787 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
@@ -154,7 +154,8 @@ public abstract class AbstractPageReplacementTest extends 
IgniteAbstractTest {
                 filePageStoreManager,
                 checkpointManager::writePageToFilePageStore,
                 checkpointManager.checkpointTimeoutLock(),
-                new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)
+                new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+                checkpointManager.partitionDestructionLockManager()
         );
 
         dataRegionList.add(() -> pageMemory);
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
index 40fe9ad3ae9..ddd1b902e63 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
@@ -173,7 +173,8 @@ public class PageMemoryThrottlingTest extends 
IgniteAbstractTest {
                     checkpointProgress.evictedPagesCounter().incrementAndGet();
                 },
                 checkpointManager.checkpointTimeoutLock(),
-                new OffheapReadWriteLock(2)
+                new OffheapReadWriteLock(2),
+                checkpointManager.partitionDestructionLockManager()
         );
 
         pageStoreManager.start();
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index fed1a143b38..45b54e3dc78 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -173,7 +173,8 @@ public class PersistentPageMemoryDataRegion implements 
DataRegion<PersistentPage
                 filePageStoreManager,
                 this::flushDirtyPageOnReplacement,
                 checkpointManager.checkpointTimeoutLock(),
-                new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)
+                new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+                checkpointManager.partitionDestructionLockManager()
         );
 
         initThrottling(pageMemory);
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 12d08279c3e..e9c50688e0b 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -108,6 +109,8 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
     protected void finishDestruction() {
         dataRegion.pageMemory().onGroupDestroyed(getTableId());
 
+        
dataRegion.checkpointManager().partitionDestructionLockManager().removeLockForGroup(getTableId());
+
         try {
             
dataRegion.filePageStoreManager().destroyGroupIfExists(getTableId());
         } catch (IOException e) {
@@ -364,11 +367,20 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
     private CompletableFuture<Void> 
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
         
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
 
-        dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());
+        Lock partitionDestructionLock = 
dataRegion.checkpointManager().partitionDestructionLockManager().destructionLock(groupPartitionId)
+                .writeLock();
+
+        partitionDestructionLock.lock();
+
+        try {
+            dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(), 
groupPartitionId.getPartitionId());
 
-        return 
dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
-                .thenAccept(unused -> 
dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
-                .thenCompose(unused -> 
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
+            dataRegion.partitionMetaManager().removeMeta(groupPartitionId);
+
+            return 
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId);
+        } finally {
+            partitionDestructionLock.unlock();
+        }
     }
 
     private GroupPartitionId createGroupPartitionId(int partitionId) {
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
index 1520688a02e..e90e71da7fb 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
@@ -70,6 +70,7 @@ import 
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionC
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import 
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
 import 
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.PartitionMetaSnapshot;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -448,7 +449,7 @@ public class PersistentPageMemoryNoLoadTest extends 
AbstractPageMemoryNoLoadSelf
         }
     }
 
-    protected PersistentPageMemory createPageMemory(
+    private PersistentPageMemory createPageMemory(
             long[] segmentSizes,
             long checkpointBufferSize,
             @Nullable FilePageStoreManager filePageStoreManager,
@@ -464,7 +465,8 @@ public class PersistentPageMemoryNoLoadTest extends 
AbstractPageMemoryNoLoadSelf
                 filePageStoreManager == null ? new TestPageReadWriteManager() 
: filePageStoreManager,
                 flushDirtyPageForReplacement,
                 checkpointManager == null ? mockCheckpointTimeoutLock(true) : 
checkpointManager.checkpointTimeoutLock(),
-                new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)
+                new 
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+                checkpointManager == null ? new 
PartitionDestructionLockManager() : 
checkpointManager.partitionDestructionLockManager()
         );
     }
 


Reply via email to