This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f79bd4f3e IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore 
(#907)
f79bd4f3e is described below

commit f79bd4f3edac372962a3f43e414b8338c596d682
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Jul 7 11:05:54 2022 +0300

    IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore (#907)
---
 .../asm/ConfigurationAsmGenerator.java             |   5 +-
 .../ignite/internal/thread/NamedThreadFactory.java |   1 -
 .../ignite/internal/util/CollectionUtils.java      |  10 +-
 .../util}/IgniteConcurrentMultiPairQueue.java      |   2 +-
 .../java/org/apache/ignite/lang/IgniteBiTuple.java |  42 ++-
 .../ignite/internal/util/CollectionUtilsTest.java  |  66 ++--
 .../util}/IgniteConcurrentMultiPairQueueTest.java  |   6 +-
 .../ignite/internal/pagememory/FullPageId.java     |   7 +
 .../PageMemoryCheckpointConfigurationSchema.java   |  19 --
 .../persistence/PersistentPageMemory.java          |   2 +-
 .../persistence/checkpoint/Checkpoint.java         |  13 +-
 .../checkpoint/CheckpointDirtyPages.java           | 358 +++++++++++++++++++++
 .../persistence/checkpoint/CheckpointManager.java  |   2 +-
 .../checkpoint/CheckpointPagesWriter.java          |  39 ++-
 .../checkpoint/CheckpointPagesWriterFactory.java   |   9 +-
 .../checkpoint/CheckpointProgressImpl.java         |   6 +-
 .../persistence/checkpoint/CheckpointWorkflow.java | 161 +++++----
 .../checkpoint/CheckpointWriteOrder.java           |  51 ---
 .../persistence/checkpoint/Checkpointer.java       |  11 +-
 ...sInfoHolder.java => DataRegionsDirtyPages.java} |  16 +-
 .../internal/pagememory/util/PageIdUtils.java      |   2 +-
 .../checkpoint/CheckpointDirtyPagesTest.java       | 295 +++++++++++++++++
 .../checkpoint/CheckpointPagesWriterTest.java      |  11 +-
 .../persistence/checkpoint/CheckpointTest.java     |   8 +-
 .../checkpoint/CheckpointTestUtils.java            |  14 +
 .../checkpoint/CheckpointWorkflowTest.java         | 217 +++++++------
 .../persistence/checkpoint/CheckpointerTest.java   |  26 +-
 .../sql/engine/rule/LogicalScanConverterRule.java  |   6 +-
 28 files changed, 1001 insertions(+), 404 deletions(-)

diff --git 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
index 590269004..9aaa28669 100644
--- 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
+++ 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
@@ -69,7 +69,6 @@ import static 
org.apache.ignite.internal.configuration.util.ConfigurationUtil.po
 import static 
org.apache.ignite.internal.configuration.util.ConfigurationUtil.schemaFields;
 import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.CollectionUtils.concat;
-import static org.apache.ignite.internal.util.CollectionUtils.union;
 import static org.objectweb.asm.Opcodes.H_NEWINVOKESPECIAL;
 import static org.objectweb.asm.Type.getMethodDescriptor;
 import static org.objectweb.asm.Type.getMethodType;
