This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 44256b0c091 IGNITE-25861 Add more tests for partition generation
change (#6511)
44256b0c091 is described below
commit 44256b0c09131c99330d6d2410eb227ce28c17d6
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Aug 29 11:33:55 2025 +0300
IGNITE-25861 Add more tests for partition generation change (#6511)
---
.../PersistentPageMemoryMvTableStorageTest.java | 125 +++++++++++++++++++++
1 file changed, 125 insertions(+)
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 470ffeedcd2..3e4d419a51b 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.storage.pagememory;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
import static
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -34,17 +36,23 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
@@ -273,6 +281,25 @@ public class PersistentPageMemoryMvTableStorageTest
extends AbstractMvTableStora
});
}
+ private void addWriteCommitted(MvPartitionStorage storage, List<RowId>
rowIds, List<BinaryRow> binaryRows) {
+ assertEquals(rowIds.size(), binaryRows.size());
+
+ storage.runConsistently(locker -> {
+ HybridTimestamp now = clock.now();
+
+ for (int i = 0; i < rowIds.size(); i++) {
+ RowId rowId = rowIds.get(i);
+ BinaryRow binaryRow = binaryRows.get(i);
+
+ locker.lock(rowId);
+
+ storage.addWriteCommitted(rowId, binaryRow, now);
+ }
+
+ return null;
+ });
+ }
+
@Test
void createMvPartitionStorageAndDoCheckpointInParallel() {
for (int i = 0; i < 10; i++) {
@@ -373,7 +400,105 @@ public class PersistentPageMemoryMvTableStorageTest
extends AbstractMvTableStora
}
}
+ @Test
+ void testCheckpointWithTwoPartitionGeneration() throws Throwable {
+ MvPartitionStorage storage = getOrCreateMvPartition(PARTITION_ID);
+
+ List<RowId> rowIds = generateRowIds(PARTITION_ID, 10);
+
+ int halfPageSize = (int) pageSize() / 2;
+
+ addWriteCommitted(storage, rowIds,
generateBinaryRows(generatePrefix('a', halfPageSize), 10));
+
+ assertThat(forceCheckpointAsync(), willCompleteSuccessfully());
+
+ inCheckpointReadLock(() -> {
+ // Let's update old pages.
+ addWriteCommitted(storage, rowIds,
generateBinaryRows(generatePrefix('b', halfPageSize), 10));
+ // Let's add new pages.
+ addWriteCommitted(storage, generateRowIds(PARTITION_ID, 10),
generateBinaryRows(generatePrefix('b', halfPageSize), 10));
+
+ // Let's update the generation of the partition.
+ int oldPartitionGeneration = partitionGeneration(PARTITION_ID);
+ assertThat(tableStorage.clearPartition(PARTITION_ID),
willCompleteSuccessfully());
+ assertThat(partitionGeneration(PARTITION_ID),
greaterThan(oldPartitionGeneration));
+
+ // Let's add new pages
+ addWriteCommitted(storage, generateRowIds(PARTITION_ID, 10),
generateBinaryRows(generatePrefix('c', halfPageSize), 10));
+ });
+
+ assertThat(forceCheckpointAsync(), willCompleteSuccessfully());
+ }
+
+ @Test
+ void testIncreasePartitionGenerationAfterSortPages() {
+ MvPartitionStorage storage = getOrCreateMvPartition(PARTITION_ID);
+
+ List<BinaryRow> binaryRows = generateBinaryRows(generatePrefix('_',
(int) pageSize() / 2), 10);
+
+ for (int i = 0; i < 10; i++) {
+ CompletableFuture<Void> future = inCheckpointReadLock(() -> {
+ addWriteCommitted(storage, generateRowIds(PARTITION_ID, 10),
binaryRows);
+
+ CheckpointProgress forceCheckpoint =
engine.checkpointManager().forceCheckpoint("test");
+
+ CompletableFuture<Void> clearPartitionFuture =
forceCheckpoint.futureFor(PAGES_SORTED)
+ .thenCompose(unused ->
tableStorage.clearPartition(PARTITION_ID));
+
+ return
CompletableFuture.allOf(forceCheckpoint.futureFor(FINISHED),
clearPartitionFuture);
+ });
+
+ assertThat(future, willCompleteSuccessfully());
+ }
+ }
+
private CompletableFuture<Void> forceCheckpointAsync() {
return
engine.checkpointManager().forceCheckpoint("test").futureFor(FINISHED);
}
+
+ private void inCheckpointReadLock(RunnableX r) throws Throwable {
+ CheckpointTimeoutLock lock =
engine.checkpointManager().checkpointTimeoutLock();
+
+ lock.checkpointReadLock();
+
+ try {
+ r.run();
+ } finally {
+ lock.checkpointReadUnlock();
+ }
+ }
+
+ private <T> T inCheckpointReadLock(Supplier<T> supplier) {
+ CheckpointTimeoutLock lock =
engine.checkpointManager().checkpointTimeoutLock();
+
+ lock.checkpointReadLock();
+
+ try {
+ return supplier.get();
+ } finally {
+ lock.checkpointReadUnlock();
+ }
+ }
+
+ private static List<RowId> generateRowIds(int partId, int count) {
+ return IntStream.range(0, count).mapToObj(i -> new
RowId(partId)).collect(toList());
+ }
+
+ private static List<BinaryRow> generateBinaryRows(String prefix, int
count) {
+ return IntStream.range(0, count)
+ .mapToObj(i -> binaryRow(new TestKey(0, prefix + "k" + i), new
TestValue(1, prefix + "v" + i)))
+ .collect(toList());
+ }
+
+ private static String generatePrefix(char c, int count) {
+ var sb = new StringBuilder(count);
+
+ IntStream.range(0, count).forEach(i -> sb.append(c));
+
+ return sb.toString();
+ }
+
+ private int partitionGeneration(int partId) {
+ return ((PersistentPageMemoryTableStorage)
tableStorage).dataRegion().pageMemory().partGeneration(TABLE_ID, partId);
+ }
}