This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f79bd4f3e IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore
(#907)
f79bd4f3e is described below
commit f79bd4f3edac372962a3f43e414b8338c596d682
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Jul 7 11:05:54 2022 +0300
IGNITE-17267 Preparing a checkpoint for a DeltaFilePageStore (#907)
---
.../asm/ConfigurationAsmGenerator.java | 5 +-
.../ignite/internal/thread/NamedThreadFactory.java | 1 -
.../ignite/internal/util/CollectionUtils.java | 10 +-
.../util}/IgniteConcurrentMultiPairQueue.java | 2 +-
.../java/org/apache/ignite/lang/IgniteBiTuple.java | 42 ++-
.../ignite/internal/util/CollectionUtilsTest.java | 66 ++--
.../util}/IgniteConcurrentMultiPairQueueTest.java | 6 +-
.../ignite/internal/pagememory/FullPageId.java | 7 +
.../PageMemoryCheckpointConfigurationSchema.java | 19 --
.../persistence/PersistentPageMemory.java | 2 +-
.../persistence/checkpoint/Checkpoint.java | 13 +-
.../checkpoint/CheckpointDirtyPages.java | 358 +++++++++++++++++++++
.../persistence/checkpoint/CheckpointManager.java | 2 +-
.../checkpoint/CheckpointPagesWriter.java | 39 ++-
.../checkpoint/CheckpointPagesWriterFactory.java | 9 +-
.../checkpoint/CheckpointProgressImpl.java | 6 +-
.../persistence/checkpoint/CheckpointWorkflow.java | 161 +++++----
.../checkpoint/CheckpointWriteOrder.java | 51 ---
.../persistence/checkpoint/Checkpointer.java | 11 +-
...sInfoHolder.java => DataRegionsDirtyPages.java} | 16 +-
.../internal/pagememory/util/PageIdUtils.java | 2 +-
.../checkpoint/CheckpointDirtyPagesTest.java | 295 +++++++++++++++++
.../checkpoint/CheckpointPagesWriterTest.java | 11 +-
.../persistence/checkpoint/CheckpointTest.java | 8 +-
.../checkpoint/CheckpointTestUtils.java | 14 +
.../checkpoint/CheckpointWorkflowTest.java | 217 +++++++------
.../persistence/checkpoint/CheckpointerTest.java | 26 +-
.../sql/engine/rule/LogicalScanConverterRule.java | 6 +-
28 files changed, 1001 insertions(+), 404 deletions(-)
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
index 590269004..9aaa28669 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/asm/ConfigurationAsmGenerator.java
@@ -69,7 +69,6 @@ import static
org.apache.ignite.internal.configuration.util.ConfigurationUtil.po
import static
org.apache.ignite.internal.configuration.util.ConfigurationUtil.schemaFields;
import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.CollectionUtils.concat;
-import static org.apache.ignite.internal.util.CollectionUtils.union;
import static org.objectweb.asm.Opcodes.H_NEWINVOKESPECIAL;
import static org.objectweb.asm.Type.getMethodDescriptor;
import static org.objectweb.asm.Type.getMethodType;
@@ -1329,7 +1328,7 @@ public class ConfigurationAsmGenerator {
.expression(keyVar)
.defaultCase(throwException(NoSuchElementException.class,
keyVar));
- for (Field schemaField : union(schemaFields, internalFields)) {
+ for (Field schemaField : concat(schemaFields, internalFields)) {
if (isInjectedName(schemaField)) {
continue;
}
@@ -1489,7 +1488,7 @@ public class ConfigurationAsmGenerator {
.expression(keyVar)
.defaultCase(throwException(NoSuchElementException.class,
keyVar));
- for (Field schemaField : union(schemaFields, internalFields)) {
+ for (Field schemaField : concat(schemaFields, internalFields)) {
if (isInjectedName(schemaField)) {
continue;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
index 792b6d285..ec9869cc4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
@@ -91,5 +91,4 @@ public class NamedThreadFactory implements ThreadFactory {
public static String threadPrefix(String nodeName, String poolName) {
return IgniteThread.threadPrefix(nodeName, poolName);
}
-
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index ad0372ddb..a5d0ba607 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -159,14 +159,14 @@ public final class CollectionUtils {
}
/**
- * Union collections.
+ * Concatenates collections.
*
* @param collections Collections.
- * @param <T> Type of the elements of collections.
- * @return Immutable union of collections.
+ * @param <T> Type of the elements of lists.
+ * @return Immutable collection concatenation.
*/
@SafeVarargs
- public static <T> Collection<T> union(Collection<T>... collections) {
+ public static <T> Collection<T> concat(Collection<T>... collections) {
if (collections == null || collections.length == 0) {
return List.of();
}
@@ -175,7 +175,7 @@ public final class CollectionUtils {
/** {@inheritDoc} */
@Override
public Iterator<T> iterator() {
- return concat(collections).iterator();
+ return concat((Iterable<T>[]) collections).iterator();
}
/** {@inheritDoc} */
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueue.java
similarity index 98%
rename from
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueue.java
index b894305ed..e669817a5 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueue.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+package org.apache.ignite.internal.util;
import static java.util.function.Predicate.not;
diff --git
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
index 3c761943c..f73e6f3ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
@@ -27,6 +27,8 @@ import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -35,16 +37,17 @@ import org.jetbrains.annotations.Nullable;
* @param <V1> First element type.
* @param <V2> Second element type.
*/
-public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
- Iterable<Object>, Externalizable, Cloneable {
+public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
Iterable<Object>, Externalizable, Cloneable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** First value. */
- private V1 val1;
+ @IgniteToStringInclude
+ private @Nullable V1 val1;
/** Second value. */
- private V2 val2;
+ @IgniteToStringInclude
+ private @Nullable V2 val2;
/**
* Empty constructor required by {@link Externalizable}.
@@ -75,19 +78,15 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>,
Map.Entry<V1, V2>,
/**
* Gets first value.
- *
- * @return First value.
*/
- public V1 get1() {
+ public @Nullable V1 get1() {
return val1;
}
/**
* Gets second value.
- *
- * @return Second value.
*/
- public V2 get2() {
+ public @Nullable V2 get2() {
return val2;
}
@@ -121,16 +120,14 @@ public class IgniteBiTuple<V1, V2> implements Map<V1,
V2>, Map.Entry<V1, V2>,
}
/** {@inheritDoc} */
- @Nullable
@Override
- public V1 getKey() {
+ public @Nullable V1 getKey() {
return val1;
}
/** {@inheritDoc} */
- @Nullable
@Override
- public V2 getValue() {
+ public @Nullable V2 getValue() {
return val2;
}
@@ -151,14 +148,15 @@ public class IgniteBiTuple<V1, V2> implements Map<V1,
V2>, Map.Entry<V1, V2>,
/** Next index. */
private int nextIdx = 1;
+ /** {@inheritDoc} */
@Override
public boolean hasNext() {
return nextIdx < 3;
}
- @Nullable
+ /** {@inheritDoc} */
@Override
- public Object next() {
+ public @Nullable Object next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -176,6 +174,7 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>,
Map.Entry<V1, V2>,
return res;
}
+ /** {@inheritDoc} */
@Override
public void remove() {
throw new UnsupportedOperationException();
@@ -208,16 +207,14 @@ public class IgniteBiTuple<V1, V2> implements Map<V1,
V2>, Map.Entry<V1, V2>,
}
/** {@inheritDoc} */
- @Nullable
@Override
- public V2 get(Object key) {
+ public @Nullable V2 get(Object key) {
return containsKey(key) ? val2 : null;
}
/** {@inheritDoc} */
- @Nullable
@Override
- public V2 put(V1 key, V2 val) {
+ public @Nullable V2 put(V1 key, V2 val) {
V2 old = containsKey(key) ? val2 : null;
set(key, val);
@@ -226,9 +223,8 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>,
Map.Entry<V1, V2>,
}
/** {@inheritDoc} */
- @Nullable
@Override
- public V2 remove(Object key) {
+ public @Nullable V2 remove(Object key) {
if (containsKey(key)) {
V2 v2 = val2;
@@ -347,6 +343,6 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>,
Map.Entry<V1, V2>,
/** {@inheritDoc} */
@Override
public String toString() {
- return "S.toString(IgniteBiTuple.class, this)";
+ return S.toString(IgniteBiTuple.class, this);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
index f4f18d600..a63c02a76 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
@@ -25,6 +25,8 @@ import static
org.apache.ignite.internal.util.CollectionUtils.difference;
import static org.apache.ignite.internal.util.CollectionUtils.setOf;
import static org.apache.ignite.internal.util.CollectionUtils.union;
import static org.apache.ignite.internal.util.CollectionUtils.viewReadOnly;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -175,44 +177,44 @@ public class CollectionUtilsTest {
}
@Test
- void testCollectionUnion() {
- assertTrue(union(new Collection[0]).isEmpty());
- assertTrue(union((Collection<Object>[]) null).isEmpty());
- assertTrue(union(List.of()).isEmpty());
-
- assertEquals(List.of(1), collect(union(List.of(1), Set.of())));
- assertEquals(List.of(1), collect(union(List.of(), Set.of(1))));
-
- assertEquals(List.of(1, 2), collect(union(List.of(1), Set.of(2))));
- assertEquals(List.of(1, 2, 2), collect(union(List.of(1), List.of(2),
Set.of(2))));
-
- assertFalse(union(new Collection[0]).contains(0));
- assertFalse(union(List.of()).contains(0));
- assertFalse(union(List.of(1)).contains(0));
- assertFalse(union(List.of(1), Set.of()).contains(0));
- assertFalse(union(List.of(), Set.of(1)).contains(0));
- assertFalse(union(List.of(1), Set.of(2, 3)).contains(0));
-
- assertTrue(union(List.of(0)).contains(0));
- assertTrue(union(List.of(), Set.of(0)).contains(0));
- assertTrue(union(List.of(0), Set.of()).contains(0));
-
- assertEquals(0, union(new Collection[0]).size());
- assertEquals(0, union(List.of()).size());
- assertEquals(1, union(List.of(1)).size());
- assertEquals(1, union(List.of(), Set.of(1)).size());
- assertEquals(1, union(List.of(1), Set.of()).size());
- assertEquals(2, union(List.of(1), Set.of(2)).size());
- assertEquals(3, union(List.of(1), Set.of(2, 3)).size());
- assertEquals(5, union(List.of(1, 4, 5), Set.of(2, 3)).size());
+ void testConcatCollection() {
+ assertTrue(concat(new Collection[0]).isEmpty());
+ assertTrue(concat((Collection<Object>[]) null).isEmpty());
+ assertTrue(concat((Collection<Object>) List.of()).isEmpty());
+
+ assertThat(concat(List.of(1), Set.of()), contains(1));
+ assertThat(concat(List.of(), Set.of(1)), contains(1));
+
+ assertThat(concat(List.of(1), Set.of(2)), contains(1, 2));
+ assertThat(concat(List.of(1), List.of(2), Set.of(2)), contains(1, 2,
2));
+
+ assertFalse(concat(new Collection[0]).contains(0));
+ assertFalse(concat((Collection<Object>) List.of()).contains(0));
+ assertFalse(concat((Collection<Integer>) List.of(1)).contains(0));
+ assertFalse(concat(List.of(1), Set.of()).contains(0));
+ assertFalse(concat(List.of(), Set.of(1)).contains(0));
+ assertFalse(concat(List.of(1), Set.of(2, 3)).contains(0));
+
+ assertTrue(concat((Collection<Integer>) List.of(0)).contains(0));
+ assertTrue(concat(List.of(), Set.of(0)).contains(0));
+ assertTrue(concat(List.of(0), Set.of()).contains(0));
+
+ assertEquals(0, concat(new Collection[0]).size());
+ assertEquals(0, concat((Collection<Object>) List.of()).size());
+ assertEquals(1, concat((Collection<Integer>) List.of(1)).size());
+ assertEquals(1, concat(List.of(), Set.of(1)).size());
+ assertEquals(1, concat(List.of(1), Set.of()).size());
+ assertEquals(2, concat(List.of(1), Set.of(2)).size());
+ assertEquals(3, concat(List.of(1), Set.of(2, 3)).size());
+ assertEquals(5, concat(List.of(1, 4, 5), Set.of(2, 3)).size());
Collection<Integer> integers = new ArrayList<>(List.of(1, 2, 3));
- Collection<Integer> union = union(integers);
+ Collection<Integer> concat = concat(integers);
integers.remove(1);
- assertEquals(2, union.size());
+ assertEquals(2, concat.size());
}
/**
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueueTest.java
similarity index 95%
rename from
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
rename to
modules/core/src/test/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueueTest.java
index f017c6a8b..8eda56ee2 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteConcurrentMultiPairQueueTest.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+package org.apache.ignite.internal.util;
import static java.util.Collections.synchronizedCollection;
import static java.util.concurrent.ThreadLocalRandom.current;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static
org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.EMPTY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -33,7 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.Result;
import org.apache.ignite.lang.IgniteBiTuple;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
index 10d973fc0..5a99889e0 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/FullPageId.java
@@ -171,4 +171,11 @@ public final class FullPageId {
.app(", effectivePageId=").appendHex(effectivePageId())
.app(", groupId=").app(groupId).app(']').toString();
}
+
+ /**
+ * Returns the partition ID.
+ */
+ public int partitionId() {
+ return PageIdUtils.partitionId(pageId);
+ }
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
index c58792b3d..b80a18a30 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
@@ -19,21 +19,13 @@ package
org.apache.ignite.internal.pagememory.configuration.schema;
import org.apache.ignite.configuration.annotation.Config;
import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.OneOf;
import org.apache.ignite.configuration.validation.Range;
-import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder;
/**
* Checkpoint configuration schema for persistent page memory.
*/
@Config
public class PageMemoryCheckpointConfigurationSchema {
- /** See description of {@link CheckpointWriteOrder#RANDOM}. */
- public static final String RANDOM_WRITE_ORDER = "RANDOM";
-
- /** See description of {@link CheckpointWriteOrder#SEQUENTIAL}. */
- public static final String SEQUENTIAL_WRITE_ORDER = "SEQUENTIAL";
-
/** Checkpoint frequency in milliseconds. */
@Range(min = 0)
@Value(hasDefault = true)
@@ -49,17 +41,6 @@ public class PageMemoryCheckpointConfigurationSchema {
@Value(hasDefault = true)
public int threads = 4;
- /** Checkpoint write order. */
- @OneOf({RANDOM_WRITE_ORDER, SEQUENTIAL_WRITE_ORDER})
- @Value(hasDefault = true)
- public String writeOrder = SEQUENTIAL_WRITE_ORDER;
-
- /**
- * Starting from this number of dirty pages in checkpoint, they will be
sorted in parallel in case of {@link #SEQUENTIAL_WRITE_ORDER}.
- */
- @Value(hasDefault = true)
- public int parallelSortThreshold = 512 * 1024;
-
/** Timeout for checkpoint read lock acquisition in milliseconds. */
@Range(min = 0)
@Value(hasDefault = true)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 71bcd65b8..badf647ae 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -2095,7 +2095,7 @@ public class PersistentPageMemory implements PageMemory {
safeToUpdate.set(true);
- return CollectionUtils.union(collections);
+ return CollectionUtils.concat(collections);
}
/**
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
index cb0f07682..1c85e1a85 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
@@ -17,15 +17,12 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-
/**
* Data class of checkpoint information.
*/
class Checkpoint {
- /** Checkpoint pages. */
- final IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPages;
+ /** Sorted dirty pages from data regions that should be checkpointed. */
+ final CheckpointDirtyPages dirtyPages;
/** Checkpoint progress status. */
final CheckpointProgressImpl progress;
@@ -36,17 +33,17 @@ class Checkpoint {
/**
* Constructor.
*
- * @param dirtyPages Pages to write to the page store.
+ * @param dirtyPages Sorted dirty pages from data regions that should be
checkpointed.
* @param progress Checkpoint progress status.
*/
Checkpoint(
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPages,
+ CheckpointDirtyPages dirtyPages,
CheckpointProgressImpl progress
) {
this.dirtyPages = dirtyPages;
this.progress = progress;
- dirtyPagesSize = dirtyPages.initialSize();
+ dirtyPagesSize = dirtyPages.dirtyPagesCount();
}
/**
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
new file mode 100644
index 000000000..4010a8385
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.binarySearch;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sorted dirty pages from data regions that should be checkpointed.
+ *
+ * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ */
+class CheckpointDirtyPages {
+ /** Dirty page ID comparator. */
+ static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
+ .comparingInt(FullPageId::groupId)
+ .thenComparingLong(FullPageId::effectivePageId);
+
+ /** Empty checkpoint dirty pages. */
+ static final CheckpointDirtyPages EMPTY = new
CheckpointDirtyPages(List.of());
+
+ /** Sorted dirty pages from data regions by groupId -> partitionId ->
pageIdx. */
+ private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>>
dirtyPages;
+
+ /** Total number of dirty pages. */
+ private final int dirtyPagesCount;
+
+ /**
+ * Constructor.
+ *
+ * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ */
+ public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>>
dirtyPages) {
+ this(dirtyPages.entrySet().stream().map(e -> new
IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ */
+ public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory,
List<FullPageId>>> dirtyPages) {
+ assert dirtyPages instanceof RandomAccess : dirtyPages;
+
+ this.dirtyPages = dirtyPages;
+
+ int count = 0;
+
+ for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages :
dirtyPages) {
+ assert !pages.getValue().isEmpty() : pages.getKey();
+ assert pages.getValue() instanceof RandomAccess : pages.getValue();
+
+ count += pages.getValue().size();
+ }
+
+ dirtyPagesCount = count;
+ }
+
+ /**
+ * Returns total number of dirty pages.
+ */
+ public int dirtyPagesCount() {
+ return dirtyPagesCount;
+ }
+
+ /**
+ * Returns a queue of dirty pages to be written to a checkpoint.
+ */
+ public CheckpointDirtyPagesQueue toQueue() {
+ return new CheckpointDirtyPagesQueue();
+ }
+
+ /**
+ * Looks for the dirty page view for a specific group and partition, {@code null} if not found.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ */
+ public @Nullable CheckpointDirtyPagesView findView(int grpId, int partId) {
+ if (dirtyPages.isEmpty()) {
+ return null;
+ }
+
+ FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0),
grpId);
+ FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0),
grpId);
+
+ for (int i = 0; i < dirtyPages.size(); i++) {
+ List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+
+ int fromIndex = binarySearch(pageIds, startPageId,
DIRTY_PAGE_COMPARATOR);
+
+ fromIndex = fromIndex >= 0 ? fromIndex : Math.min(pageIds.size() -
1, -fromIndex - 1);
+
+ if (!equalsByGroupAndPartition(startPageId,
pageIds.get(fromIndex))) {
+ continue;
+ }
+
+ int toIndex = binarySearch(pageIds.subList(fromIndex,
pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+ // toIndex cannot be 0 because endPageId is greater than
startPageId by DIRTY_PAGE_COMPARATOR.
+ toIndex = toIndex > 0 ? toIndex - 1 : -toIndex - 2;
+
+ return new CheckpointDirtyPagesView(i, fromIndex, fromIndex +
toIndex);
+ }
+
+ return null;
+ }
+
+ /**
+ * Looks for the next dirty page view from the current one, {@code null}
if not found.
+ *
+ * @param currentView Current view to dirty pages, {@code null} to get
first.
+ */
+ public @Nullable CheckpointDirtyPagesView nextView(@Nullable
CheckpointDirtyPagesView currentView) {
+ assert currentView == null || currentView.owner() == this :
currentView;
+
+ if (dirtyPages.isEmpty()) {
+ return null;
+ }
+
+ int regionIndex;
+ int fromPosition;
+
+ if (currentView == null) {
+ regionIndex = 0;
+ fromPosition = 0;
+ } else {
+ regionIndex = currentView.isToPositionLast() ?
currentView.regionIndex + 1 : currentView.regionIndex;
+ fromPosition = currentView.isToPositionLast() ? 0 :
currentView.toPosition + 1;
+ }
+
+ if (regionIndex >= dirtyPages.size()) {
+ return null;
+ }
+
+ List<FullPageId> pageIds = dirtyPages.get(regionIndex).getValue();
+
+ if (fromPosition == pageIds.size() - 1 ||
!equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition
+ 1))) {
+ return new CheckpointDirtyPagesView(regionIndex, fromPosition,
fromPosition);
+ }
+
+ FullPageId startPageId = pageIds.get(fromPosition);
+ FullPageId endPageId = new
FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0),
startPageId.groupId());
+
+ int toPosition = binarySearch(pageIds.subList(fromPosition,
pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+
+ toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
+
+ return new CheckpointDirtyPagesView(regionIndex, fromPosition,
fromPosition + toPosition);
+ }
+
+ /**
+ * Queue of dirty pages that will need to be written to a checkpoint.
+ *
+ * <p>Thread safe.
+ */
+ class CheckpointDirtyPagesQueue {
+ /** Current position in the queue. */
+ private final AtomicInteger position = new AtomicInteger();
+
+ /** Sizes of each element in {@link #dirtyPages} + the previous value
in this array. */
+ private final int[] cumulativeSizes;
+
+ /**
+ * Private constructor.
+ */
+ private CheckpointDirtyPagesQueue() {
+ int size = 0;
+
+ int[] sizes = new int[dirtyPages.size()];
+
+ for (int i = 0; i < dirtyPages.size(); i++) {
+ sizes[i] = size += dirtyPages.get(i).getValue().size();
+ }
+
+ this.cumulativeSizes = sizes;
+ }
+
+ /**
+ * Returns {@code true} if the next element of the queue was obtained.
+ *
+ * @param result Holder for the result of getting the next dirty page.
+ */
+ public boolean next(QueueResult result) {
+ int queuePosition = this.position.getAndIncrement();
+
+ if (queuePosition >= dirtyPagesCount) {
+ result.owner = null;
+
+ return false;
+ }
+
+ if (result.owner != this) {
+ result.owner = this;
+ result.regionIndex = 0;
+ }
+
+ int regionIndex = result.regionIndex;
+
+ if (queuePosition >= cumulativeSizes[regionIndex]) {
+ if (queuePosition == cumulativeSizes[regionIndex]) {
+ regionIndex++;
+ } else {
+ regionIndex = findDirtyPagesIndex(regionIndex + 1,
queuePosition);
+ }
+ }
+
+ result.regionIndex = regionIndex;
+ result.position = regionIndex > 0 ? queuePosition -
cumulativeSizes[regionIndex - 1] : queuePosition;
+
+ return true;
+ }
+
+ /**
+ * Returns {@code true} if the queue is empty.
+ */
+ public boolean isEmpty() {
+ return position.get() >= dirtyPagesCount;
+ }
+
+ /**
+ * Returns the size of the queue.
+ */
+ public int size() {
+ return dirtyPagesCount - Math.min(dirtyPagesCount, position.get());
+ }
+
+ private int findDirtyPagesIndex(int index, int position) {
+ return Math.abs(Arrays.binarySearch(cumulativeSizes, index,
cumulativeSizes.length, position) + 1);
+ }
+
+ private CheckpointDirtyPages owner() {
+ return CheckpointDirtyPages.this;
+ }
+ }
+
+ /**
+ * View of {@link CheckpointDirtyPages} in which all dirty pages will
refer to the same {@link PersistentPageMemory} and contain the
+ * same groupId and partitionId and increasing pageIdx.
+ *
+ * <p>Thread safe.
+ */
+ class CheckpointDirtyPagesView {
+ /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+ private final int regionIndex;
+
+ /** Starting position (inclusive) of the dirty page within the element
at {@link #regionIndex}. */
+ private final int fromPosition;
+
+ /** End position (inclusive) of the dirty page within the element at
{@link #regionIndex}. */
+ private final int toPosition;
+
+ /**
+ * Private constructor.
+ *
+ * @param regionIndex Element index in {@link
CheckpointDirtyPages#dirtyPages}.
+ * @param fromPosition Starting position (inclusive) of the dirty page
within the element at {@link #regionIndex}.
+ * @param toPosition End position (inclusive) of the dirty page within
the element at {@link #regionIndex}.
+ */
+ private CheckpointDirtyPagesView(int regionIndex, int fromPosition,
int toPosition) {
+ this.regionIndex = regionIndex;
+ this.fromPosition = fromPosition;
+ this.toPosition = toPosition;
+ }
+
+ /**
+ * Returns the dirty page by index.
+ *
+ * @param index Dirty page index.
+ */
+ public FullPageId get(int index) {
+ return
dirtyPages.get(this.regionIndex).getValue().get(fromPosition + index);
+ }
+
+ /**
+ * Returns the page memory for view.
+ */
+ public PersistentPageMemory pageMemory() {
+ return dirtyPages.get(regionIndex).getKey();
+ }
+
+ /**
+ * Returns the size of the view.
+ */
+ public int size() {
+ return toPosition - fromPosition + 1;
+ }
+
+ private CheckpointDirtyPages owner() {
+ return CheckpointDirtyPages.this;
+ }
+
+ private boolean isToPositionLast() {
+ return toPosition == dirtyPages.get(regionIndex).getValue().size()
- 1;
+ }
+ }
+
+ /**
+ * Holder for the result of getting the next dirty page in {@link
CheckpointDirtyPagesQueue#next(QueueResult)}.
+ *
+ * <p>Not thread safe.
+ */
+ static class QueueResult {
+ private @Nullable CheckpointDirtyPagesQueue owner;
+
+ /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+ private int regionIndex;
+
+ /** Position of the dirty page within the element at {@link
#regionIndex}. */
+ private int position;
+
+ /**
+ * Returns the page memory for the associated dirty page.
+ */
+ public @Nullable PersistentPageMemory pageMemory() {
+ return owner == null ? null :
owner.owner().dirtyPages.get(regionIndex).getKey();
+ }
+
+ /**
+ * Returns dirty page.
+ */
+ public @Nullable FullPageId dirtyPage() {
+ return owner == null ? null :
owner.owner().dirtyPages.get(regionIndex).getValue().get(position);
+ }
+ }
+
+ private static boolean equalsByGroupAndPartition(FullPageId pageId0,
FullPageId pageId1) {
+ return pageId0.groupId() == pageId1.groupId() && pageId0.partitionId()
== pageId1.partitionId();
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index c9689cb3a..80d643804 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -102,7 +102,7 @@ public class CheckpointManager {
checkpointMarkersStorage = new CheckpointMarkersStorage(storagePath);
checkpointWorkflow = new CheckpointWorkflow(
- checkpointConfig,
+ igniteInstanceName,
checkpointMarkersStorage,
checkpointReadWriteLock,
dataRegions
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index 8bfc84e93..ac8fe684a 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -21,7 +21,7 @@ import static
org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
import static
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
@@ -38,6 +38,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.QueueResult;
import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -51,8 +53,13 @@ public class CheckpointPagesWriter implements Runnable {
/** Checkpoint specific metrics tracker. */
private final CheckpointMetricsTracker tracker;
- /** Collection of page IDs to write under this task. Overall pages to
write may be greater than this collection. */
- private final IgniteConcurrentMultiPairQueue<PersistentPageMemory,
FullPageId> writePageIds;
+ /**
+ * Queue of dirty page IDs to write under this task.
+ *
+ * <p>Overall pages to write may be greater than this queue, since it may
be necessary to retry writing some pages due to unsuccessful
+ * page write lock acquisition.
+ */
+ private final CheckpointDirtyPagesQueue writePageIds;
/** Page store used to write -> Count of written pages. */
private final ConcurrentMap<PageStore, LongAdder> updStores;
@@ -79,7 +86,7 @@ public class CheckpointPagesWriter implements Runnable {
* Creates task for write pages.
*
* @param tracker Checkpoint metrics tracker.
- * @param writePageIds Collection of page IDs to write.
+ * @param writePageIds Queue of dirty page IDs to write.
* @param updStores Updating storage.
* @param doneFut Done future.
* @param beforePageWrite Action to be performed before every page write.
@@ -92,7 +99,7 @@ public class CheckpointPagesWriter implements Runnable {
CheckpointPagesWriter(
IgniteLogger log,
CheckpointMetricsTracker tracker,
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds,
+ CheckpointDirtyPagesQueue writePageIds,
ConcurrentMap<PageStore, LongAdder> updStores,
CompletableFuture<?> doneFut,
Runnable beforePageWrite,
@@ -117,13 +124,13 @@ public class CheckpointPagesWriter implements Runnable {
@Override
public void run() {
try {
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
pagesToRetry = writePages(writePageIds);
+ CheckpointDirtyPagesQueue pagesToRetry = writePages(writePageIds);
if (pagesToRetry.isEmpty()) {
doneFut.complete(null);
} else {
if (log.isInfoEnabled()) {
- log.info(pagesToRetry.initialSize() + " checkpoint pages
were not written yet due to "
+ log.info(pagesToRetry.size() + " checkpoint pages were not
written yet due to "
+ "unsuccessful page write lock acquisition and
will be retried");
}
@@ -139,28 +146,26 @@ public class CheckpointPagesWriter implements Runnable {
}
/**
- * Writes pages.
+ * Writes dirty pages.
*
- * @param writePageIds Collections of pages to write.
- * @return pagesToRetry Pages which should be retried.
+ * @param writePageIds Queue of dirty pages to write.
+ * @return pagesToRetry Queue of dirty pages which should be retried.
*/
- private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePages(
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds
- ) throws IgniteInternalCheckedException {
+ private CheckpointDirtyPagesQueue writePages(CheckpointDirtyPagesQueue
writePageIds) throws IgniteInternalCheckedException {
Map<PersistentPageMemory, List<FullPageId>> pagesToRetry = new
HashMap<>();
Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new
HashMap<>();
ByteBuffer tmpWriteBuf = threadBuf.get();
- IgniteConcurrentMultiPairQueue.Result<PersistentPageMemory,
FullPageId> res = new IgniteConcurrentMultiPairQueue.Result<>();
+ QueueResult res = new QueueResult();
while (!shutdownNow.getAsBoolean() && writePageIds.next(res)) {
beforePageWrite.run();
- FullPageId fullId = res.getValue();
+ FullPageId fullId = res.dirtyPage();
- PersistentPageMemory pageMemory = res.getKey();
+ PersistentPageMemory pageMemory = res.pageMemory();
tmpWriteBuf.rewind();
@@ -169,7 +174,7 @@ public class CheckpointPagesWriter implements Runnable {
pageMemory.checkpointWritePage(fullId, tmpWriteBuf,
pageStoreWriter, tracker);
}
- return pagesToRetry.isEmpty() ? EMPTY : new
IgniteConcurrentMultiPairQueue<>(pagesToRetry);
+ return pagesToRetry.isEmpty() ? EMPTY.toQueue() : new
CheckpointDirtyPages(pagesToRetry).toQueue();
}
/**
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index b0f8ac70c..fc0e6847c 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -24,8 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.pagememory.FullPageId;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
/**
@@ -72,7 +71,7 @@ public class CheckpointPagesWriterFactory {
* Returns instance of page checkpoint writer.
*
* @param tracker Checkpoint metrics tracker.
- * @param cpPages List of pages to write.
+ * @param checkpointDirtyPagesQueue Checkpoint dirty pages queue to write.
* @param updStores Updated page store storage.
* @param doneWriteFut Write done future.
* @param beforePageWrite Before page write callback.
@@ -81,7 +80,7 @@ public class CheckpointPagesWriterFactory {
*/
CheckpointPagesWriter build(
CheckpointMetricsTracker tracker,
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
cpPages,
+ CheckpointDirtyPagesQueue checkpointDirtyPagesQueue,
ConcurrentMap<PageStore, LongAdder> updStores,
CompletableFuture<?> doneWriteFut,
Runnable beforePageWrite,
@@ -92,7 +91,7 @@ public class CheckpointPagesWriterFactory {
return new CheckpointPagesWriter(
log,
tracker,
- cpPages,
+ checkpointDirtyPagesQueue,
updStores,
doneWriteFut,
beforePageWrite,
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index 39617997b..4e8ae1013 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -178,10 +178,10 @@ class CheckpointProgressImpl implements
CheckpointProgress {
/**
* Initialize all counters before checkpoint.
*
- * @param pagesSize Number of dirty pages in current checkpoint at the
beginning of checkpoint.
+ * @param checkpointPages Number of dirty pages in current checkpoint at
the beginning of checkpoint.
*/
- public void initCounters(int pagesSize) {
- currCheckpointPagesCnt = pagesSize;
+ public void initCounters(int checkpointPages) {
+ currCheckpointPagesCnt = checkpointPages;
writtenPagesCntr.set(0);
syncedPagesCntr.set(0);
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index e792f904f..9beaf009f 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -17,33 +17,32 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder.SEQUENTIAL;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.Future;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
-import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
-import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
@@ -64,13 +63,12 @@ import org.jetbrains.annotations.Nullable;
*/
class CheckpointWorkflow {
/**
- * Starting from this number of dirty pages in checkpoint, array will be
sorted with {@link Arrays#parallelSort(Comparable[])} in case
- * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+ * Starting from this number of dirty pages in checkpoint, array will be
sorted with {@link Arrays#parallelSort(Object[], java.util.Comparator)}.
+ *
+ * <p>See <a
href="https://www.researchgate.net/publication/331742843_Threshold_Analysis_and_Comparison_of_Sequential_and_Parallel_Divide_and_Conquer_Sorting_Algorithms">threshold
+ * for parallel sort.</a>
*/
- private final int parallelSortThreshold;
-
- /** This number of threads will be created and used for parallel sorting.
*/
- private static final int PARALLEL_SORT_THREADS =
Math.min(Runtime.getRuntime().availableProcessors(), 8);
+ static final int PARALLEL_SORT_THRESHOLD = 40_000;
/** Checkpoint marker storage. */
private final CheckpointMarkersStorage checkpointMarkersStorage;
@@ -81,33 +79,42 @@ class CheckpointWorkflow {
/** Persistent data regions for the checkpointing. */
private final Collection<? extends DataRegion<PersistentPageMemory>>
dataRegions;
- /** Checkpoint write order configuration. */
- private final CheckpointWriteOrder checkpointWriteOrder;
-
/** Collections of checkpoint listeners. */
private final List<IgniteBiTuple<CheckpointListener,
DataRegion<PersistentPageMemory>>> listeners = new CopyOnWriteArrayList<>();
+ /** Thread pool for sorting dirty pages in parallel if their count is >=
{@link #PARALLEL_SORT_THRESHOLD}. */
+ private final ForkJoinPool parallelSortThreadPool;
+
/**
* Constructor.
*
- * @param checkpointConfig Checkpoint configuration.
+ * @param igniteInstanceName Ignite instance name.
* @param checkpointMarkersStorage Checkpoint marker storage.
* @param checkpointReadWriteLock Checkpoint read write lock.
* @param dataRegions Persistent data regions for the checkpointing,
doesn't copy.
*/
public CheckpointWorkflow(
- PageMemoryCheckpointConfiguration checkpointConfig,
+ String igniteInstanceName,
CheckpointMarkersStorage checkpointMarkersStorage,
CheckpointReadWriteLock checkpointReadWriteLock,
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions
) {
- PageMemoryCheckpointView checkpointConfigView =
checkpointConfig.value();
-
this.checkpointMarkersStorage = checkpointMarkersStorage;
this.checkpointReadWriteLock = checkpointReadWriteLock;
- this.checkpointWriteOrder =
CheckpointWriteOrder.valueOf(checkpointConfigView.writeOrder());
- this.parallelSortThreshold =
checkpointConfigView.parallelSortThreshold();
this.dataRegions = dataRegions;
+
+ parallelSortThreadPool = new ForkJoinPool(
+ Math.min(Runtime.getRuntime().availableProcessors(), 8) + 1,
+ pool -> {
+ ForkJoinWorkerThread worker =
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+
+
worker.setName(NamedThreadFactory.threadPrefix(igniteInstanceName,
"checkpoint-pages-sorter") + worker.getPoolIndex());
+
+ return worker;
+ },
+ null,
+ false
+ );
}
/**
@@ -122,6 +129,8 @@ class CheckpointWorkflow {
*/
public void stop() {
listeners.clear();
+
+ shutdownAndAwaitTermination(parallelSortThreadPool, 10, SECONDS);
}
/**
@@ -154,7 +163,7 @@ class CheckpointWorkflow {
checkpointReadWriteLock.writeLock();
- CheckpointDirtyPagesInfoHolder dirtyPages;
+ DataRegionsDirtyPages dirtyPages;
try {
curr.transitTo(LOCK_TAKEN);
@@ -167,7 +176,7 @@ class CheckpointWorkflow {
tracker.onMarkCheckpointBeginEnd();
- // There are allowable to replace pages only after checkpoint
entry was stored to disk.
+ // It is allowed to replace pages only after the checkpoint
marker has been stored to disk.
dirtyPages = beginCheckpoint(dataRegions,
curr.futureFor(MARKER_STORED_TO_DISK));
curr.currentCheckpointPagesCount(dirtyPages.dirtyPageCount);
@@ -192,11 +201,11 @@ class CheckpointWorkflow {
tracker.onSplitAndSortCheckpointPagesStart();
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPages0 = splitAndSortCheckpointPagesIfNeeded(dirtyPages);
+ CheckpointDirtyPages checkpointPages =
createAndSortCheckpointDirtyPages(dirtyPages);
tracker.onSplitAndSortCheckpointPagesEnd();
- return new Checkpoint(dirtyPages0, curr);
+ return new Checkpoint(checkpointPages, curr);
}
return new Checkpoint(EMPTY, curr);
@@ -267,99 +276,77 @@ class CheckpointWorkflow {
.collect(toUnmodifiableList());
}
- private CheckpointDirtyPagesInfoHolder beginCheckpoint(
+ private DataRegionsDirtyPages beginCheckpoint(
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
CompletableFuture<?> allowToReplace
) {
Collection<IgniteBiTuple<PersistentPageMemory,
Collection<FullPageId>>> pages = new ArrayList<>(dataRegions.size());
- int pageCount = 0;
-
for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
Collection<FullPageId> dirtyPages =
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
- pageCount += dirtyPages.size();
-
pages.add(new IgniteBiTuple<>(dataRegion.pageMemory(),
dirtyPages));
}
- return new CheckpointDirtyPagesInfoHolder(pages, pageCount);
- }
-
- private static ForkJoinPool parallelSortInIsolatedPool(
- FullPageId[] pagesArr,
- Comparator<FullPageId> cmp,
- @Nullable ForkJoinPool pool
- ) throws IgniteInternalCheckedException {
- ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool1 -> {
- ForkJoinWorkerThread worker =
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool1);
-
- worker.setName("checkpoint-pages-sorter-" + worker.getPoolIndex());
-
- return worker;
- };
-
- ForkJoinPool execPool = pool == null ? new
ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false) : pool;
-
- Future<?> sortTask = execPool.submit(() ->
Arrays.parallelSort(pagesArr, cmp));
-
- try {
- sortTask.get();
- } catch (ExecutionException | InterruptedException e) {
- throw new IgniteInternalCheckedException(
- "Failed to perform pages array parallel sort",
- e instanceof ExecutionException ? e.getCause() : e
- );
- }
-
- return execPool;
+ return new DataRegionsDirtyPages(pages);
}
- private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
splitAndSortCheckpointPagesIfNeeded(
- CheckpointDirtyPagesInfoHolder dirtyPages
+ CheckpointDirtyPages createAndSortCheckpointDirtyPages(
+ DataRegionsDirtyPages dataRegionsDirtyPages
) throws IgniteInternalCheckedException {
- Set<IgniteBiTuple<PersistentPageMemory, FullPageId[]>>
cpPagesPerRegion = new HashSet<>();
+ List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>>
checkpointPages = new ArrayList<>();
int realPagesArrSize = 0;
- for (IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>>
regPages : dirtyPages.dirtyPages) {
- FullPageId[] pages = new FullPageId[regPages.getValue().size()];
+ for (IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>>
regionDirtyPages : dataRegionsDirtyPages.dirtyPages) {
+ FullPageId[] checkpointRegionDirtyPages = new
FullPageId[regionDirtyPages.getValue().size()];
int pagePos = 0;
- for (FullPageId dirtyPage : regPages.getValue()) {
- assert realPagesArrSize++ != dirtyPages.dirtyPageCount :
- "Incorrect estimated dirty pages number: " +
dirtyPages.dirtyPageCount;
+ for (FullPageId dirtyPage : regionDirtyPages.getValue()) {
+ assert realPagesArrSize++ !=
dataRegionsDirtyPages.dirtyPageCount :
+ "Incorrect estimated dirty pages number: " +
dataRegionsDirtyPages.dirtyPageCount;
- pages[pagePos++] = dirtyPage;
+ checkpointRegionDirtyPages[pagePos++] = dirtyPage;
}
// Some pages may have been already replaced.
- if (pagePos != pages.length) {
- cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(),
Arrays.copyOf(pages, pagePos)));
+ if (pagePos == 0) {
+ continue;
+ } else if (pagePos != checkpointRegionDirtyPages.length) {
+ checkpointPages.add(new
IgniteBiTuple<>(regionDirtyPages.getKey(),
Arrays.copyOf(checkpointRegionDirtyPages, pagePos)));
} else {
- cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(),
pages));
+ checkpointPages.add(new
IgniteBiTuple<>(regionDirtyPages.getKey(), checkpointRegionDirtyPages));
}
}
- if (checkpointWriteOrder == SEQUENTIAL) {
- Comparator<FullPageId> cmp =
Comparator.comparingInt(FullPageId::groupId).thenComparingLong(FullPageId::effectivePageId);
-
- ForkJoinPool pool = null;
+ List<ForkJoinTask<?>> parallelSortTasks = checkpointPages.stream()
+ .map(IgniteBiTuple::getValue)
+ .filter(pages -> pages.length >= PARALLEL_SORT_THRESHOLD)
+ .map(pages -> parallelSortThreadPool.submit(() ->
Arrays.parallelSort(pages, DIRTY_PAGE_COMPARATOR)))
+ .collect(toList());
- for (IgniteBiTuple<PersistentPageMemory, FullPageId[]> pagesPerReg
: cpPagesPerRegion) {
- if (pagesPerReg.getValue().length >= parallelSortThreshold) {
- pool = parallelSortInIsolatedPool(pagesPerReg.get2(), cmp,
pool);
- } else {
- Arrays.sort(pagesPerReg.get2(), cmp);
- }
+ for (IgniteBiTuple<PersistentPageMemory, FullPageId[]> regionPages :
checkpointPages) {
+ if (regionPages.getValue().length < PARALLEL_SORT_THRESHOLD) {
+ Arrays.sort(regionPages.getValue(), DIRTY_PAGE_COMPARATOR);
}
+ }
- if (pool != null) {
- pool.shutdown();
+ for (ForkJoinTask<?> parallelSortTask : parallelSortTasks) {
+ try {
+ parallelSortTask.get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IgniteInternalCheckedException(
+ "Failed to perform pages array parallel sort",
+ e instanceof ExecutionException ? e.getCause() : e
+ );
}
}
- return new IgniteConcurrentMultiPairQueue<>(cpPagesPerRegion);
+ return new CheckpointDirtyPages(
+ checkpointPages.stream()
+ .map(tuple -> new IgniteBiTuple<>(tuple.getKey(),
Arrays.asList(tuple.getValue())))
+ .collect(toList())
+ );
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
deleted file mode 100644
index 4b48aa2e4..000000000
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This enum defines order of writing pages to disk storage during checkpoint.
- */
-public enum CheckpointWriteOrder {
- /**
- * Pages are written in order provided by checkpoint pages collection
iterator (which is basically a hashtable).
- */
- RANDOM,
-
- /**
- * All checkpoint pages are collected into single list and sorted by page
index. Provides almost sequential disk writes, which can be
- * much faster on some SSD models.
- */
- SEQUENTIAL;
-
- /**
- * Enumerated values.
- */
- private static final CheckpointWriteOrder[] VALS = values();
-
- /**
- * Efficiently gets enumerated value from its ordinal.
- *
- * @param ord Ordinal value.
- * @return Enumerated value or {@code null} if ordinal out of range.
- */
- public static @Nullable CheckpointWriteOrder fromOrdinal(int ord) {
- return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
- }
-}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index fa07f37ce..f2f0af386 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -41,10 +41,9 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.pagememory.FullPageId;
import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -355,7 +354,7 @@ public class Checkpointer extends IgniteWorker {
* Writes dirty pages to the appropriate stores.
*
* @param tracker Checkpoint metrics tracker.
- * @param checkpointPages Checkpoint pages to write.
+ * @param checkpointDirtyPages Checkpoint dirty pages to write.
* @param currentCheckpointProgress Current checkpoint progress.
* @param workProgressDispatcher Work progress dispatcher.
* @param shutdownNow Checker of stop operation.
@@ -363,7 +362,7 @@ public class Checkpointer extends IgniteWorker {
*/
boolean writePages(
CheckpointMetricsTracker tracker,
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
checkpointPages,
+ CheckpointDirtyPages checkpointDirtyPages,
CheckpointProgressImpl currentCheckpointProgress,
WorkProgressDispatcher workProgressDispatcher,
BooleanSupplier shutdownNow
@@ -379,10 +378,12 @@ public class Checkpointer extends IgniteWorker {
tracker.onPagesWriteStart();
+ CheckpointDirtyPagesQueue checkpointDirtyPagesQueue =
checkpointDirtyPages.toQueue();
+
for (int i = 0; i < checkpointWritePageThreads; i++) {
CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
tracker,
- checkpointPages,
+ checkpointDirtyPagesQueue,
updStores,
futures[i] = new CompletableFuture<>(),
workProgressDispatcher::updateHeartbeat,
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DataRegionsDirtyPages.java
similarity index 76%
rename from
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
rename to
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DataRegionsDirtyPages.java
index 2294f825d..5b71f9685 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DataRegionsDirtyPages.java
@@ -24,9 +24,13 @@ import org.apache.ignite.lang.IgniteBiTuple;
/**
* Holder of information about dirty pages by {@link PersistentPageMemory} for
checkpoint.
+ *
+ * <p>Due to page replacements, the total number of pages may change when they
are written to disk.
+ *
+ * <p>Total number of dirty pages can only decrease due to page replacement,
but should not increase.
*/
-class CheckpointDirtyPagesInfoHolder {
- /** Total number of dirty pages. */
+class DataRegionsDirtyPages {
+ /** Total number of dirty pages for all data regions at the time of data
collection. */
final int dirtyPageCount;
/** Collection of dirty pages per {@link PersistentPageMemory}
distribution. */
@@ -36,13 +40,11 @@ class CheckpointDirtyPagesInfoHolder {
* Constructor.
*
* @param dirtyPages Collection of dirty pages per {@link
PersistentPageMemory} distribution.
- * @param dirtyPageCount Total number of dirty pages.
*/
- public CheckpointDirtyPagesInfoHolder(
- Collection<IgniteBiTuple<PersistentPageMemory,
Collection<FullPageId>>> dirtyPages,
- int dirtyPageCount
+ public DataRegionsDirtyPages(
+ Collection<IgniteBiTuple<PersistentPageMemory,
Collection<FullPageId>>> dirtyPages
) {
this.dirtyPages = dirtyPages;
- this.dirtyPageCount = dirtyPageCount;
+ this.dirtyPageCount = dirtyPages.stream().mapToInt(tuple ->
tuple.getValue().size()).sum();
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
index 1fb705873..9712ad3cc 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
@@ -187,7 +187,7 @@ public final class PageIdUtils {
* Extracts partition ID from the page ID.
*
* @param pageId Page ID.
- * @return Partition.
+ * @return Partition ID.
*/
public static int partitionId(long pageId) {
return (int) ((pageId >>> PAGE_IDX_SIZE) & PART_ID_MASK);
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
new file mode 100644
index 000000000..b15531a9d
--- /dev/null
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.QueueResult;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointDirtyPages} testing.
+ */
+public class CheckpointDirtyPagesTest {
+ @Test
+ void testDirtyPagesCount() {
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 =
createDirtyPages(of(0, 0, 0), of(0, 0, 1));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
+
+ assertEquals(0, EMPTY.dirtyPagesCount());
+ assertEquals(2, new
CheckpointDirtyPages(List.of(dirtyPages0)).dirtyPagesCount());
+ assertEquals(3, new
CheckpointDirtyPages(List.of(dirtyPages1)).dirtyPagesCount());
+ assertEquals(5, new CheckpointDirtyPages(List.of(dirtyPages0,
dirtyPages1)).dirtyPagesCount());
+ }
+
+ @Test
+ void testToQueue() {
+ assertTrue(EMPTY.toQueue().isEmpty());
+
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 =
createDirtyPages(of(0, 0, 0));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 =
createDirtyPages(of(2, 0, 0), of(2, 1, 0), of(3, 2, 2));
+
+ CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2));
+
+ CheckpointDirtyPagesQueue queue0 = checkpointDirtyPages.toQueue();
+
+ assertFalse(queue0.isEmpty());
+ assertEquals(6, queue0.size());
+
+ assertThat(toListDirtyPagePair(queue0),
equalTo(toListDirtyPagePair(dirtyPages0, dirtyPages1, dirtyPages2)));
+
+ CheckpointDirtyPagesQueue queue1 = checkpointDirtyPages.toQueue();
+
+ assertNotSame(queue0, queue1);
+
+ assertFalse(queue1.isEmpty());
+ assertEquals(6, queue1.size());
+ }
+
+ @Test
+ void testFindView() {
+ assertNull(EMPTY.findView(0, 0));
+
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 =
createDirtyPages(of(0, 0, 0));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 =
createDirtyPages(of(5, 0, 0));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages3 =
createDirtyPages(
+ of(2, 0, 0), of(2, 0, 1),
+ of(2, 1, 1),
+ of(3, 2, 2), of(3, 2, 3)
+ );
+
+ CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2,
dirtyPages3));
+
+ assertNull(checkpointDirtyPages.findView(4, 0));
+ assertNull(checkpointDirtyPages.findView(2, 2));
+ assertNull(checkpointDirtyPages.findView(3, 1));
+ assertNull(checkpointDirtyPages.findView(3, 3));
+
+ assertThat(toListDirtyPagePair(checkpointDirtyPages.findView(0, 0)),
equalTo(toListDirtyPagePair(dirtyPages0)));
+ assertThat(toListDirtyPagePair(checkpointDirtyPages.findView(5, 0)),
equalTo(toListDirtyPagePair(dirtyPages1)));
+ assertThat(toListDirtyPagePair(checkpointDirtyPages.findView(1, 0)),
equalTo(toListDirtyPagePair(dirtyPages2)));
+
+ assertThat(
+ toListDirtyPagePair(checkpointDirtyPages.findView(2, 0)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 0),
dirtyPages3))
+ );
+
+ assertThat(
+ toListDirtyPagePair(checkpointDirtyPages.findView(2, 1)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 1),
dirtyPages3))
+ );
+
+ assertThat(
+ toListDirtyPagePair(checkpointDirtyPages.findView(3, 2)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(3, 2),
dirtyPages3))
+ );
+ }
+
+ @Test
+ void testNextView() {
+ assertNull(EMPTY.nextView(null));
+
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 =
createDirtyPages(of(0, 0, 0));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 =
createDirtyPages(of(5, 0, 0));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages3 =
createDirtyPages(
+ of(2, 0, 0), of(2, 0, 1),
+ of(2, 1, 1),
+ of(3, 2, 2), of(3, 2, 3)
+ );
+
+ CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2,
dirtyPages3));
+
+ CheckpointDirtyPagesView view = checkpointDirtyPages.nextView(null);
+
+ assertThat(toListDirtyPagePair(view),
equalTo(toListDirtyPagePair(dirtyPages0)));
+ assertThat(toListDirtyPagePair(view =
checkpointDirtyPages.nextView(view)),
equalTo(toListDirtyPagePair(dirtyPages1)));
+ assertThat(toListDirtyPagePair(view =
checkpointDirtyPages.nextView(view)),
equalTo(toListDirtyPagePair(dirtyPages2)));
+
+ assertThat(
+ toListDirtyPagePair(view =
checkpointDirtyPages.nextView(view)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 0),
dirtyPages3))
+ );
+
+ assertThat(
+ toListDirtyPagePair(view =
checkpointDirtyPages.nextView(view)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 1),
dirtyPages3))
+ );
+
+ assertThat(
+ toListDirtyPagePair(view =
checkpointDirtyPages.nextView(view)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(3, 2),
dirtyPages3))
+ );
+
+ assertNull(checkpointDirtyPages.nextView(view));
+ }
+
+ @Test
+ void testSortDirtyPageIds() {
+ List<FullPageId> pageIds = new ArrayList<>(List.of(
+ of(10, 10, 10), of(10, 10, 9), of(10, 10, 0), of(10, 0, 10),
of(10, 0, 0),
+ of(0, 1, 3), of(0, 1, 2), of(0, 1, 1), of(0, 1, 0), of(0, 0,
0),
+ of(1, 2, 0), of(1, 2, 1)
+ ));
+
+ pageIds.sort(DIRTY_PAGE_COMPARATOR);
+
+ assertThat(
+ pageIds,
+ equalTo(List.of(
+ of(0, 0, 0), of(0, 1, 0), of(0, 1, 1), of(0, 1, 2),
of(0, 1, 3),
+ of(1, 2, 0), of(1, 2, 1),
+ of(10, 0, 0), of(10, 0, 10), of(10, 10, 0), of(10, 10,
9), of(10, 10, 10)
+ ))
+ );
+ }
+
+ @Test
+ void testQueueNextElementInDifferentThreads() throws Exception {
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages0 =
createDirtyPages(of(0, 0, 0));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages1 =
createDirtyPages(of(1, 0, 0));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages2 =
createDirtyPages(of(2, 0, 0), of(2, 0, 1));
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> dirtyPages3 =
createDirtyPages(of(3, 0, 0), of(3, 1, 0), of(4, 2, 2));
+
+ CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2,
dirtyPages3));
+
+ CheckpointDirtyPagesQueue queue = checkpointDirtyPages.toQueue();
+
+ runAsync(() -> {
+ QueueResult queueResult0 = new QueueResult();
+
+ assertTrue(queue.next(queueResult0));
+ assertEquals(dirtyPages0.getKey(), queueResult0.pageMemory());
+ assertEquals(dirtyPages0.getValue().get(0),
queueResult0.dirtyPage());
+
+ QueueResult queueResult1 = new QueueResult();
+
+ assertTrue(queue.next(queueResult1));
+ assertEquals(dirtyPages1.getKey(), queueResult1.pageMemory());
+ assertEquals(dirtyPages1.getValue().get(0),
queueResult1.dirtyPage());
+
+ assertTrue(queue.next(queueResult0));
+ assertEquals(dirtyPages2.getKey(), queueResult0.pageMemory());
+ assertEquals(dirtyPages2.getValue().get(0),
queueResult0.dirtyPage());
+
+ assertTrue(queue.next(queueResult1));
+ assertEquals(dirtyPages2.getKey(), queueResult1.pageMemory());
+ assertEquals(dirtyPages2.getValue().get(1),
queueResult1.dirtyPage());
+ }).get(1, TimeUnit.SECONDS);
+
+ QueueResult queueResult0 = new QueueResult();
+
+ assertTrue(queue.next(queueResult0));
+ assertEquals(dirtyPages3.getKey(), queueResult0.pageMemory());
+ assertEquals(dirtyPages3.getValue().get(0), queueResult0.dirtyPage());
+
+ QueueResult queueResult1 = new QueueResult();
+
+ assertTrue(queue.next(queueResult1));
+ assertEquals(dirtyPages3.getKey(), queueResult1.pageMemory());
+ assertEquals(dirtyPages3.getValue().get(1), queueResult1.dirtyPage());
+
+ assertTrue(queue.next(queueResult0));
+ assertEquals(dirtyPages3.getKey(), queueResult0.pageMemory());
+ assertEquals(dirtyPages3.getValue().get(2), queueResult0.dirtyPage());
+ }
+
+ private static IgniteBiTuple<PersistentPageMemory, List<FullPageId>>
createDirtyPages(FullPageId... dirtyPages) {
+ Arrays.sort(dirtyPages, DIRTY_PAGE_COMPARATOR);
+
+ return new IgniteBiTuple<>(mock(PersistentPageMemory.class),
Arrays.asList(dirtyPages));
+ }
+
+ private static FullPageId of(int groupId, int partId, int pageIdx) {
+ return new FullPageId(PageIdUtils.pageId(partId, (byte) 0, pageIdx),
groupId);
+ }
+
+ private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListDirtyPagePair(CheckpointDirtyPagesQueue queue) {
+ if (queue.isEmpty()) {
+ return List.of();
+ }
+
+ QueueResult queueResult = new QueueResult();
+
+ List<IgniteBiTuple<PersistentPageMemory, FullPageId>> result = new
ArrayList<>(queue.size());
+
+ while (queue.next(queueResult)) {
+ result.add(new IgniteBiTuple<>(queueResult.pageMemory(),
queueResult.dirtyPage()));
+ }
+
+ return result;
+ }
+
+ private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListDirtyPagePair(
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>>... dirtyPages
+ ) {
+ return toListDirtyPagePair(dirtyPageId -> true, dirtyPages);
+ }
+
+ private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListDirtyPagePair(
+ Predicate<FullPageId> dirtyPagePredicate,
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>>... dirtyPages
+ ) {
+ if (dirtyPages.length == 0) {
+ return List.of();
+ }
+
+ return Stream.of(dirtyPages)
+ .flatMap(pages ->
pages.getValue().stream().filter(dirtyPagePredicate).map(p -> new
IgniteBiTuple<>(pages.getKey(), p)))
+ .collect(toList());
+ }
+
+ private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListDirtyPagePair(CheckpointDirtyPagesView view) {
+ if (view.size() == 0) {
+ return List.of();
+ }
+
+ return IntStream.range(0, view.size()).mapToObj(i -> new
IgniteBiTuple<>(view.pageMemory(), view.get(i))).collect(toList());
+ }
+
+ private static Predicate<FullPageId> equalsByGroupAndPartition(int grpId,
int partId) {
+ return fullPageId -> fullPageId.groupId() == grpId &&
partitionId(fullPageId.pageId()) == partId;
+ }
+}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index a4f6c5677..a2a094628 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesQueue;
import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.junit.jupiter.api.Test;
@@ -74,9 +75,9 @@ public class CheckpointPagesWriterTest {
FullPageId fullPageId4 = new FullPageId(pageId(1, FLAG_DATA, 4), 0);
FullPageId fullPageId5 = new FullPageId(pageId(1, FLAG_DATA, 5), 0);
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds = new IgniteConcurrentMultiPairQueue<>(
+ CheckpointDirtyPagesQueue writePageIds = new CheckpointDirtyPages(
Map.of(pageMemory, List.of(fullPageId0, fullPageId1,
fullPageId2, fullPageId3, fullPageId4, fullPageId5))
- );
+ ).toQueue();
Runnable beforePageWrite = mock(Runnable.class);
@@ -152,7 +153,7 @@ public class CheckpointPagesWriterTest {
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
log,
new CheckpointMetricsTracker(),
- new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory,
List.of(new FullPageId(0, 0)))),
+ new CheckpointDirtyPages(Map.of(pageMemory, List.of(new
FullPageId(0, 0)))).toQueue(),
new ConcurrentHashMap<>(),
doneFuture,
() -> {
@@ -191,9 +192,9 @@ public class CheckpointPagesWriterTest {
any(CheckpointMetricsTracker.class)
);
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds = new IgniteConcurrentMultiPairQueue<>(
+ CheckpointDirtyPagesQueue writePageIds = new CheckpointDirtyPages(
Map.of(pageMemory, List.of(new FullPageId(0, 0), new
FullPageId(1, 0)))
- );
+ ).toQueue();
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
log,
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
index 4cab5fb40..cc8719416 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -38,11 +38,11 @@ public class CheckpointTest {
assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
- IgniteBiTuple<PersistentPageMemory, FullPageId[]> biTuple = new
IgniteBiTuple<>(
+ IgniteBiTuple<PersistentPageMemory, List<FullPageId>> biTuple = new
IgniteBiTuple<>(
mock(PersistentPageMemory.class),
- new FullPageId[]{new FullPageId(0, 1)}
+ List.of(new FullPageId(0, 1))
);
- assertTrue(new Checkpoint(new
IgniteConcurrentMultiPairQueue<>(List.of(biTuple)), progress).hasDelta());
+ assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)),
progress).hasDelta());
}
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
index dc9e3d6e2..c8d1db653 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
@@ -19,7 +19,12 @@ package
org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static org.mockito.Mockito.mock;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
/**
* Useful class for testing a checkpoint.
@@ -57,4 +62,13 @@ public class CheckpointTestUtils {
}
};
}
+
+ /**
+ * Collects the dirty page IDs of the given view into a list, in view order.
+ *
+ * @param dirtyPagesView Checkpoint dirty pages view.
+ */
+ public static List<FullPageId> toListDirtyPageIds(CheckpointDirtyPagesView
dirtyPagesView) {
+ return IntStream.range(0,
dirtyPagesView.size()).mapToObj(dirtyPagesView::get).collect(Collectors.toList());
+ }
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 88db853b4..6608b044b 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -17,22 +17,20 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static java.util.Comparator.comparing;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfigurationSchema.RANDOM_WRITE_ORDER;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newReadWriteLock;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.toListDirtyPageIds;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.AFTER_CHECKPOINT_END;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -50,40 +48,33 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
-import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
/**
* For {@link CheckpointWorkflow} testing.
*/
-@ExtendWith(ConfigurationExtension.class)
public class CheckpointWorkflowTest {
private final IgniteLogger log =
Loggers.forClass(CheckpointWorkflowTest.class);
- @InjectConfiguration
- private PageMemoryCheckpointConfiguration checkpointConfig;
-
@Nullable
private CheckpointWorkflow workflow;
@@ -105,7 +96,7 @@ public class CheckpointWorkflowTest {
DataRegion<PersistentPageMemory> dataRegion2 = () -> pageMemory2;
workflow = new CheckpointWorkflow(
- checkpointConfig,
+ "test",
mock(CheckpointMarkersStorage.class),
newReadWriteLock(log),
List.of(dataRegion0, dataRegion1)
@@ -181,13 +172,13 @@ public class CheckpointWorkflowTest {
CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
- List<FullPageId> dirtyPages = List.of(new FullPageId(0, 0), new
FullPageId(1, 0), new FullPageId(2, 0));
+ List<FullPageId> dirtyPages = List.of(of(0, 0, 0), of(0, 0, 1), of(0,
0, 2));
PersistentPageMemory pageMemory = newPageMemory(dirtyPages);
DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
- workflow = new CheckpointWorkflow(checkpointConfig, markersStorage,
readWriteLock, List.of(dataRegion));
+ workflow = new CheckpointWorkflow("test", markersStorage,
readWriteLock, List.of(dataRegion));
workflow.start();
@@ -289,17 +280,10 @@ public class CheckpointWorkflowTest {
verify(tracker, times(1)).onSplitAndSortCheckpointPagesStart();
verify(tracker, times(1)).onSplitAndSortCheckpointPagesEnd();
- List<IgniteBiTuple<PersistentPageMemory, FullPageId>> pairs =
collect(checkpoint.dirtyPages);
-
- assertThat(
- pairs.stream().map(IgniteBiTuple::getKey).collect(toSet()),
- equalTo(Set.of(dataRegion.pageMemory()))
- );
+ CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.nextView(null);
- assertThat(
- pairs.stream().map(IgniteBiTuple::getValue).collect(toList()),
- equalTo(dirtyPages)
- );
+ assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(dirtyPages));
+ assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegion.pageMemory()));
assertThat(
events,
@@ -314,72 +298,6 @@ public class CheckpointWorkflowTest {
verify(markersStorage, times(1)).onCheckpointBegin(checkpointId);
}
- @Test
- void testMarkCheckpointBeginRandom() throws Exception {
- checkpointConfig.writeOrder().update(RANDOM_WRITE_ORDER).get(100,
TimeUnit.MILLISECONDS);
-
- List<FullPageId> dirtyPages = List.of(new FullPageId(1, 0), new
FullPageId(0, 0), new FullPageId(2, 0));
-
- PersistentPageMemory pageMemory = newPageMemory(dirtyPages);
-
- DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
-
- workflow = new CheckpointWorkflow(
- checkpointConfig,
- mock(CheckpointMarkersStorage.class),
- newReadWriteLock(log),
- List.of(dataRegion)
- );
-
- workflow.start();
-
- CheckpointProgressImpl progressImpl =
mock(CheckpointProgressImpl.class);
-
-
when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
-
- Checkpoint checkpoint = workflow.markCheckpointBegin(
- coarseCurrentTimeMillis(),
- progressImpl,
- mock(CheckpointMetricsTracker.class)
- );
-
- assertThat(
-
collect(checkpoint.dirtyPages).stream().map(IgniteBiTuple::getValue).collect(toList()),
- equalTo(dirtyPages)
- );
- }
-
- @Test
- void testMarkCheckpointBeginSequential() throws Exception {
- List<FullPageId> dirtyPages = List.of(new FullPageId(1, 0), new
FullPageId(0, 0), new FullPageId(2, 0));
-
- PersistentPageMemory pageMemory = newPageMemory(dirtyPages);
-
- DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
-
- workflow = new CheckpointWorkflow(
- checkpointConfig,
- mock(CheckpointMarkersStorage.class),
- newReadWriteLock(log),
- List.of(dataRegion)
- );
-
- CheckpointProgressImpl progressImpl =
mock(CheckpointProgressImpl.class);
-
-
when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
-
- Checkpoint checkpoint = workflow.markCheckpointBegin(
- coarseCurrentTimeMillis(),
- progressImpl,
- mock(CheckpointMetricsTracker.class)
- );
-
- assertThat(
-
collect(checkpoint.dirtyPages).stream().map(IgniteBiTuple::getValue).collect(toList()),
-
equalTo(dirtyPages.stream().sorted(comparing(FullPageId::effectivePageId)).collect(toList()))
- );
- }
-
@Test
void testMarkCheckpointEnd() throws Exception {
CheckpointMarkersStorage markersStorage =
mock(CheckpointMarkersStorage.class);
@@ -390,7 +308,7 @@ public class CheckpointWorkflowTest {
DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
- workflow = new CheckpointWorkflow(checkpointConfig, markersStorage,
readWriteLock, List.of(dataRegion));
+ workflow = new CheckpointWorkflow("test", markersStorage,
readWriteLock, List.of(dataRegion));
workflow.start();
@@ -427,7 +345,7 @@ public class CheckpointWorkflowTest {
workflow.addCheckpointListener(checkpointListener, dataRegion);
workflow.markCheckpointEnd(new Checkpoint(
- new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory,
List.of(new FullPageId(0, 0)))),
+ new
CheckpointDirtyPages(List.of(createCheckpointDirtyPages(pageMemory, of(0, 0,
0)))),
progressImpl
));
@@ -448,21 +366,90 @@ public class CheckpointWorkflowTest {
verify(markersStorage, times(1)).onCheckpointEnd(checkpointId);
}
- private List<IgniteBiTuple<PersistentPageMemory, FullPageId>> collect(
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
queue
- ) {
- List<IgniteBiTuple<PersistentPageMemory, FullPageId>> res = new
ArrayList<>();
+ @Test
+ void testCreateAndSortCheckpointDirtyPages() throws Exception {
+ IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>>
dataRegionDirtyPages0 = createDataRegionDirtyPages(
+ mock(PersistentPageMemory.class),
+ of(10, 10, 2), of(10, 10, 1), of(10, 10, 0),
+ of(10, 5, 100), of(10, 5, 99),
+ of(10, 1, 50), of(10, 1, 51), of(10, 1, 99)
+ );
- Result<PersistentPageMemory, FullPageId> result = new Result<>();
+ IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>>
dataRegionDirtyPages1 = createDataRegionDirtyPages(
+ mock(PersistentPageMemory.class),
+ of(77, 5, 100), of(77, 5, 99),
+ of(88, 1, 51), of(88, 1, 50), of(88, 1, 99),
+ of(66, 33, 0), of(66, 33, 1), of(66, 33, 2)
+ );
- while (queue.next(result)) {
- res.add(new IgniteBiTuple<>(result.getKey(), result.getValue()));
- }
+ workflow = new CheckpointWorkflow("test",
mock(CheckpointMarkersStorage.class), newReadWriteLock(log), List.of());
+
+ workflow.start();
+
+ CheckpointDirtyPages sortCheckpointDirtyPages =
workflow.createAndSortCheckpointDirtyPages(
+ new DataRegionsDirtyPages(List.of(dataRegionDirtyPages0,
dataRegionDirtyPages1))
+ );
+
+ CheckpointDirtyPagesView dirtyPagesView =
sortCheckpointDirtyPages.nextView(null);
+
+ assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10,
1, 50), of(10, 1, 51), of(10, 1, 99))));
+ assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages0.getKey()));
+
+ dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+ assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10,
5, 99), of(10, 5, 100))));
+ assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages0.getKey()));
+
+ dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+ assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10,
10, 0), of(10, 10, 1), of(10, 10, 2))));
+ assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages0.getKey()));
+
+ dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+ assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(66,
33, 0), of(66, 33, 1), of(66, 33, 2))));
+ assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages1.getKey()));
+
+ dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+ assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(77,
5, 99), of(77, 5, 100))));
+ assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages1.getKey()));
- return res;
+ dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+ assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(88,
1, 50), of(88, 1, 51), of(88, 1, 99))));
+ assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages1.getKey()));
}
- private PersistentPageMemory newPageMemory(Collection<FullPageId>
dirtyPages) {
+ @Test
+ void testParallelSortDirtyPages() throws Exception {
+ int count = CheckpointWorkflow.PARALLEL_SORT_THRESHOLD + 10;
+
+ FullPageId[] dirtyPages0 = IntStream.range(0, count).mapToObj(i ->
of(0, 0, count - i)).toArray(FullPageId[]::new);
+ FullPageId[] dirtyPages1 = IntStream.range(0, count).mapToObj(i ->
of(1, 1, i)).toArray(FullPageId[]::new);
+
+ workflow = new CheckpointWorkflow("test",
mock(CheckpointMarkersStorage.class), newReadWriteLock(log), List.of());
+
+ workflow.start();
+
+ CheckpointDirtyPages sortCheckpointDirtyPages =
workflow.createAndSortCheckpointDirtyPages(new DataRegionsDirtyPages(List.of(
+ createDataRegionDirtyPages(mock(PersistentPageMemory.class),
dirtyPages1),
+ createDataRegionDirtyPages(mock(PersistentPageMemory.class),
dirtyPages0)
+ )));
+
+ CheckpointDirtyPagesView dirtyPagesView =
sortCheckpointDirtyPages.nextView(null);
+
+ assertThat(toListDirtyPageIds(dirtyPagesView),
equalTo(List.of(dirtyPages1)));
+
+ dirtyPagesView = sortCheckpointDirtyPages.nextView(dirtyPagesView);
+
+ assertThat(
+ toListDirtyPageIds(dirtyPagesView),
+ equalTo(IntStream.range(0, count).mapToObj(i ->
dirtyPages0[count - i - 1]).collect(toList()))
+ );
+ }
+
+ private static PersistentPageMemory newPageMemory(Collection<FullPageId>
dirtyPages) {
PersistentPageMemory mock = mock(PersistentPageMemory.class);
when(mock.beginCheckpoint(any(CompletableFuture.class))).thenReturn(dirtyPages);
@@ -470,6 +457,24 @@ public class CheckpointWorkflowTest {
return mock;
}
+ private static IgniteBiTuple<PersistentPageMemory, List<FullPageId>>
createCheckpointDirtyPages(
+ PersistentPageMemory pageMemory,
+ FullPageId... dirtyPages
+ ) {
+ return new IgniteBiTuple<>(pageMemory, Arrays.asList(dirtyPages));
+ }
+
+ private static IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>>
createDataRegionDirtyPages(
+ PersistentPageMemory pageMemory,
+ FullPageId... dirtyPages
+ ) {
+ return new IgniteBiTuple<>(pageMemory, Set.of(dirtyPages));
+ }
+
+ private static FullPageId of(int grpId, int partId, int pageIdx) {
+ return new FullPageId(PageIdUtils.pageId(partId, (byte) 0, pageIdx),
grpId);
+ }
+
/**
* Test listener implementation that simply collects events.
*/
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 954d5c422..4bb0e09f0 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -19,9 +19,10 @@ package
org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -49,8 +50,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.FullPageId;
import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.NodeStoppingException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -301,10 +303,10 @@ public class CheckpointerTest {
@Test
void testDoCheckpoint() throws Exception {
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPages = dirtyPages(
+ CheckpointDirtyPages dirtyPages = spy(dirtyPages(
mock(PersistentPageMemory.class),
new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2,
0)
- );
+ ));
Checkpointer checkpointer = spy(new Checkpointer(
log,
@@ -318,8 +320,7 @@ public class CheckpointerTest {
assertDoesNotThrow(checkpointer::doCheckpoint);
- assertTrue(dirtyPages.isEmpty());
-
+ verify(dirtyPages, times(1)).toQueue();
verify(checkpointer, times(1)).startCheckpointProgress();
assertEquals(checkpointer.currentProgress().currentCheckpointPagesCount(), 3);
@@ -364,16 +365,13 @@ public class CheckpointerTest {
);
}
- private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPages(
- PersistentPageMemory pageMemory,
- FullPageId... fullPageIds
- ) {
- return fullPageIds.length == 0 ? EMPTY : new
IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory, List.of(fullPageIds)));
+ private CheckpointDirtyPages dirtyPages(PersistentPageMemory pageMemory,
FullPageId... fullPageIds) {
+ Arrays.sort(fullPageIds, DIRTY_PAGE_COMPARATOR);
+
+ return new CheckpointDirtyPages(List.of(new
IgniteBiTuple<>(pageMemory, Arrays.asList(fullPageIds))));
}
- private CheckpointWorkflow createCheckpointWorkflow(
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPages
- ) throws Exception {
+ private CheckpointWorkflow createCheckpointWorkflow(CheckpointDirtyPages
dirtyPages) throws Exception {
CheckpointWorkflow mock = mock(CheckpointWorkflow.class);
when(mock.markCheckpointBegin(anyLong(),
any(CheckpointProgressImpl.class), any(CheckpointMetricsTracker.class)))
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
index 47960005c..07edfd372 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine.rule;
+import static org.apache.ignite.internal.util.CollectionUtils.concat;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -85,7 +87,7 @@ public abstract class LogicalScanConverterRule<T extends
ProjectableFilterableTa
Set<CorrelationId> corrIds =
RexUtils.extractCorrelationIds(rel.condition());
if (!CollectionUtils.nullOrEmpty(rel.projects())) {
- corrIds = new HashSet<>(CollectionUtils.union(corrIds,
RexUtils.extractCorrelationIds(rel.projects())));
+ corrIds = new HashSet<>(concat(corrIds,
RexUtils.extractCorrelationIds(rel.projects())));
}
RelTraitSet traits =
rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
@@ -133,7 +135,7 @@ public abstract class LogicalScanConverterRule<T extends
ProjectableFilterableTa
Set<CorrelationId> corrIds =
RexUtils.extractCorrelationIds(rel.condition());
if (!CollectionUtils.nullOrEmpty(rel.projects())) {
- corrIds = new HashSet<>(CollectionUtils.union(corrIds,
RexUtils.extractCorrelationIds(rel.projects())));
+ corrIds = new HashSet<>(concat(corrIds,
RexUtils.extractCorrelationIds(rel.projects())));
}
RelTraitSet traits =
cluster.traitSetOf(IgniteConvention.INSTANCE)