@@ -1329,7 +1328,7 @@ public class ConfigurationAsmGenerator {
                     .expression(keyVar)
                     .defaultCase(throwException(NoSuchElementException.class, 
keyVar));
 
-            for (Field schemaField : union(schemaFields, internalFields)) {
+            for (Field schemaField : concat(schemaFields, internalFields)) {
                 if (isInjectedName(schemaField)) {
                     continue;
                 }
@@ -1489,7 +1488,7 @@ public class ConfigurationAsmGenerator {
                     .expression(keyVar)
                     .defaultCase(throwException(NoSuchElementException.class, 
keyVar));
 
-            for (Field schemaField : union(schemaFields, internalFields)) {
+            for (Field schemaField : concat(schemaFields, internalFields)) {
                 if (isInjectedName(schemaField)) {
                     continue;
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
index 792b6d285..ec9869cc4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
@@ -91,5 +91,4 @@ public class NamedThreadFactory implements ThreadFactory {
     public static String threadPrefix(String nodeName, String poolName) {
         return IgniteThread.threadPrefix(nodeName, poolName);
     }
-
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index ad0372ddb..a5d0ba607 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -159,14 +159,14 @@ public final class CollectionUtils {
     }
 
     /**
-     * Union collections.
+     * Concatenates collections.
      *
      * @param collections Collections.
-     * @param <T> Type of the elements of collections.
-     * @return Immutable union of collections.
+     * @param <T> Type of the elements of collections.
+     * @return Immutable collection concatenation.
      */
     @SafeVarargs
-    public static <T> Collection<T> union(Collection<T>... collections) {
+    public static <T> Collection<T> concat(Collection<T>... collections) {
         if (collections == null || collections.length == 0) {
             return List.of();
         }
@@ -175,7 +175,7 @@ public final class CollectionUtils {
             /** {@inheritDoc} */
             @Override
             public Iterator<T> iterator() {
-                return concat(collections).iterator();
+                return concat((Iterable<T>[]) collections).iterator();
             }
 
             /** {@inheritDoc} */
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueue.java
similarity index 98%
rename from 
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueue.java
index b894305ed..e669817a5 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueue.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+package org.apache.ignite.internal.util;
 
 import static java.util.function.Predicate.not;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java 
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
index 3c761943c..f73e6f3ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
@@ -27,6 +27,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -35,16 +37,17 @@ import org.jetbrains.annotations.Nullable;
  * @param <V1> First element type.
  * @param <V2> Second element type.
  */
-public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
-        Iterable<Object>, Externalizable, Cloneable {
+public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>, 
Iterable<Object>, Externalizable, Cloneable {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
     /** First value. */
-    private V1 val1;
+    @IgniteToStringInclude
+    private @Nullable V1 val1;
 
     /** Second value. */
-    private V2 val2;
+    @IgniteToStringInclude
+    private @Nullable V2 val2;
 
     /**
      * Empty constructor required by {@link Externalizable}.
@@ -75,19 +78,15 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, 
Map.Entry<V1, V2>,
 
     /**
      * Gets first value.
-     *
-     * @return First value.
      */
-    public V1 get1() {
+    public @Nullable V1 get1() {
         return val1;
     }
 
     /**
      * Gets second value.
-     *
-     * @return Second value.
      */
-    public V2 get2() {
+    public @Nullable V2 get2() {
         return val2;
     }
 
@@ -121,16 +120,14 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, 
V2>, Map.Entry<V1, V2>,
     }
 
     /** {@inheritDoc} */
-    @Nullable
     @Override
-    public V1 getKey() {
+    public @Nullable V1 getKey() {
         return val1;
     }
 
     /** {@inheritDoc} */
-    @Nullable
     @Override
-    public V2 getValue() {
+    public @Nullable V2 getValue() {
         return val2;
     }
 
@@ -151,14 +148,15 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, 
V2>, Map.Entry<V1, V2>,
             /** Next index. */
             private int nextIdx = 1;
 
+            /** {@inheritDoc} */
             @Override
             public boolean hasNext() {
                 return nextIdx < 3;
             }
 
-            @Nullable
+            /** {@inheritDoc} */
             @Override
-            public Object next() {
+            public @Nullable Object next() {
                 if (!hasNext()) {
                     throw new NoSuchElementException();
                 }
@@ -176,6 +174,7 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, 
Map.Entry<V1, V2>,
                 return res;
             }
 
+            /** {@inheritDoc} */
             @Override
             public void remove() {
                 throw new UnsupportedOperationException();
@@ -208,16 +207,14 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, 
V2>, Map.Entry<V1, V2>,
     }
 
     /** {@inheritDoc} */
-    @Nullable
     @Override
-    public V2 get(Object key) {
+    public @Nullable V2 get(Object key) {
         return containsKey(key) ? val2 : null;
     }
 
     /** {@inheritDoc} */
-    @Nullable
     @Override
-    public V2 put(V1 key, V2 val) {
+    public @Nullable V2 put(V1 key, V2 val) {
         V2 old = containsKey(key) ? val2 : null;
 
         set(key, val);
@@ -226,9 +223,8 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, 
Map.Entry<V1, V2>,
     }
 
     /** {@inheritDoc} */
-    @Nullable
     @Override
-    public V2 remove(Object key) {
+    public @Nullable V2 remove(Object key) {
         if (containsKey(key)) {
             V2 v2 = val2;
 
@@ -347,6 +343,6 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, 
Map.Entry<V1, V2>,
     /** {@inheritDoc} */
     @Override
     public String toString() {
-        return "S.toString(IgniteBiTuple.class, this)";
+        return S.toString(IgniteBiTuple.class, this);
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
index f4f18d600..a63c02a76 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
@@ -25,6 +25,8 @@ import static 
org.apache.ignite.internal.util.CollectionUtils.difference;
 import static org.apache.ignite.internal.util.CollectionUtils.setOf;
 import static org.apache.ignite.internal.util.CollectionUtils.union;
 import static org.apache.ignite.internal.util.CollectionUtils.viewReadOnly;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -175,44 +177,44 @@ public class CollectionUtilsTest {
     }
 
     @Test
-    void testCollectionUnion() {
-        assertTrue(union(new Collection[0]).isEmpty());
-        assertTrue(union((Collection<Object>[]) null).isEmpty());
-        assertTrue(union(List.of()).isEmpty());
-
-        assertEquals(List.of(1), collect(union(List.of(1), Set.of())));
-        assertEquals(List.of(1), collect(union(List.of(), Set.of(1))));
-
-        assertEquals(List.of(1, 2), collect(union(List.of(1), Set.of(2))));
-        assertEquals(List.of(1, 2, 2), collect(union(List.of(1), List.of(2), 
Set.of(2))));
-
-        assertFalse(union(new Collection[0]).contains(0));
-        assertFalse(union(List.of()).contains(0));
-        assertFalse(union(List.of(1)).contains(0));
-        assertFalse(union(List.of(1), Set.of()).contains(0));
-        assertFalse(union(List.of(), Set.of(1)).contains(0));
-        assertFalse(union(List.of(1), Set.of(2, 3)).contains(0));
-
-        assertTrue(union(List.of(0)).contains(0));
-        assertTrue(union(List.of(), Set.of(0)).contains(0));
-        assertTrue(union(List.of(0), Set.of()).contains(0));
-
-        assertEquals(0, union(new Collection[0]).size());
-        assertEquals(0, union(List.of()).size());
-        assertEquals(1, union(List.of(1)).size());
-        assertEquals(1, union(List.of(), Set.of(1)).size());
-        assertEquals(1, union(List.of(1), Set.of()).size());
-        assertEquals(2, union(List.of(1), Set.of(2)).size());
-        assertEquals(3, union(List.of(1), Set.of(2, 3)).size());
-        assertEquals(5, union(List.of(1, 4, 5), Set.of(2, 3)).size());
+    void testConcatCollection() {
+        assertTrue(concat(new Collection[0]).isEmpty());
+        assertTrue(concat((Collection<Object>[]) null).isEmpty());
+        assertTrue(concat((Collection<Object>) List.of()).isEmpty());
+
+        assertThat(concat(List.of(1), Set.of()), contains(1));
+        assertThat(concat(List.of(), Set.of(1)), contains(1));
+
+        assertThat(concat(List.of(1), Set.of(2)), contains(1, 2));
+        assertThat(concat(List.of(1), List.of(2), Set.of(2)), contains(1, 2, 
2));
+
+        assertFalse(concat(new Collection[0]).contains(0));
+        assertFalse(concat((Collection<Object>) List.of()).contains(0));
+        assertFalse(concat((Collection<Integer>) List.of(1)).contains(0));
+        assertFalse(concat(List.of(1), Set.of()).contains(0));
+        assertFalse(concat(List.of(), Set.of(1)).contains(0));
+        assertFalse(concat(List.of(1), Set.of(2, 3)).contains(0));
+
+        assertTrue(concat((Collection<Integer>) List.of(0)).contains(0));
+        assertTrue(concat(List.of(), Set.of(0)).contains(0));
+        assertTrue(concat(List.of(0), Set.of()).contains(0));
+
+        assertEquals(0, concat(new Collection[0]).size());
+        assertEquals(0, concat((Collection<Object>) List.of()).size());
+        assertEquals(1, concat((Collection<Integer>) List.of(1)).size());
+        assertEquals(1, concat(List.of(), Set.of(1)).size());
+        assertEquals(1, concat(List.of(1), Set.of()).size());
+        assertEquals(2, concat(List.of(1), Set.of(2)).size());
+        assertEquals(3, concat(List.of(1), Set.of(2, 3)).size());
+        assertEquals(5, concat(List.of(1, 4, 5), Set.of(2, 3)).size());
 
         Collection<Integer> integers = new ArrayList<>(List.of(1, 2, 3));
 
-        Collection<Integer> union = union(integers);
+        Collection<Integer> concat = concat(integers);
 
         integers.remove(1);
 
-        assertEquals(2, union.size());
+        assertEquals(2, concat.size());
     }
 
     /**
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueueTest.java
similarity index 95%
rename from 
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
rename to 
modules/core/src/test/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueueTest.java
index f017c6a8b..8eda56ee2 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueueTest.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+package org.apache.ignite.internal.util;
 
 import static java.util.Collections.synchronizedCollection;
 import static java.util.concurrent.ThreadLocalRandom.current;
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static 
org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.EMPTY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -33,7 +33,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.Result;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
index 10d973fc0..5a99889e0 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
@@ -171,4 +171,11 @@ public final class FullPageId {
                 .app(", effectivePageId=").appendHex(effectivePageId())
                 .app(", groupId=").app(groupId).app(']').toString();
     }
+
+    /**
+     * Returns the partition ID.
+     */
+    public int partitionId() {
+        return PageIdUtils.partitionId(pageId);
+    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
index c58792b3d..b80a18a30 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
@@ -19,21 +19,13 @@ package 
org.apache.ignite.internal.pagememory.configuration.schema;
 
 import org.apache.ignite.configuration.annotation.Config;
 import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.OneOf;
 import org.apache.ignite.configuration.validation.Range;
-import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder;
 
 /**
  * Checkpoint configuration schema for persistent page memory.
  */
 @Config
 public class PageMemoryCheckpointConfigurationSchema {
-    /** See description of {@link CheckpointWriteOrder#RANDOM}. */
-    public static final String RANDOM_WRITE_ORDER = "RANDOM";
-
-    /** See description of {@link CheckpointWriteOrder#SEQUENTIAL}. */
-    public static final String SEQUENTIAL_WRITE_ORDER = "SEQUENTIAL";
-
     /** Checkpoint frequency in milliseconds. */
     @Range(min = 0)
     @Value(hasDefault = true)
@@ -49,17 +41,6 @@ public class PageMemoryCheckpointConfigurationSchema {
     @Value(hasDefault = true)
     public int threads = 4;
 
-    /** Checkpoint write order. */
-    @OneOf({RANDOM_WRITE_ORDER, SEQUENTIAL_WRITE_ORDER})
-    @Value(hasDefault = true)
-    public String writeOrder = SEQUENTIAL_WRITE_ORDER;
-
-    /**
-     * Starting from this number of dirty pages in checkpoint, they will be 
sorted in parallel in case of {@link #SEQUENTIAL_WRITE_ORDER}.
-     */
-    @Value(hasDefault = true)
-    public int parallelSortThreshold = 512 * 1024;
-
     /** Timeout for checkpoint read lock acquisition in milliseconds. */
     @Range(min = 0)
     @Value(hasDefault = true)
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 71bcd65b8..badf647ae 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -2095,7 +2095,7 @@ public class PersistentPageMemory implements PageMemory {
 
         safeToUpdate.set(true);
 
-        return CollectionUtils.union(collections);
+        return CollectionUtils.concat(collections);
     }
 
     /**
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
index cb0f07682..1c85e1a85 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
@@ -17,15 +17,12 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-
 /**
  * Data class of checkpoint information.
  */
 class Checkpoint {
-    /** Checkpoint pages. */
-    final IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
dirtyPages;
+    /** Sorted dirty pages from data regions that should be checkpointed. */
+    final CheckpointDirtyPages dirtyPages;
 
     /** Checkpoint progress status. */
     final CheckpointProgressImpl progress;
@@ -36,17 +33,17 @@ class Checkpoint {
     /**
      * Constructor.
      *
-     * @param dirtyPages Pages to write to the page store.
+     * @param dirtyPages Sorted dirty pages from data regions that should be 
checkpointed.
      * @param progress Checkpoint progress status.
      */
     Checkpoint(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
dirtyPages,
+            CheckpointDirtyPages dirtyPages,
             CheckpointProgressImpl progress
     ) {
         this.dirtyPages = dirtyPages;
         this.progress = progress;
 
-        dirtyPagesSize = dirtyPages.initialSize();
+        dirtyPagesSize = dirtyPages.dirtyPagesCount();
     }
 
     /**
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
new file mode 100644
index 000000000..4010a8385
--- /dev/null
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+    /** Dirty page ID comparator. */
+    static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+            .comparingInt(FullPageId::groupId)
+            .thenComparingLong(FullPageId::effectivePageId);
+
+    /** Empty checkpoint dirty pages. */
+    static final CheckpointDirtyPages EMPTY = new 
CheckpointDirtyPages(List.of());
+
+    /** Sorted dirty pages from data regions by groupId -> partitionId -> 
pageIdx. */
+    private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>> 
dirtyPages;
+
+    /** Total number of dirty pages. */
+    private final int dirtyPagesCount;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> 
partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>> 
dirtyPages) {
+        this(dirtyPages.entrySet().stream().map(e -> new 
IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Sorted dirty pages from data regions by groupId -> 
partitionId -> pageIdx.
+     */
+    public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory, 
List<FullPageId>>> dirtyPages) {
+        assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+        this.dirtyPages = dirtyPages;
+
+        int count = 0;
+
+        for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages : 
dirtyPages) {
+            assert !pages.getValue().isEmpty() : pages.getKey();
+            assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+            count += pages.getValue().size();
+        }
+
+        dirtyPagesCount = count;
+    }
+
+    /**
+     * Returns total number of dirty pages.
+     */
+    public int dirtyPagesCount() {
+        return dirtyPagesCount;
+    }
+
+    /**
+     * Returns a queue of dirty pages to be written to a checkpoint.
+     */
+    public CheckpointDirtyPagesQueue toQueue() {
+        return new CheckpointDirtyPagesQueue();
+    }
+
+    /**
+     * Looks for the dirty page view for a specific group and partition, {@code null} if not found.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0), 
grpId);
+        FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0), 
grpId);
+
+        for (int i = 0; i < dirtyPages.size(); i++) {
+            List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+            int fromIndex = binarySearch(pageIds, startPageId, 
DIRTY_PAGE_COMPARATOR);
+
+            fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() - 
1, -fromIndex - 1);
+
+            if (!equalsByGroupAndPartition(startPageId, 
pageIds.get(fromIndex))) {
+                continue;
+            }
+
+            int toIndex = binarySearch(pageIds.subList(fromIndex, 
pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+            // toIndex cannot be 0 because endPageId is greater than 
startPageId by DIRTY_PAGE_COMPARATOR.
+            toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+            return new CheckpointDirtyPagesView(i, fromIndex, fromIndex + 
toIndex);
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for the next dirty page view from the current one, {@code null} 
if not found.
+     *
+     * @param currentView Current view to dirty pages, {@code null} to get 
first.
+     */
+    public @Nullable CheckpointDirtyPagesView nextView(@Nullable 
CheckpointDirtyPagesView currentView) {
+        assert currentView == null || currentView.owner() == this : 
currentView;
+
+        if (dirtyPages.isEmpty()) {
+            return null;
+        }
+
+        int regionIndex;
+        int fromPosition;
+
+        if (currentView == null) {
+            regionIndex = 0;
+            fromPosition = 0;
+        } else {
+            regionIndex = currentView.isToPositionLast() ? 
currentView.regionIndex + 1 : currentView.regionIndex;
+            fromPosition = currentView.isToPositionLast() ? 0 : 
currentView.toPosition + 1;
+        }
+
+        if (regionIndex >= dirtyPages.size()) {
+            return null;
+        }
+
+        List<FullPageId> pageIds = dirtyPages.get(regionIndex).getValue();
+
+        if (fromPosition == pageIds.size() - 1 || 
!equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition 
+ 1))) {
+            return new CheckpointDirtyPagesView(regionIndex, fromPosition, 
fromPosition);
+        }
+
+        FullPageId startPageId = pageIds.get(fromPosition);
+        FullPageId endPageId = new 
FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0), 
startPageId.groupId());
+
+        int toPosition = binarySearch(pageIds.subList(fromPosition, 
pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+        toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+        return new CheckpointDirtyPagesView(regionIndex, fromPosition, 
fromPosition + toPosition);
+    }
+
+    /**
+     * Queue of dirty pages that will need to be written to a checkpoint.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesQueue {
+        /** Current position in the queue. */
+        private final AtomicInteger position = new AtomicInteger();
+
+        /** Cumulative sizes: the size of each element in {@link #dirtyPages} plus the previous value 
in this array. */
+        private final int[] cumulativeSizes;
+
+        /**
+         * Private constructor.
+         */
+        private CheckpointDirtyPagesQueue() {
+            int size = 0;
+
+            int[] sizes = new int[dirtyPages.size()];
+
+            for (int i = 0; i < dirtyPages.size(); i++) {
+                sizes[i] = size += dirtyPages.get(i).getValue().size();
+            }
+
+            this.cumulativeSizes = sizes;
+        }
+
+        /**
+         * Returns {@code true} if the next element of the queue was obtained.
+         *
+         * @param result Holder for the result of getting the next dirty page.
+         */
+        public boolean next(QueueResult result) {
+            int queuePosition = this.position.getAndIncrement();
+
+            if (queuePosition >= dirtyPagesCount) {
+                result.owner = null;
+
+                return false;
+            }
+
+            if (result.owner != this) {
+                result.owner = this;
+                result.regionIndex = 0;
+            }
+
+            int regionIndex = result.regionIndex;
+
+            if (queuePosition >= cumulativeSizes[regionIndex]) {
+                if (queuePosition == cumulativeSizes[regionIndex]) {
+                    regionIndex++;
+                } else {
+                    regionIndex = findDirtyPagesIndex(regionIndex + 1, 
queuePosition);
+                }
+            }
+
+            result.regionIndex = regionIndex;
+            result.position = regionIndex > 0 ? queuePosition - 
cumulativeSizes[regionIndex - 1] : queuePosition;
+
+            return true;
+        }
+
+        /**
+         * Returns {@code true} if the queue is empty.
+         */
+        public boolean isEmpty() {
+            return position.get() >= dirtyPagesCount;
+        }
+
+        /**
+         * Returns the size of the queue.
+         */
+        public int size() {
+            return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+        }
+
+        private int findDirtyPagesIndex(int index, int position) {
+            return Math.abs(Arrays.binarySearch(cumulativeSizes, index, 
cumulativeSizes.length, position) + 1);
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+    }
+
+    /**
+     * View of {@link CheckpointDirtyPages} in which all dirty pages will 
refer to the same {@link PersistentPageMemory} and contain the
+     * same groupId and partitionId and increasing pageIdx.
+     *
+     * <p>Thread safe.
+     */
+    class CheckpointDirtyPagesView {
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private final int regionIndex;
+
+        /** Starting position (inclusive) of the dirty page within the element 
at {@link #regionIndex}. */
+        private final int fromPosition;
+
+        /** End position (inclusive) of the dirty page within the element at 
{@link #regionIndex}. */
+        private final int toPosition;
+
+        /**
+         * Private constructor.
+         *
+         * @param regionIndex Element index in {@link 
CheckpointDirtyPages#dirtyPages}.
+         * @param fromPosition Starting position (inclusive) of the dirty page 
within the element at {@link #regionIndex}.
+         * @param toPosition End position (inclusive) of the dirty page within 
the element at {@link #regionIndex}.
+         */
+        private CheckpointDirtyPagesView(int regionIndex, int fromPosition, 
int toPosition) {
+            this.regionIndex = regionIndex;
+            this.fromPosition = fromPosition;
+            this.toPosition = toPosition;
+        }
+
+        /**
+         * Returns the dirty page by index.
+         *
+         * @param index Dirty page index.
+         */
+        public FullPageId get(int index) {
+            return 
dirtyPages.get(this.regionIndex).getValue().get(fromPosition + index);
+        }
+
+        /**
+         * Returns the page memory for view.
+         */
+        public PersistentPageMemory pageMemory() {
+            return dirtyPages.get(regionIndex).getKey();
+        }
+
+        /**
+         * Returns the size of the view.
+         */
+        public int size() {
+            return toPosition - fromPosition + 1;
+        }
+
+        private CheckpointDirtyPages owner() {
+            return CheckpointDirtyPages.this;
+        }
+
+        private boolean isToPositionLast() {
+            return toPosition == dirtyPages.get(regionIndex).getValue().size() 
- 1;
+        }
+    }
+
+    /**
+     * Holder is the result of getting the next dirty page in {@link 
CheckpointDirtyPagesQueue#next(QueueResult)}.
+     *
+     * <p>Not thread safe.
+     */
+    static class QueueResult {
+        private @Nullable CheckpointDirtyPagesQueue owner;
+
+        /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+        private int regionIndex;
+
+        /** Position of the dirty page within the element at {@link 
#regionIndex}. */
+        private int position;
+
+        /**
+         * Returns the page memory for the associated dirty page.
+         */
+        public @Nullable PersistentPageMemory pageMemory() {
+            return owner == null ? null : 
owner.owner().dirtyPages.get(regionIndex).getKey();
+        }
+
+        /**
+         * Returns dirty page.
+         */
+        public @Nullable FullPageId dirtyPage() {
+            return owner == null ? null : 
owner.owner().dirtyPages.get(regionIndex).getValue().get(position);
+        }
+    }
+
+    private static boolean equalsByGroupAndPartition(FullPageId pageId0, 
FullPageId pageId1) {
+        return pageId0.groupId() == pageId1.groupId() && pageId0.partitionId() 
== pageId1.partitionId();
+    }
+}
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index c9689cb3a..80d643804 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -102,7 +102,7 @@ public class CheckpointManager {
         checkpointMarkersStorage = new CheckpointMarkersStorage(storagePath);
 
         checkpointWorkflow = new CheckpointWorkflow(
-                checkpointConfig,
+                igniteInstanceName,
                 checkpointMarkersStorage,
                 checkpointReadWriteLock,
                 dataRegions
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index 8bfc84e93..ac8fe684a 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -21,7 +21,7 @@ import static 
org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
 import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
 import static 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
 import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
 
@@ -38,6 +38,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.QueueResult;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
@@ -51,8 +53,13 @@ public class CheckpointPagesWriter implements Runnable {
     /** Checkpoint specific metrics tracker. */
     private final CheckpointMetricsTracker tracker;
 
-    /** Collection of page IDs to write under this task. Overall pages to 
write may be greater than this collection. */
-    private final IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
FullPageId> writePageIds;
+    /**
+     * Queue of dirty page IDs to write under this task.
+     *
+     * <p>Overall pages to write may be greater than this queue, since it may 
be necessary to retry writing some pages due to unsuccessful
+     * page write lock acquisition.
+     */
+    private final CheckpointDirtyPagesQueue writePageIds;
 
     /** Page store used to write -> Count of written pages. */
     private final ConcurrentMap<PageStore, LongAdder> updStores;
@@ -79,7 +86,7 @@ public class CheckpointPagesWriter implements Runnable {
      * Creates task for write pages.
      *
      * @param tracker Checkpoint metrics tracker.
-     * @param writePageIds Collection of page IDs to write.
+     * @param writePageIds Queue of dirty page IDs to write.
      * @param updStores Updating storage.
      * @param doneFut Done future.
      * @param beforePageWrite Action to be performed before every page write.
@@ -92,7 +99,7 @@ public class CheckpointPagesWriter implements Runnable {
     CheckpointPagesWriter(
             IgniteLogger log,
             CheckpointMetricsTracker tracker,
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds,
+            CheckpointDirtyPagesQueue writePageIds,
             ConcurrentMap<PageStore, LongAdder> updStores,
             CompletableFuture<?> doneFut,
             Runnable beforePageWrite,
@@ -117,13 +124,13 @@ public class CheckpointPagesWriter implements Runnable {
     @Override
     public void run() {
         try {
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
pagesToRetry = writePages(writePageIds);
+            CheckpointDirtyPagesQueue pagesToRetry = writePages(writePageIds);
 
             if (pagesToRetry.isEmpty()) {
                 doneFut.complete(null);
             } else {
                 if (log.isInfoEnabled()) {
-                    log.info(pagesToRetry.initialSize() + " checkpoint pages 
were not written yet due to "
+                    log.info(pagesToRetry.size() + " checkpoint pages were not 
written yet due to "
                             + "unsuccessful page write lock acquisition and 
will be retried");
                 }
 
@@ -139,28 +146,26 @@ public class CheckpointPagesWriter implements Runnable {
     }
 
     /**
-     * Writes pages.
+     * Writes dirty pages.
      *
-     * @param writePageIds Collections of pages to write.
-     * @return pagesToRetry Pages which should be retried.
+     * @param writePageIds Queue of dirty pages to write.
+     * @return pagesToRetry Queue of dirty pages which should be retried.
      */
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePages(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds
-    ) throws IgniteInternalCheckedException {
+    private CheckpointDirtyPagesQueue writePages(CheckpointDirtyPagesQueue 
writePageIds) throws IgniteInternalCheckedException {
         Map<PersistentPageMemory, List<FullPageId>> pagesToRetry = new 
HashMap<>();
 
         Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
 
         ByteBuffer tmpWriteBuf = threadBuf.get();
 
-        IgniteConcurrentMultiPairQueue.Result<PersistentPageMemory, 
FullPageId> res = new IgniteConcurrentMultiPairQueue.Result<>();
+        QueueResult res = new QueueResult();
 
         while (!shutdownNow.getAsBoolean() && writePageIds.next(res)) {
             beforePageWrite.run();
 
-            FullPageId fullId = res.getValue();
+            FullPageId fullId = res.dirtyPage();
 
-            PersistentPageMemory pageMemory = res.getKey();
+            PersistentPageMemory pageMemory = res.pageMemory();
 
             tmpWriteBuf.rewind();
 
@@ -169,7 +174,7 @@ public class CheckpointPagesWriter implements Runnable {
             pageMemory.checkpointWritePage(fullId, tmpWriteBuf, 
pageStoreWriter, tracker);
         }
 
-        return pagesToRetry.isEmpty() ? EMPTY : new 
IgniteConcurrentMultiPairQueue<>(pagesToRetry);
+        return pagesToRetry.isEmpty() ? EMPTY.toQueue() : new 
CheckpointDirtyPages(pagesToRetry).toQueue();
     }
 
     /**
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index b0f8ac70c..fc0e6847c 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -24,8 +24,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 
 /**
@@ -72,7 +71,7 @@ public class CheckpointPagesWriterFactory {
      * Returns instance of page checkpoint writer.
      *
      * @param tracker Checkpoint metrics tracker.
-     * @param cpPages List of pages to write.
+     * @param checkpointDirtyPagesQueue Checkpoint dirty pages queue to write.
      * @param updStores Updated page store storage.
      * @param doneWriteFut Write done future.
      * @param beforePageWrite Before page write callback.
@@ -81,7 +80,7 @@ public class CheckpointPagesWriterFactory {
      */
     CheckpointPagesWriter build(
             CheckpointMetricsTracker tracker,
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
cpPages,
+            CheckpointDirtyPagesQueue checkpointDirtyPagesQueue,
             ConcurrentMap<PageStore, LongAdder> updStores,
             CompletableFuture<?> doneWriteFut,
             Runnable beforePageWrite,
@@ -92,7 +91,7 @@ public class CheckpointPagesWriterFactory {
         return new CheckpointPagesWriter(
                 log,
                 tracker,
-                cpPages,
+                checkpointDirtyPagesQueue,
                 updStores,
                 doneWriteFut,
                 beforePageWrite,
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index 39617997b..4e8ae1013 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -178,10 +178,10 @@ class CheckpointProgressImpl implements 
CheckpointProgress {
     /**
      * Initialize all counters before checkpoint.
      *
-     * @param pagesSize Number of dirty pages in current checkpoint at the 
beginning of checkpoint.
+     * @param checkpointPages Number of dirty pages in current checkpoint at 
the beginning of checkpoint.
      */
-    public void initCounters(int pagesSize) {
-        currCheckpointPagesCnt = pagesSize;
+    public void initCounters(int checkpointPages) {
+        currCheckpointPagesCnt = checkpointPages;
 
         writtenPagesCntr.set(0);
         syncedPagesCntr.set(0);
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index e792f904f..9beaf009f 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -17,33 +17,32 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder.SEQUENTIAL;
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.Future;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.FullPageId;
-import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
-import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.jetbrains.annotations.Nullable;
@@ -64,13 +63,12 @@ import org.jetbrains.annotations.Nullable;
  */
 class CheckpointWorkflow {
     /**
-     * Starting from this number of dirty pages in checkpoint, array will be 
sorted with {@link Arrays#parallelSort(Comparable[])} in case
-     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+     * Starting from this number of dirty pages in checkpoint, array will be 
sorted with {@link Arrays#parallelSort(Object[], java.util.Comparator)}.
+     *
+     * <p>See <a 
href="https://www.researchgate.net/publication/331742843_Threshold_Analysis_and_Comparison_of_Sequential_and_Parallel_Divide_and_Conquer_Sorting_Algorithms">threshold
+     * for parallel sort.</a>
      */
-    private final int parallelSortThreshold;
-
-    /** This number of threads will be created and used for parallel sorting. 
*/
-    private static final int PARALLEL_SORT_THREADS = 
Math.min(Runtime.getRuntime().availableProcessors(), 8);
+    static final int PARALLEL_SORT_THRESHOLD = 40_000;
 
     /** Checkpoint marker storage. */
     private final CheckpointMarkersStorage checkpointMarkersStorage;
@@ -81,33 +79,42 @@ class CheckpointWorkflow {
     /** Persistent data regions for the checkpointing. */
     private final Collection<? extends DataRegion<PersistentPageMemory>> 
dataRegions;
 
-    /** Checkpoint write order configuration. */
-    private final CheckpointWriteOrder checkpointWriteOrder;
-
     /** Collections of checkpoint listeners. */
     private final List<IgniteBiTuple<CheckpointListener, 
DataRegion<PersistentPageMemory>>> listeners = new CopyOnWriteArrayList<>();
 
+    /** Thread pool for sorting dirty pages in parallel if their count is >= 
{@link #PARALLEL_SORT_THRESHOLD}. */
+    private final ForkJoinPool parallelSortThreadPool;
+
     /**
      * Constructor.
      *
-     * @param checkpointConfig Checkpoint configuration.
+     * @param igniteInstanceName Ignite instance name.
      * @param checkpointMarkersStorage Checkpoint marker storage.
      * @param checkpointReadWriteLock Checkpoint read write lock.
      * @param dataRegions Persistent data regions for the checkpointing, 
doesn't copy.
      */
     public CheckpointWorkflow(
-            PageMemoryCheckpointConfiguration checkpointConfig,
+            String igniteInstanceName,
             CheckpointMarkersStorage checkpointMarkersStorage,
             CheckpointReadWriteLock checkpointReadWriteLock,
             Collection<? extends DataRegion<PersistentPageMemory>> dataRegions
     ) {
-        PageMemoryCheckpointView checkpointConfigView = 
checkpointConfig.value();
-
         this.checkpointMarkersStorage = checkpointMarkersStorage;
         this.checkpointReadWriteLock = checkpointReadWriteLock;
-        this.checkpointWriteOrder = 
CheckpointWriteOrder.valueOf(checkpointConfigView.writeOrder());
-        this.parallelSortThreshold = 
checkpointConfigView.parallelSortThreshold();
         this.dataRegions = dataRegions;
+
+        parallelSortThreadPool = new ForkJoinPool(
+                Math.min(Runtime.getRuntime().availableProcessors(), 8) + 1,
+                pool -> {
+                    ForkJoinWorkerThread worker = 
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+
+                    
worker.setName(NamedThreadFactory.threadPrefix(igniteInstanceName, 
"checkpoint-pages-sorter") + worker.getPoolIndex());
+
+                    return worker;
+                },
+                null,
+                false
+        );
     }
 
     /**
@@ -122,6 +129,8 @@ class CheckpointWorkflow {
      */
     public void stop() {
         listeners.clear();
+
+        shutdownAndAwaitTermination(parallelSortThreadPool, 10, SECONDS);
     }
 
     /**
@@ -154,7 +163,7 @@ class CheckpointWorkflow {
 
         checkpointReadWriteLock.writeLock();
 
-        CheckpointDirtyPagesInfoHolder dirtyPages;
+        DataRegionsDirtyPages dirtyPages;
 
         try {
             curr.transitTo(LOCK_TAKEN);
@@ -167,7 +176,7 @@ class CheckpointWorkflow {
 
             tracker.onMarkCheckpointBeginEnd();
 
-            // There are allowable to replace pages only after checkpoint 
entry was stored to disk.
+            // Pages are allowed to be replaced only after the checkpoint 
marker was stored to disk.
             dirtyPages = beginCheckpoint(dataRegions, 
curr.futureFor(MARKER_STORED_TO_DISK));
 
             curr.currentCheckpointPagesCount(dirtyPages.dirtyPageCount);
@@ -192,11 +201,11 @@ class CheckpointWorkflow {
 
             tracker.onSplitAndSortCheckpointPagesStart();
 
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
dirtyPages0 = splitAndSortCheckpointPagesIfNeeded(dirtyPages);
+            CheckpointDirtyPages checkpointPages = 
createAndSortCheckpointDirtyPages(dirtyPages);
 
             tracker.onSplitAndSortCheckpointPagesEnd();
 
-            return new Checkpoint(dirtyPages0, curr);
+            return new Checkpoint(checkpointPages, curr);
         }
 
         return new Checkpoint(EMPTY, curr);
@@ -267,99 +276,77 @@ class CheckpointWorkflow {
                 .collect(toUnmodifiableList());
     }
 
-    private CheckpointDirtyPagesInfoHolder beginCheckpoint(
+    private DataRegionsDirtyPages beginCheckpoint(
             Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
             CompletableFuture<?> allowToReplace
     ) {
         Collection<IgniteBiTuple<PersistentPageMemory, 
Collection<FullPageId>>> pages = new ArrayList<>(dataRegions.size());
 
-        int pageCount = 0;
-
         for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
             Collection<FullPageId> dirtyPages = 
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
 
-            pageCount += dirtyPages.size();
-
             pages.add(new IgniteBiTuple<>(dataRegion.pageMemory(), 
dirtyPages));
         }
 
-        return new CheckpointDirtyPagesInfoHolder(pages, pageCount);
-    }
-
-    private static ForkJoinPool parallelSortInIsolatedPool(
-            FullPageId[] pagesArr,
-            Comparator<FullPageId> cmp,
-            @Nullable ForkJoinPool pool
-    ) throws IgniteInternalCheckedException {
-        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool1 -> {
-            ForkJoinWorkerThread worker = 
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool1);
-
-            worker.setName("checkpoint-pages-sorter-" + worker.getPoolIndex());
-
-            return worker;
-        };
-
-        ForkJoinPool execPool = pool == null ? new 
ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false) : pool;
-
-        Future<?> sortTask = execPool.submit(() -> 
Arrays.parallelSort(pagesArr, cmp));
-
-        try {
-            sortTask.get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new IgniteInternalCheckedException(
-                    "Failed to perform pages array parallel sort",
-                    e instanceof ExecutionException ? e.getCause() : e
-            );
-        }
-
-        return execPool;
+        return new DataRegionsDirtyPages(pages);
     }
 
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
splitAndSortCheckpointPagesIfNeeded(
-            CheckpointDirtyPagesInfoHolder dirtyPages
+    CheckpointDirtyPages createAndSortCheckpointDirtyPages(
+            DataRegionsDirtyPages dataRegionsDirtyPages
     ) throws IgniteInternalCheckedException {
-        Set<IgniteBiTuple<PersistentPageMemory, FullPageId[]>> 
cpPagesPerRegion = new HashSet<>();
+        List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>> 
checkpointPages = new ArrayList<>();
 
         int realPagesArrSize = 0;
 
-        for (IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>> 
regPages : dirtyPages.dirtyPages) {
-            FullPageId[] pages = new FullPageId[regPages.getValue().size()];
+        for (IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>> 
regionDirtyPages : dataRegionsDirtyPages.dirtyPages) {
+            FullPageId[] checkpointRegionDirtyPages = new 
FullPageId[regionDirtyPages.getValue().size()];
 
             int pagePos = 0;
 
-            for (FullPageId dirtyPage : regPages.getValue()) {
-                assert realPagesArrSize++ != dirtyPages.dirtyPageCount :
-                        "Incorrect estimated dirty pages number: " + 
dirtyPages.dirtyPageCount;
+            for (FullPageId dirtyPage : regionDirtyPages.getValue()) {
+                assert realPagesArrSize++ != 
dataRegionsDirtyPages.dirtyPageCount :
+                        "Incorrect estimated dirty pages number: " + 
dataRegionsDirtyPages.dirtyPageCount;
 
-                pages[pagePos++] = dirtyPage;
+                checkpointRegionDirtyPages[pagePos++] = dirtyPage;
             }
 
             // Some pages may have been already replaced.
-            if (pagePos != pages.length) {
-                cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(), 
Arrays.copyOf(pages, pagePos)));
+            if (pagePos == 0) {
+                continue;
+            } else if (pagePos != checkpointRegionDirtyPages.length) {
+                checkpointPages.add(new 
IgniteBiTuple<>(regionDirtyPages.getKey(), 
Arrays.copyOf(checkpointRegionDirtyPages, pagePos)));
             } else {
-                cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(), 
pages));
+                checkpointPages.add(new 
IgniteBiTuple<>(regionDirtyPages.getKey(), checkpointRegionDirtyPages));
             }
         }
 
-        if (checkpointWriteOrder == SEQUENTIAL) {
-            Comparator<FullPageId> cmp = 
Comparator.comparingInt(FullPageId::groupId).thenComparingLong(FullPageId::effectivePageId);
-
-            ForkJoinPool pool = null;
+        List<ForkJoinTask<?>> parallelSortTasks = checkpointPages.stream()
+                .map(IgniteBiTuple::getValue)
+                .filter(pages -> pages.length >= PARALLEL_SORT_THRESHOLD)
+                .map(pages -> parallelSortThreadPool.submit(() -> 
Arrays.parallelSort(pages, DIRTY_PAGE_COMPARATOR)))
+                .collect(toList());
 
-            for (IgniteBiTuple<PersistentPageMemory, FullPageId[]> pagesPerReg 
: cpPagesPerRegion) {
-                if (pagesPerReg.getValue().length >= parallelSortThreshold) {
-                    pool = parallelSortInIsolatedPool(pagesPerReg.get2(), cmp, 
pool);
-                } else {
-                    Arrays.sort(pagesPerReg.get2(), cmp);
-                }
+        for (IgniteBiTuple<PersistentPageMemory, FullPageId[]> regionPages : 
checkpointPages) {
+            if (regionPages.getValue().length < PARALLEL_SORT_THRESHOLD) {
+                Arrays.sort(regionPages.getValue(), DIRTY_PAGE_COMPARATOR);
             }
+        }
 
-            if (pool != null) {
-                pool.shutdown();
+        for (ForkJoinTask<?> parallelSortTask : parallelSortTasks) {
+            try {
+                parallelSortTask.get();
+            } catch (ExecutionException | InterruptedException e) {
+                throw new IgniteInternalCheckedException(
+                        "Failed to perform pages array parallel sort",
+                        e instanceof ExecutionException ? e.getCause() : e
+                );
             }
         }
 
-        return new IgniteConcurrentMultiPairQueue<>(cpPagesPerRegion);
+        return new CheckpointDirtyPages(
+                checkpointPages.stream()
+                        .map(tuple -> new IgniteBiTuple<>(tuple.getKey(), 
Arrays.asList(tuple.getValue())))
+                        .collect(toList())
+        );
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
deleted file mode 100644
index 4b48aa2e4..000000000
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This enum defines order of writing pages to disk storage during checkpoint.
- */
-public enum CheckpointWriteOrder {
-    /**
-     * Pages are written in order provided by checkpoint pages collection 
iterator (which is basically a hashtable).
-     */
-    RANDOM,
-
-    /**
-     * All checkpoint pages are collected into single list and sorted by page 
index. Provides almost sequential disk writes, which can be
-     * much faster on some SSD models.
-     */
-    SEQUENTIAL;
-
-    /**
-     * Enumerated values.
-     */
-    private static final CheckpointWriteOrder[] VALS = values();
-
-    /**
-     * Efficiently gets enumerated value from its ordinal.
-     *
-     * @param ord Ordinal value.
-     * @return Enumerated value or {@code null} if ordinal out of range.
-     */
-    public static @Nullable CheckpointWriteOrder fromOrdinal(int ord) {
-        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
-    }
-}
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index fa07f37ce..f2f0af386 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -41,10 +41,9 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.pagememory.FullPageId;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 import org.apache.ignite.internal.thread.IgniteThread;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -355,7 +354,7 @@ public class Checkpointer extends IgniteWorker {
      * Writes dirty pages to the appropriate stores.
      *
      * @param tracker Checkpoint metrics tracker.
-     * @param checkpointPages Checkpoint pages to write.
+     * @param checkpointDirtyPages Checkpoint dirty pages to write.
      * @param currentCheckpointProgress Current checkpoint progress.
      * @param workProgressDispatcher Work progress dispatcher.
      * @param shutdownNow Checker of stop operation.
@@ -363,7 +362,7 @@ public class Checkpointer extends IgniteWorker {
      */
     boolean writePages(
             CheckpointMetricsTracker tracker,
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
checkpointPages,
+            CheckpointDirtyPages checkpointDirtyPages,
             CheckpointProgressImpl currentCheckpointProgress,
             WorkProgressDispatcher workProgressDispatcher,
             BooleanSupplier shutdownNow
@@ -379,10 +378,12 @@ public class Checkpointer extends IgniteWorker {
 
         tracker.onPagesWriteStart();
 
+        CheckpointDirtyPagesQueue checkpointDirtyPagesQueue = 
checkpointDirtyPages.toQueue();
+
         for (int i = 0; i < checkpointWritePageThreads; i++) {
             CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
                     tracker,
-                    checkpointPages,
+                    checkpointDirtyPagesQueue,
                     updStores,
                     futures[i] = new CompletableFuture<>(),
                     workProgressDispatcher::updateHeartbeat,
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DataRegionsDirtyPages.java
similarity index 76%
rename from 
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
rename to 
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DataRegionsDirtyPages.java
index 2294f825d..5b71f9685 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DataRegionsDirtyPages.java
@@ -24,9 +24,13 @@ import org.apache.ignite.lang.IgniteBiTuple;
 
 /**
  * Holder of information about dirty pages by {@link PersistentPageMemory} for 
checkpoint.
+ *
+ * <p>Due to page replacements, the total number of pages may change when they 
are written to disk.
+ *
+ * <p>Total number of dirty pages can only decrease due to page replacement, 
but should not increase.
  */
-class CheckpointDirtyPagesInfoHolder {
-    /** Total number of dirty pages. */
+class DataRegionsDirtyPages {
+    /** Total number of dirty pages for all data regions at the time of data 
collection. */
     final int dirtyPageCount;
 
     /** Collection of dirty pages per {@link PersistentPageMemory} 
distribution. */
@@ -36,13 +40,11 @@ class CheckpointDirtyPagesInfoHolder {
      * Constructor.
      *
      * @param dirtyPages Collection of dirty pages per {@link 
PersistentPageMemory} distribution.
-     * @param dirtyPageCount Total number of dirty pages.
      */
-    public CheckpointDirtyPagesInfoHolder(
-            Collection<IgniteBiTuple<PersistentPageMemory, 
Collection<FullPageId>>> dirtyPages,
-            int dirtyPageCount
+    public DataRegionsDirtyPages(
+            Collection<IgniteBiTuple<PersistentPageMemory, 
Collection<FullPageId>>> dirtyPages
     ) {
         this.dirtyPages = dirtyPages;
-        this.dirtyPageCount = dirtyPageCount;
+        this.dirtyPageCount = dirtyPages.stream().mapToInt(tuple -> 
tuple.getValue().size()).sum();
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
index 1fb705873..9712ad3cc 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
@@ -187,7 +187,7 @@ public final class PageIdUtils {
      * Extracts partition ID from the page ID.
      *
      * @param pageId Page ID.
-     * @return Partition.
+     * @return Partition ID.
      */
     public static int partitionId(long pageId) {
         return (int) ((pageId >>> PAGE_IDX_SIZE) & PART_ID_MASK);
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
new file mode 100644
index 000000000..b15531a9d
--- /dev/null
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
+import static 
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.QueueResult;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointDirtyPages} testing.
+ */
+public class CheckpointDirtyPagesTest {
+    /**
+     * Checks that {@code dirtyPagesCount()} is 0 for {@code EMPTY} and
+     * otherwise equals the sum of the page list sizes of all tuples passed
+     * to the constructor.
+     */
+    @Test
+    void testDirtyPagesCount() {
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 = 
createDirtyPages(of(0, 0, 0), of(0, 0, 1));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
+
+        assertEquals(0, EMPTY.dirtyPagesCount());
+        assertEquals(2, new 
CheckpointDirtyPages(List.of(dirtyPages0)).dirtyPagesCount());
+        assertEquals(3, new 
CheckpointDirtyPages(List.of(dirtyPages1)).dirtyPagesCount());
+        assertEquals(5, new CheckpointDirtyPages(List.of(dirtyPages0, 
dirtyPages1)).dirtyPagesCount());
+    }
+
+    /**
+     * Checks {@code toQueue()}: the queue of {@code EMPTY} is empty, a queue
+     * built from three tuples (1 + 2 + 3 pages) reports size 6 and yields
+     * the (page memory, page) pairs in the order of the input tuples, and
+     * every call returns a new queue instance.
+     */
+    @Test
+    void testToQueue() {
+        assertTrue(EMPTY.toQueue().isEmpty());
+
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 = 
createDirtyPages(of(0, 0, 0));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 = 
createDirtyPages(of(2, 0, 0), of(2, 1, 0), of(3, 2, 2));
+
+        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2));
+
+        CheckpointDirtyPagesQueue queue0 = checkpointDirtyPages.toQueue();
+
+        assertFalse(queue0.isEmpty());
+        assertEquals(6, queue0.size());
+
+        // Draining queue0 must not affect a queue created afterwards.
+        assertThat(toListDirtyPagePair(queue0), 
equalTo(toListDirtyPagePair(dirtyPages0, dirtyPages1, dirtyPages2)));
+
+        CheckpointDirtyPagesQueue queue1 = checkpointDirtyPages.toQueue();
+
+        assertNotSame(queue0, queue1);
+
+        assertFalse(queue1.isEmpty());
+        assertEquals(6, queue1.size());
+    }
+
+    /**
+     * Checks {@code findView}: it returns {@code null} when no dirty page
+     * matches the requested pair (including on {@code EMPTY}), otherwise a
+     * view that holds only the pages whose group ID and partition ID equal
+     * the two arguments.
+     */
+    @Test
+    void testFindView() {
+        assertNull(EMPTY.findView(0, 0));
+
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 = 
createDirtyPages(of(0, 0, 0));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 = 
createDirtyPages(of(5, 0, 0));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages3 = 
createDirtyPages(
+                of(2, 0, 0), of(2, 0, 1),
+                of(2, 1, 1),
+                of(3, 2, 2), of(3, 2, 3)
+        );
+
+        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2, 
dirtyPages3));
+
+        // Pairs for which no page was registered above.
+        assertNull(checkpointDirtyPages.findView(4, 0));
+        assertNull(checkpointDirtyPages.findView(2, 2));
+        assertNull(checkpointDirtyPages.findView(3, 1));
+        assertNull(checkpointDirtyPages.findView(3, 3));
+
+        // Tuples whose pages all share one group and partition.
+        assertThat(toListDirtyPagePair(checkpointDirtyPages.findView(0, 0)), 
equalTo(toListDirtyPagePair(dirtyPages0)));
+        assertThat(toListDirtyPagePair(checkpointDirtyPages.findView(5, 0)), 
equalTo(toListDirtyPagePair(dirtyPages1)));
+        assertThat(toListDirtyPagePair(checkpointDirtyPages.findView(1, 0)), 
equalTo(toListDirtyPagePair(dirtyPages2)));
+
+        // dirtyPages3 mixes several groups/partitions, so each view must be
+        // the matching slice of it.
+        assertThat(
+                toListDirtyPagePair(checkpointDirtyPages.findView(2, 0)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 0), 
dirtyPages3))
+        );
+
+        assertThat(
+                toListDirtyPagePair(checkpointDirtyPages.findView(2, 1)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 1), 
dirtyPages3))
+        );
+
+        assertThat(
+                toListDirtyPagePair(checkpointDirtyPages.findView(3, 2)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(3, 2), 
dirtyPages3))
+        );
+    }
+
+    /**
+     * Checks iteration with {@code nextView}: passing {@code null} yields
+     * the first view, passing the previous view yields the following one,
+     * and {@code null} is returned after the last view (and immediately for
+     * {@code EMPTY}).
+     *
+     * <p>The expected views follow the order in which the tuples were passed
+     * to the constructor (groups 0, 5, 1, then the slices of the last tuple),
+     * with one view per group/partition pair.
+     */
+    @Test
+    void testNextView() {
+        assertNull(EMPTY.nextView(null));
+
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 = 
createDirtyPages(of(0, 0, 0));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 = 
createDirtyPages(of(5, 0, 0));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages3 = 
createDirtyPages(
+                of(2, 0, 0), of(2, 0, 1),
+                of(2, 1, 1),
+                of(3, 2, 2), of(3, 2, 3)
+        );
+
+        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2, 
dirtyPages3));
+
+        CheckpointDirtyPagesView view = checkpointDirtyPages.nextView(null);
+
+        assertThat(toListDirtyPagePair(view), 
equalTo(toListDirtyPagePair(dirtyPages0)));
+        assertThat(toListDirtyPagePair(view = 
checkpointDirtyPages.nextView(view)), 
equalTo(toListDirtyPagePair(dirtyPages1)));
+        assertThat(toListDirtyPagePair(view = 
checkpointDirtyPages.nextView(view)), 
equalTo(toListDirtyPagePair(dirtyPages2)));
+
+        assertThat(
+                toListDirtyPagePair(view = 
checkpointDirtyPages.nextView(view)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 0), 
dirtyPages3))
+        );
+
+        assertThat(
+                toListDirtyPagePair(view = 
checkpointDirtyPages.nextView(view)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 1), 
dirtyPages3))
+        );
+
+        assertThat(
+                toListDirtyPagePair(view = 
checkpointDirtyPages.nextView(view)),
+                equalTo(toListDirtyPagePair(equalsByGroupAndPartition(3, 2), 
dirtyPages3))
+        );
+
+        // No views are left after the last group/partition pair.
+        assertNull(checkpointDirtyPages.nextView(view));
+    }
+
+    /**
+     * Checks the ordering produced by {@code DIRTY_PAGE_COMPARATOR}: for the
+     * sample below the expected result is ascending by group ID, then by
+     * partition ID, then by page index.
+     */
+    @Test
+    void testSortDirtyPageIds() {
+        // Wrapped into an ArrayList because List.of(...) is unmodifiable and
+        // cannot be sorted in place.
+        List<FullPageId> pageIds = new ArrayList<>(List.of(
+                of(10, 10, 10), of(10, 10, 9), of(10, 10, 0), of(10, 0, 10), 
of(10, 0, 0),
+                of(0, 1, 3), of(0, 1, 2), of(0, 1, 1), of(0, 1, 0), of(0, 0, 
0),
+                of(1, 2, 0), of(1, 2, 1)
+        ));
+
+        pageIds.sort(DIRTY_PAGE_COMPARATOR);
+
+        assertThat(
+                pageIds,
+                equalTo(List.of(
+                        of(0, 0, 0), of(0, 1, 0), of(0, 1, 1), of(0, 1, 2), 
of(0, 1, 3),
+                        of(1, 2, 0), of(1, 2, 1),
+                        of(10, 0, 0), of(10, 0, 10), of(10, 10, 0), of(10, 10, 
9), of(10, 10, 10)
+                ))
+        );
+    }
+
+    /**
+     * Checks that a single queue can be polled from different threads: the
+     * first four elements are taken inside an asynchronous task, the
+     * remaining three in the test thread, and the test thread continues
+     * from where the task stopped.
+     *
+     * <p>The two phases do not overlap: the test waits (up to one second)
+     * for the asynchronous task before polling itself. {@code QueueResult}
+     * instances are reused between calls to {@code next}.
+     */
+    @Test
+    void testQueueNextElementInDifferentThreads() throws Exception {
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 = 
createDirtyPages(of(0, 0, 0));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 = 
createDirtyPages(of(1, 0, 0));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 = 
createDirtyPages(of(2, 0, 0), of(2, 0, 1));
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages3 = 
createDirtyPages(of(3, 0, 0), of(3, 1, 0), of(4, 2, 2));
+
+        CheckpointDirtyPages checkpointDirtyPages = new 
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2, 
dirtyPages3));
+
+        CheckpointDirtyPagesQueue queue = checkpointDirtyPages.toQueue();
+
+        // Phase 1: another thread consumes dirtyPages0, dirtyPages1 and both
+        // pages of dirtyPages2.
+        runAsync(() -> {
+            QueueResult queueResult0 = new QueueResult();
+
+            assertTrue(queue.next(queueResult0));
+            assertEquals(dirtyPages0.getKey(), queueResult0.pageMemory());
+            assertEquals(dirtyPages0.getValue().get(0), 
queueResult0.dirtyPage());
+
+            QueueResult queueResult1 = new QueueResult();
+
+            assertTrue(queue.next(queueResult1));
+            assertEquals(dirtyPages1.getKey(), queueResult1.pageMemory());
+            assertEquals(dirtyPages1.getValue().get(0), 
queueResult1.dirtyPage());
+
+            assertTrue(queue.next(queueResult0));
+            assertEquals(dirtyPages2.getKey(), queueResult0.pageMemory());
+            assertEquals(dirtyPages2.getValue().get(0), 
queueResult0.dirtyPage());
+
+            assertTrue(queue.next(queueResult1));
+            assertEquals(dirtyPages2.getKey(), queueResult1.pageMemory());
+            assertEquals(dirtyPages2.getValue().get(1), 
queueResult1.dirtyPage());
+        }).get(1, TimeUnit.SECONDS);
+
+        // Phase 2: the test thread must see only the three pages of
+        // dirtyPages3 that are left.
+        QueueResult queueResult0 = new QueueResult();
+
+        assertTrue(queue.next(queueResult0));
+        assertEquals(dirtyPages3.getKey(), queueResult0.pageMemory());
+        assertEquals(dirtyPages3.getValue().get(0), queueResult0.dirtyPage());
+
+        QueueResult queueResult1 = new QueueResult();
+
+        assertTrue(queue.next(queueResult1));
+        assertEquals(dirtyPages3.getKey(), queueResult1.pageMemory());
+        assertEquals(dirtyPages3.getValue().get(1), queueResult1.dirtyPage());
+
+        assertTrue(queue.next(queueResult0));
+        assertEquals(dirtyPages3.getKey(), queueResult0.pageMemory());
+        assertEquals(dirtyPages3.getValue().get(2), queueResult0.dirtyPage());
+    }
+
+    /**
+     * Creates a tuple of a fresh {@link PersistentPageMemory} mock and the
+     * given dirty pages.
+     *
+     * <p>The passed array is sorted in place with
+     * {@code DIRTY_PAGE_COMPARATOR}; the returned list is a fixed-size view
+     * over that array.
+     *
+     * @param dirtyPages Dirty page IDs, in any order.
+     * @return Tuple of the mocked page memory and the sorted page IDs.
+     */
+    private static IgniteBiTuple<PersistentPageMemory, List<FullPageId>> 
createDirtyPages(FullPageId... dirtyPages) {
+        Arrays.sort(dirtyPages, DIRTY_PAGE_COMPARATOR);
+
+        return new IgniteBiTuple<>(mock(PersistentPageMemory.class), 
Arrays.asList(dirtyPages));
+    }
+
+    /**
+     * Creates a {@link FullPageId} for the given group, partition and page
+     * index; the page ID is built with a zero flag byte.
+     *
+     * @param groupId Group ID.
+     * @param partId Partition ID.
+     * @param pageIdx Page index.
+     * @return Full page ID.
+     */
+    private static FullPageId of(int groupId, int partId, int pageIdx) {
+        return new FullPageId(PageIdUtils.pageId(partId, (byte) 0, pageIdx), 
groupId);
+    }
+
+    /**
+     * Collects all remaining elements of the queue into a list of
+     * (page memory, dirty page) pairs, in the order the queue returns them.
+     *
+     * <p>The queue is advanced by this call: {@code next} is invoked until
+     * it returns {@code false}.
+     *
+     * @param queue Queue of checkpoint dirty pages.
+     * @return List of pairs, empty if the queue is empty.
+     */
+    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListDirtyPagePair(CheckpointDirtyPagesQueue queue) {
+        if (queue.isEmpty()) {
+            return List.of();
+        }
+
+        // A single result holder is reused; its content is copied into a new
+        // tuple on every iteration.
+        QueueResult queueResult = new QueueResult();
+
+        List<IgniteBiTuple<PersistentPageMemory, FullPageId>> result = new 
ArrayList<>(queue.size());
+
+        while (queue.next(queueResult)) {
+            result.add(new IgniteBiTuple<>(queueResult.pageMemory(), 
queueResult.dirtyPage()));
+        }
+
+        return result;
+    }
+
+    /**
+     * Flattens the given tuples into (page memory, dirty page) pairs,
+     * keeping the order of the tuples and of the pages inside each tuple.
+     *
+     * @param dirtyPages Tuples of a page memory and its dirty pages.
+     * @return List of pairs, empty if no tuples are passed.
+     */
+    // The varargs array is only read, so no heap pollution is possible.
+    @SafeVarargs
+    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListDirtyPagePair(
+            IgniteBiTuple<PersistentPageMemory, List<FullPageId>>... dirtyPages
+    ) {
+        return toListDirtyPagePair(dirtyPageId -> true, dirtyPages);
+    }
+
+    /**
+     * Flattens the given tuples into (page memory, dirty page) pairs,
+     * keeping only the pages accepted by the predicate. The order of the
+     * tuples and of the pages inside each tuple is preserved.
+     *
+     * @param dirtyPagePredicate Filter applied to every dirty page.
+     * @param dirtyPages Tuples of a page memory and its dirty pages.
+     * @return List of pairs, empty if no tuples are passed.
+     */
+    // The varargs array is only read, so no heap pollution is possible.
+    @SafeVarargs
+    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListDirtyPagePair(
+            Predicate<FullPageId> dirtyPagePredicate,
+            IgniteBiTuple<PersistentPageMemory, List<FullPageId>>... dirtyPages
+    ) {
+        if (dirtyPages.length == 0) {
+            return List.of();
+        }
+
+        return Stream.of(dirtyPages)
+                .flatMap(pages -> 
pages.getValue().stream().filter(dirtyPagePredicate).map(p -> new 
IgniteBiTuple<>(pages.getKey(), p)))
+                .collect(toList());
+    }
+
+    /**
+     * Converts a view into a list of (page memory, dirty page) pairs by
+     * reading elements {@code 0..size()-1}; every pair carries the page
+     * memory of the view.
+     *
+     * @param view View of checkpoint dirty pages, must not be {@code null}.
+     * @return List of pairs, empty if the view has no pages.
+     */
+    private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>> 
toListDirtyPagePair(CheckpointDirtyPagesView view) {
+        if (view.size() == 0) {
+            return List.of();
+        }
+
+        return IntStream.range(0, view.size()).mapToObj(i -> new 
IgniteBiTuple<>(view.pageMemory(), view.get(i))).collect(toList());
+    }
+
+    /**
+     * Creates a predicate that accepts a page only if its group ID equals
+     * {@code grpId} and the partition ID extracted from its page ID equals
+     * {@code partId}.
+     *
+     * @param grpId Expected group ID.
+     * @param partId Expected partition ID.
+     * @return Predicate over full page IDs.
+     */
+    private static Predicate<FullPageId> equalsByGroupAndPartition(int grpId, 
int partId) {
+        return fullPageId -> fullPageId.groupId() == grpId && 
partitionId(fullPageId.pageId()) == partId;
+    }
+}
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index a4f6c5677..a2a094628 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.junit.jupiter.api.Test;
@@ -74,9 +75,9 @@ public class CheckpointPagesWriterTest {
         FullPageId fullPageId4 = new FullPageId(pageId(1, FLAG_DATA, 4), 0);
         FullPageId fullPageId5 = new FullPageId(pageId(1, FLAG_DATA, 5), 0);
 
-        IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds = new IgniteConcurrentMultiPairQueue<>(
+        CheckpointDirtyPagesQueue writePageIds = new CheckpointDirtyPages(
                 Map.of(pageMemory, List.of(fullPageId0, fullPageId1, 
fullPageId2, fullPageId3, fullPageId4, fullPageId5))
-        );
+        ).toQueue();
 
         Runnable beforePageWrite = mock(Runnable.class);
 
@@ -152,7 +153,7 @@ public class CheckpointPagesWriterTest {
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 log,
                 new CheckpointMetricsTracker(),
-                new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, 
List.of(new FullPageId(0, 0)))),
+                new CheckpointDirtyPages(Map.of(pageMemory, List.of(new 
FullPageId(0, 0)))).toQueue(),
                 new ConcurrentHashMap<>(),
                 doneFuture,
                 () -> {
@@ -191,9 +192,9 @@ public class CheckpointPagesWriterTest {
                         any(CheckpointMetricsTracker.class)
                 );
 
-        IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds = new IgniteConcurrentMultiPairQueue<>(
+        CheckpointDirtyPagesQueue writePageIds = new CheckpointDirtyPages(
                 Map.of(pageMemory, List.of(new FullPageId(0, 0), new 
FullPageId(1, 0)))
-        );
+        ).toQueue();
 
         CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
                 log,
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
index 4cab5fb40..cc8719416 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -38,11 +38,11 @@ public class CheckpointTest {
 
         assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
 
-        IgniteBiTuple<PersistentPageMemory, FullPageId[]> biTuple = new 
IgniteBiTuple<>(
+        IgniteBiTuple<PersistentPageMemory, List<FullPageId>> biTuple = new 
IgniteBiTuple<>(
                 mock(PersistentPageMemory.class),
-                new FullPageId[]{new FullPageId(0, 1)}
+                List.of(new FullPageId(0, 1))
         );
 
-        assertTrue(new Checkpoint(new 
IgniteConcurrentMultiPairQueue<>(List.of(biTuple)), progress).hasDelta());
+        assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)), 
progress).hasDelta());
     }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
index dc9e3d6e2..c8d1db653 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
@@ -19,7 +19,12 @@ package 
org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
 import static org.mockito.Mockito.mock;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
 
 /**
  * Useful class for testing a checkpoint.
@@ -57,4 +62,13 @@ public class CheckpointTestUtils {
             }
         };
     }
+
+    /**
+     * Collects dirty pages into a list.
+     *
+     * <p>Elements are read by index from {@code 0} to {@code size() - 1},
+     * so the list keeps the order of the view.
+     *
+     * @param dirtyPagesView Checkpoint dirty pages view.
+     * @return List of the dirty page IDs contained in the view.
+     */
+    public static List<FullPageId> toListDirtyPageIds(CheckpointDirtyPagesView 
dirtyPagesView) {
+        return IntStream.range(0, 
dirtyPagesView.size()).mapToObj(dirtyPagesView::get).collect(Collectors.toList());
+    }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 88db853b4..6608b044b 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -17,22 +17,20 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import static java.util.Comparator.comparing;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfigurationSchema.RANDOM_WRITE_ORDER;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newReadWriteLock;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.toListDirtyPageIds;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.AFTER_CHECKPOINT_END;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
-import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -50,40 +48,33 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.FullPageId;
-import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 
 /**
  * For {@link CheckpointWorkflow} testing.
  */
-@ExtendWith(ConfigurationExtension.class)
 public class CheckpointWorkflowTest {
     private final IgniteLogger log = 
Loggers.forClass(CheckpointWorkflowTest.class);
 
-    @InjectConfiguration
-    private PageMemoryCheckpointConfiguration checkpointConfig;
-
     @Nullable
     private CheckpointWorkflow workflow;
 
@@ -105,7 +96,7 @@ public class CheckpointWorkflowTest {
         DataRegion<PersistentPageMemory> dataRegion2 = () -> pageMemory2;
 
         workflow = new CheckpointWorkflow(
-                checkpointConfig,
+                "test",
                 mock(CheckpointMarkersStorage.class),
                 newReadWriteLock(log),
                 List.of(dataRegion0, dataRegion1)
@@ -181,13 +172,13 @@ public class CheckpointWorkflowTest {
 
         CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
 
-        List<FullPageId> dirtyPages = List.of(new FullPageId(0, 0), new 
FullPageId(1, 0), new FullPageId(2, 0));
+        List<FullPageId> dirtyPages = List.of(of(0, 0, 0), of(0, 0, 1), of(0, 
0, 2));
 
         PersistentPageMemory pageMemory = newPageMemory(dirtyPages);
 
         DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
 
-        workflow = new CheckpointWorkflow(checkpointConfig, markersStorage, 
readWriteLock, List.of(dataRegion));
+        workflow = new CheckpointWorkflow("test", markersStorage, 
readWriteLock, List.of(dataRegion));
 
         workflow.start();
 
@@ -289,17 +280,10 @@ public class CheckpointWorkflowTest {
         verify(tracker, times(1)).onSplitAndSortCheckpointPagesStart();
         verify(tracker, times(1)).onSplitAndSortCheckpointPagesEnd();
 
-        List<IgniteBiTuple<PersistentPageMemory, FullPageId>> pairs = 
collect(checkpoint.dirtyPages);
-
-        assertThat(
-                pairs.stream().map(IgniteBiTuple::getKey).collect(toSet()),
-                equalTo(Set.of(dataRegion.pageMemory()))
-        );
+        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.nextView(null);
 
-        assertThat(
-                pairs.stream().map(IgniteBiTuple::getValue).collect(toList()),
-                equalTo(dirtyPages)
-        );
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(dirtyPages));
+        assertThat(dirtyPagesView.pageMemory(), 
equalTo(dataRegion.pageMemory()));
 
         assertThat(
                 events,
@@ -314,72 +298,6 @@ public class CheckpointWorkflowTest {
         verify(markersStorage, times(1)).onCheckpointBegin(checkpointId);
     }
 
-    @Test
-    void testMarkCheckpointBeginRandom() throws Exception {
-        checkpointConfig.writeOrder().update(RANDOM_WRITE_ORDER).get(100, TimeUnit.MILLISECONDS);
-
-        List<FullPageId> dirtyPages = List.of(new FullPageId(1, 0), new FullPageId(0, 0), new FullPageId(2, 0));
-
-        PersistentPageMemory pageMemory = newPageMemory(dirtyPages);
-
-        DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
-
-        workflow = new CheckpointWorkflow(
-                checkpointConfig,
-                mock(CheckpointMarkersStorage.class),
-                newReadWriteLock(log),
-                List.of(dataRegion)
-        );
-
-        workflow.start();
-
-        CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
-
-        when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
-
-        Checkpoint checkpoint = workflow.markCheckpointBegin(
-                coarseCurrentTimeMillis(),
-                progressImpl,
-                mock(CheckpointMetricsTracker.class)
-        );
-
-        assertThat(
-                collect(checkpoint.dirtyPages).stream().map(IgniteBiTuple::getValue).collect(toList()),
-                equalTo(dirtyPages)
-        );
-    }
-
-    @Test
-    void testMarkCheckpointBeginSequential() throws Exception {
-        List<FullPageId> dirtyPages = List.of(new FullPageId(1, 0), new FullPageId(0, 0), new FullPageId(2, 0));
-
-        PersistentPageMemory pageMemory = newPageMemory(dirtyPages);
-
-        DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
-
-        workflow = new CheckpointWorkflow(
-                checkpointConfig,
-                mock(CheckpointMarkersStorage.class),
-                newReadWriteLock(log),
-                List.of(dataRegion)
-        );
-
-        CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
-
-        when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
-
-        Checkpoint checkpoint = workflow.markCheckpointBegin(
-                coarseCurrentTimeMillis(),
-                progressImpl,
-                mock(CheckpointMetricsTracker.class)
-        );
-
-        assertThat(
-                collect(checkpoint.dirtyPages).stream().map(IgniteBiTuple::getValue).collect(toList()),
-                equalTo(dirtyPages.stream().sorted(comparing(FullPageId::effectivePageId)).collect(toList()))
-        );
-    }
-
     @Test
     void testMarkCheckpointEnd() throws Exception {
         CheckpointMarkersStorage markersStorage = mock(CheckpointMarkersStorage.class);
@@ -390,7 +308,7 @@ public class CheckpointWorkflowTest {
 
         DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
 
-        workflow = new CheckpointWorkflow(checkpointConfig, markersStorage, readWriteLock, List.of(dataRegion));
+        workflow = new CheckpointWorkflow("test", markersStorage, readWriteLock, List.of(dataRegion));
 
         workflow.start();
 
@@ -427,7 +345,7 @@ public class CheckpointWorkflowTest {
         workflow.addCheckpointListener(checkpointListener, dataRegion);
 
         workflow.markCheckpointEnd(new Checkpoint(
-                new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, List.of(new FullPageId(0, 0)))),
+                new CheckpointDirtyPages(List.of(createCheckpointDirtyPages(pageMemory, of(0, 0, 0)))),
                 progressImpl
         ));
 
@@ -448,21 +366,90 @@ public class CheckpointWorkflowTest {
         verify(markersStorage, times(1)).onCheckpointEnd(checkpointId);
     }
 
-    private List<IgniteBiTuple<PersistentPageMemory, FullPageId>> collect(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> queue
-    ) {
-        List<IgniteBiTuple<PersistentPageMemory, FullPageId>> res = new ArrayList<>();
+    @Test
+    void testCreateAndSortCheckpointDirtyPages() throws Exception {
+        IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>> dataRegionDirtyPages0 = createDataRegionDirtyPages(
+                mock(PersistentPageMemory.class),
+                of(10, 10, 2), of(10, 10, 1), of(10, 10, 0),
+                of(10, 5, 100), of(10, 5, 99),
+                of(10, 1, 50), of(10, 1, 51), of(10, 1, 99)
+        );
 
-        Result<PersistentPageMemory, FullPageId> result = new Result<>();
+        IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>> dataRegionDirtyPages1 = createDataRegionDirtyPages(
+                mock(PersistentPageMemory.class),
+                of(77, 5, 100), of(77, 5, 99),
+                of(88, 1, 51), of(88, 1, 50), of(88, 1, 99),
+                of(66, 33, 0), of(66, 33, 1), of(66, 33, 2)
+        );
 
-        while (queue.next(result)) {
-            res.add(new IgniteBiTuple<>(result.getKey(), result.getValue()));
-        }
+        workflow = new CheckpointWorkflow("test", mock(CheckpointMarkersStorage.class), newReadWriteLock(log), List.of());
+
+        workflow.start();
+
+        CheckpointDirtyPages sortCheckpointDirtyPages = workflow.createAndSortCheckpointDirtyPages(
+                new DataRegionsDirtyPages(List.of(dataRegionDirtyPages0, dataRegionDirtyPages1))
+        );
+
+        CheckpointDirtyPagesView dirtyPagesView = sortCheckpointDirtyPages.nextView(null);
+
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10, 1, 50), of(10, 1, 51), of(10, 1, 99))));
+        assertThat(dirtyPagesView.pageMemory(), equalTo(dataRegionDirtyPages0.getKey()));
+
+        dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10, 5, 99), of(10, 5, 100))));
+        assertThat(dirtyPagesView.pageMemory(), equalTo(dataRegionDirtyPages0.getKey()));
+
+        dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10, 10, 0), of(10, 10, 1), of(10, 10, 2))));
+        assertThat(dirtyPagesView.pageMemory(), equalTo(dataRegionDirtyPages0.getKey()));
+
+        dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(66, 33, 0), of(66, 33, 1), of(66, 33, 2))));
+        assertThat(dirtyPagesView.pageMemory(), equalTo(dataRegionDirtyPages1.getKey()));
+
+        dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(77, 5, 99), of(77, 5, 100))));
+        assertThat(dirtyPagesView.pageMemory(), equalTo(dataRegionDirtyPages1.getKey()));
 
-        return res;
+        dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(88, 1, 50), of(88, 1, 51), of(88, 1, 99))));
+        assertThat(dirtyPagesView.pageMemory(), equalTo(dataRegionDirtyPages1.getKey()));
     }
 
-    private PersistentPageMemory newPageMemory(Collection<FullPageId> dirtyPages) {
+    @Test
+    void testParallelSortDirtyPages() throws Exception {
+        int count = CheckpointWorkflow.PARALLEL_SORT_THRESHOLD + 10;
+
+        FullPageId[] dirtyPages0 = IntStream.range(0, count).mapToObj(i -> of(0, 0, count - i)).toArray(FullPageId[]::new);
+        FullPageId[] dirtyPages1 = IntStream.range(0, count).mapToObj(i -> of(1, 1, i)).toArray(FullPageId[]::new);
+
+        workflow = new CheckpointWorkflow("test", mock(CheckpointMarkersStorage.class), newReadWriteLock(log), List.of());
+
+        workflow.start();
+
+        CheckpointDirtyPages sortCheckpointDirtyPages = workflow.createAndSortCheckpointDirtyPages(new DataRegionsDirtyPages(List.of(
+                createDataRegionDirtyPages(mock(PersistentPageMemory.class), dirtyPages1),
+                createDataRegionDirtyPages(mock(PersistentPageMemory.class), dirtyPages0)
+        )));
+
+        CheckpointDirtyPagesView dirtyPagesView = sortCheckpointDirtyPages.nextView(null);
+
+        assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(dirtyPages1)));
+
+        dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+        assertThat(
+                toListDirtyPageIds(dirtyPagesView),
+                equalTo(IntStream.range(0, count).mapToObj(i -> dirtyPages0[count - i - 1]).collect(toList()))
+        );
+    }
+
+    private static PersistentPageMemory newPageMemory(Collection<FullPageId> dirtyPages) {
         PersistentPageMemory mock = mock(PersistentPageMemory.class);
 
         when(mock.beginCheckpoint(any(CompletableFuture.class))).thenReturn(dirtyPages);
@@ -470,6 +457,24 @@ public class CheckpointWorkflowTest {
         return mock;
     }
 
+    private static IgniteBiTuple<PersistentPageMemory, List<FullPageId>> createCheckpointDirtyPages(
+            PersistentPageMemory pageMemory,
+            FullPageId... dirtyPages
+    ) {
+        return new IgniteBiTuple<>(pageMemory, Arrays.asList(dirtyPages));
+    }
+
+    private static IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>> createDataRegionDirtyPages(
+            PersistentPageMemory pageMemory,
+            FullPageId... dirtyPages
+    ) {
+        return new IgniteBiTuple<>(pageMemory, Set.of(dirtyPages));
+    }
+
+    private static FullPageId of(int grpId, int partId, int pageIdx) {
+        return new FullPageId(PageIdUtils.pageId(partId, (byte) 0, pageIdx), grpId);
+    }
+
     /**
      * Test listener implementation that simply collects events.
      */
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 954d5c422..4bb0e09f0 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -19,9 +19,10 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
 import static java.lang.System.nanoTime;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
-import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -49,8 +50,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -301,10 +303,10 @@ public class CheckpointerTest {
 
     @Test
     void testDoCheckpoint() throws Exception {
-        IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPages = dirtyPages(
+        CheckpointDirtyPages dirtyPages = spy(dirtyPages(
                 mock(PersistentPageMemory.class),
                 new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0)
-        );
+        ));
 
         Checkpointer checkpointer = spy(new Checkpointer(
                 log,
@@ -318,8 +320,7 @@ public class CheckpointerTest {
 
         assertDoesNotThrow(checkpointer::doCheckpoint);
 
-        assertTrue(dirtyPages.isEmpty());
-
+        verify(dirtyPages, times(1)).toQueue();
         verify(checkpointer, times(1)).startCheckpointProgress();
 
         assertEquals(checkpointer.currentProgress().currentCheckpointPagesCount(), 3);
@@ -364,16 +365,13 @@ public class CheckpointerTest {
         );
     }
 
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPages(
-            PersistentPageMemory pageMemory,
-            FullPageId... fullPageIds
-    ) {
-        return fullPageIds.length == 0 ? EMPTY : new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, List.of(fullPageIds)));
+    private CheckpointDirtyPages dirtyPages(PersistentPageMemory pageMemory, FullPageId... fullPageIds) {
+        Arrays.sort(fullPageIds, DIRTY_PAGE_COMPARATOR);
+
+        return new CheckpointDirtyPages(List.of(new IgniteBiTuple<>(pageMemory, Arrays.asList(fullPageIds))));
     }
 
-    private CheckpointWorkflow createCheckpointWorkflow(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPages
-    ) throws Exception {
+    private CheckpointWorkflow createCheckpointWorkflow(CheckpointDirtyPages dirtyPages) throws Exception {
         CheckpointWorkflow mock = mock(CheckpointWorkflow.class);
 
         when(mock.markCheckpointBegin(anyLong(), any(CheckpointProgressImpl.class), any(CheckpointMetricsTracker.class)))
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
index 47960005c..07edfd372 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.rule;
 
+import static org.apache.ignite.internal.util.CollectionUtils.concat;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -85,7 +87,7 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
                     Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
 
                     if (!CollectionUtils.nullOrEmpty(rel.projects())) {
-                        corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
+                        corrIds = new HashSet<>(concat(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
                     }
 
                     RelTraitSet traits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
@@ -133,7 +135,7 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
                     Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
 
                     if (!CollectionUtils.nullOrEmpty(rel.projects())) {
-                        corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
+                        corrIds = new HashSet<>(concat(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
                     }
 
                     RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)

Reply via email to