This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new ba9c1f88d1 IGNITE-20419 Fix partition meta loss after idle cluster restart. (#2594) ba9c1f88d1 is described below commit ba9c1f88d13a79d069e1ab6798b8932e8367a01d Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Mon Sep 18 13:10:08 2023 +0300 IGNITE-20419 Fix partition meta loss after idle cluster restart. (#2594) --- .../persistence/PersistentPageMemory.java | 1 + .../persistence/checkpoint/CheckpointManager.java | 7 ++ .../checkpoint/CheckpointPagesWriter.java | 5 + .../persistence/checkpoint/CheckpointWorkflow.java | 45 ++++++++ .../persistence/checkpoint/Checkpointer.java | 8 ++ .../internal/pagememory/util/PageIdUtils.java | 6 +- .../checkpoint/CheckpointWorkflowTest.java | 90 +++++++++++++++- .../internal/storage/engine/StorageEngine.java | 5 + .../storage/AbstractMvTableStorageTest.java | 19 ++-- .../storage/engine/AbstractStorageEngineTest.java | 119 +++++++++++++++++++++ .../internal/storage/impl/TestStorageEngine.java | 5 + .../PersistentPageMemoryStorageEngine.java | 5 + .../VolatilePageMemoryStorageEngine.java | 5 + .../mv/PersistentPageMemoryMvPartitionStorage.java | 58 ++++++---- .../PersistentPageMemoryStorageEngineTest.java | 58 ++++++++++ .../storage/rocksdb/RocksDbStorageEngine.java | 5 + .../rocksdb/engine/RocksDbStorageEngineTest.java | 51 +++++++++ 17 files changed, 456 insertions(+), 36 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java index 4f11ab1454..e160645826 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java @@ -1230,6 +1230,7 @@ public class PersistentPageMemory implements 
PageMemory { if (dirty) { assert checkpointTimeoutLock.checkpointLockIsHeldByThread(); + assert pageIndex(pageId.pageId()) != 0 : "Partition meta should only be updated via the instance of PartitionMeta."; if (!wasDirty || forceAdd) { Segment seg = segment(pageId.groupId(), pageId.pageId()); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index f2dac1a081..74bfbad288 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -236,6 +236,13 @@ public class CheckpointManager { return checkpointer.lastCheckpointProgress(); } + /** + * Marks partition as dirty, forcing partition's meta-page to be written on disk during next checkpoint. + */ + public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, int partitionId) { + checkpointer.markPartitionAsDirty(dataRegion, groupId, partitionId); + } + /** * Returns {@link true} if it is safe for all {@link DataRegion data regions} to update their {@link PageMemory}. 
* diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java index f133eb4aa0..093cdba516 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java @@ -229,6 +229,11 @@ public class CheckpointPagesWriter implements Runnable { pm -> createPageStoreWriter(pm, pageIdsToRetry) ); + if (fullId.pageIdx() == 0) { + // Skip meta-pages, they are written by "writePartitionMeta". + continue; + } + // Should also be done for partitions that will be destroyed to remove their pages from the data region. pageMemory.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java index d5f0a71afc..129532caf5 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java @@ -17,11 +17,13 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint; +import static java.util.concurrent.ConcurrentHashMap.newKeySet; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; +import static 
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX; @@ -36,7 +38,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; @@ -48,8 +54,10 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.pagememory.DataRegion; import org.apache.ignite.internal.pagememory.FullPageId; +import org.apache.ignite.internal.pagememory.PageMemory; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInternalCheckedException; import org.jetbrains.annotations.Nullable; @@ -97,6 +105,13 @@ class CheckpointWorkflow { */ private final @Nullable ThreadPoolExecutor callbackListenerThreadPool; + /** + * Contains meta-page IDs for all partitions, that were explicitly marked dirty by {@link #markPartitionAsDirty(DataRegion, int, int)}. + * Not required to be volatile, read/write is protected by a {@link #checkpointReadWriteLock}. Publication of the initial value should + * be guaranteed by external user. {@link CheckpointManager}, in particular. 
+ */ + private Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = new ConcurrentHashMap<>(); + /** * Constructor. * @@ -161,6 +176,15 @@ class CheckpointWorkflow { } } + /** + * Marks partition as dirty, forcing partition's meta-page to be written on disk during next checkpoint. + */ + public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, int partitionId) { + Set<FullPageId> dirtyMetaPageIds = dirtyPartitionsMap.computeIfAbsent(dataRegion, unused -> newKeySet()); + + dirtyMetaPageIds.add(new FullPageId(partitionMetaPageId(partitionId), groupId)); + } + /** * First stage of checkpoint which collects demanded information (dirty pages mostly). * @@ -343,14 +367,35 @@ class CheckpointWorkflow { Collection<? extends DataRegion<PersistentPageMemory>> dataRegions, CompletableFuture<?> allowToReplace ) { + Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = this.dirtyPartitionsMap; + + this.dirtyPartitionsMap = new ConcurrentHashMap<>(); + Collection<DataRegionDirtyPages<Collection<FullPageId>>> dataRegionsDirtyPages = new ArrayList<>(dataRegions.size()); + // First, we iterate all regions that have dirty pages. for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) { Collection<FullPageId> dirtyPages = dataRegion.pageMemory().beginCheckpoint(allowToReplace); + Set<FullPageId> dirtyMetaPageIds = dirtyPartitionsMap.remove(dataRegion); + + if (dirtyMetaPageIds != null) { + // Merge these two collections. There should be no intersections. + dirtyPages = CollectionUtils.concat(dirtyMetaPageIds, dirtyPages); + } + dataRegionsDirtyPages.add(new DataRegionDirtyPages<>(dataRegion.pageMemory(), dirtyPages)); } + // Then we iterate regions that don't have dirty pages, but somehow have dirty partitions. 
+ for (Entry<DataRegion<?>, Set<FullPageId>> entry : dirtyPartitionsMap.entrySet()) { + PageMemory pageMemory = entry.getKey().pageMemory(); + + assert pageMemory instanceof PersistentPageMemory; + + dataRegionsDirtyPages.add(new DataRegionDirtyPages<>((PersistentPageMemory) pageMemory, entry.getValue())); + } + return new DataRegionsDirtyPages(dataRegionsDirtyPages); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index fe750b6271..786b29aee8 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -45,6 +45,7 @@ import java.util.function.BooleanSupplier; import org.apache.ignite.internal.components.LongJvmPauseDetector; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.DataRegion; import org.apache.ignite.internal.pagememory.FullPageId; import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration; import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView; @@ -263,6 +264,13 @@ public class Checkpointer extends IgniteWorker { return current; } + /** + * Marks partition as dirty, forcing partition's meta-page to be written on disk during next checkpoint. + */ + void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, int partitionId) { + checkpointWorkflow.markPartitionAsDirty(dataRegion, groupId, partitionId); + } + /** * Executes a checkpoint. 
* diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java index 6e290a0226..20338302fb 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java @@ -17,11 +17,13 @@ package org.apache.ignite.internal.pagememory.util; +import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX; import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA; import org.apache.ignite.internal.pagememory.FullPageId; import org.apache.ignite.internal.pagememory.PageIdAllocator; import org.apache.ignite.internal.util.HexStringUtils; +import org.intellij.lang.annotations.MagicConstant; /** * Utility class for page ID parts manipulation. @@ -115,11 +117,11 @@ public final class PageIdUtils { * Creates page ID from its components. * * @param partitionId Partition ID. - * @param flag Flag: {@link PageIdAllocator#FLAG_DATA} of {@link PageIdAllocator#FLAG_AUX}. + * @param flag Flag: {@link PageIdAllocator#FLAG_DATA} or {@link PageIdAllocator#FLAG_AUX}. * @param pageIdx Page index, monotonically growing number within each partition. * @return Page ID constructed from the given pageIdx and partition ID, see {@link FullPageId}. 
*/ - public static long pageId(int partitionId, byte flag, int pageIdx) { + public static long pageId(int partitionId, @MagicConstant(intValues = {FLAG_DATA, FLAG_AUX}) byte flag, int pageIdx) { long pageId = flag & FLAG_MASK; pageId = (pageId << PART_ID_SIZE) | (partitionId & PART_ID_MASK); diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java index 0a648dd03a..b9a176652b 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA; +import static org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED; @@ -32,12 +34,14 @@ import static org.apache.ignite.internal.pagememory.persistence.checkpoint.Check import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN; import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN; import 
static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN; +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -67,7 +71,6 @@ import org.apache.ignite.internal.pagememory.DataRegion; import org.apache.ignite.internal.pagememory.FullPageId; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView; -import org.apache.ignite.internal.pagememory.util.PageIdUtils; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.lang.IgniteInternalCheckedException; import org.jetbrains.annotations.Nullable; @@ -496,6 +499,89 @@ public class CheckpointWorkflowTest extends BaseIgniteAbstractTest { ); } + /** + * Tests that dirty partition with no dirty pages will be checkpointed. 
+ */ + @Test + void testDirtyPartitionWithoutDirtyPages() throws Exception { + PersistentPageMemory pageMemory = mock(PersistentPageMemory.class); + + DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory; + + workflow = new CheckpointWorkflow( + "test", + newReadWriteLock(log), + List.of(dataRegion), + 1 + ); + + workflow.start(); + + int groupId = 10; + int partitionId = 20; + + FullPageId metaPageId = new FullPageId(partitionMetaPageId(partitionId), groupId); + workflow.markPartitionAsDirty(dataRegion, groupId, partitionId); + + Checkpoint checkpoint = workflow.markCheckpointBegin( + coarseCurrentTimeMillis(), + mock(CheckpointProgressImpl.class), + mock(CheckpointMetricsTracker.class), + () -> {}, + () -> {} + ); + + assertEquals(1, checkpoint.dirtyPagesSize); + + CheckpointDirtyPagesView dirtyPagesView = checkpoint.dirtyPages.nextPartitionView(null); + + assertNotNull(dirtyPagesView); + assertThat(toListDirtyPageIds(dirtyPagesView), is(List.of(metaPageId))); + } + + /** + * Tests that dirty partition with dirty pages will be checkpointed. 
+ */ + @Test + void testDirtyPartitionWithDirtyPages() throws Exception { + PersistentPageMemory pageMemory = mock(PersistentPageMemory.class); + + DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory; + + workflow = new CheckpointWorkflow( + "test", + newReadWriteLock(log), + List.of(dataRegion), + 1 + ); + + workflow.start(); + + int groupId = 10; + int partitionId = 20; + + FullPageId metaPageId = new FullPageId(partitionMetaPageId(partitionId), groupId); + FullPageId dataPageId = new FullPageId(pageId(partitionId, FLAG_DATA, 1), groupId); + + workflow.markPartitionAsDirty(dataRegion, groupId, partitionId); + when(pageMemory.beginCheckpoint(any())).thenReturn(List.of(dataPageId)); + + Checkpoint checkpoint = workflow.markCheckpointBegin( + coarseCurrentTimeMillis(), + mock(CheckpointProgressImpl.class), + mock(CheckpointMetricsTracker.class), + () -> {}, + () -> {} + ); + + assertEquals(2, checkpoint.dirtyPagesSize); + + CheckpointDirtyPagesView dirtyPagesView = checkpoint.dirtyPages.nextPartitionView(null); + + assertNotNull(dirtyPagesView); + assertThat(toListDirtyPageIds(dirtyPagesView), is(List.of(metaPageId, dataPageId))); + } + @Test void testAwaitPendingTasksOfListenerCallback() { workflow = new CheckpointWorkflow( @@ -592,7 +678,7 @@ public class CheckpointWorkflowTest extends BaseIgniteAbstractTest { } private static FullPageId of(int grpId, int partId, int pageIdx) { - return new FullPageId(PageIdUtils.pageId(partId, (byte) 0, pageIdx), grpId); + return new FullPageId(pageId(partId, (byte) 0, pageIdx), grpId); } /** diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java index a1e4252f24..908d62a9c1 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java +++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java @@ -43,6 +43,11 @@ public interface StorageEngine { */ void stop() throws StorageException; + /** + * Whether the data is lost upon engine restart or not. + */ + boolean isVolatile(); + /** * Creates new table storage. * diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java index 765c4f7e9f..3728e9f41d 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java @@ -434,7 +434,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance); - checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); + checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); assertNull(mvPartitionStorage.committedGroupConfiguration()); // Let's finish rebalancing. 
@@ -459,7 +459,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart); checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance); - checkLastApplied(mvPartitionStorage, 10, 10, 20); + checkLastApplied(mvPartitionStorage, 10, 20); checkRaftGroupConfigs(raftGroupConfig, mvPartitionStorage.committedGroupConfiguration()); } @@ -493,7 +493,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance); - checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); + checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); // Let's abort rebalancing. @@ -506,7 +506,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart); checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance); - checkLastApplied(mvPartitionStorage, 0, 0, 0); + checkLastApplied(mvPartitionStorage, 0, 0); assertNull(mvPartitionStorage.committedGroupConfiguration()); } @@ -609,11 +609,11 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { // Let's check the repositories: they should be empty. 
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows); - checkLastApplied(mvPartitionStorage, 0, 0, 0); + checkLastApplied(mvPartitionStorage, 0, 0); } else { checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows); - checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); + checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); } } @@ -631,7 +631,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { // Let's check the cleanup for an empty partition. assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully()); - checkLastApplied(mvPartitionStorage, 0, 0, 0); + checkLastApplied(mvPartitionStorage, 0, 0); assertNull(mvPartitionStorage.committedGroupConfiguration()); // Let's fill the storages and clean them. @@ -655,7 +655,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { // Let's clear the storages and check them out. 
assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully()); - checkLastApplied(mvPartitionStorage, 0, 0, 0); + checkLastApplied(mvPartitionStorage, 0, 0); assertNull(mvPartitionStorage.committedGroupConfiguration()); checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows); @@ -912,7 +912,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { } private void checkMvPartitionStorageMethodsAfterStartRebalance(MvPartitionStorage storage) { - checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); + checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS); assertNull(storage.committedGroupConfiguration()); @@ -1056,7 +1056,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { private static void checkLastApplied( MvPartitionStorage storage, long expLastAppliedIndex, - long expPersistentIndex, long expLastAppliedTerm ) { assertEquals(expLastAppliedIndex, storage.lastAppliedIndex()); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java new file mode 100644 index 0000000000..ce4249927e --- /dev/null +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.engine; + +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assumptions.assumeFalse; +import static org.mockito.Mockito.mock; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.storage.AbstractMvTableStorageTest; +import org.apache.ignite.internal.storage.BaseMvStoragesTest; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests basic functionality of storage engines. Allows for more complex scenarios than {@link AbstractMvTableStorageTest}, because it + * doesn't limit the usage of the engine to a single table. + */ +public abstract class AbstractStorageEngineTest extends BaseMvStoragesTest { + /** Engine instance. */ + private StorageEngine storageEngine; + + @BeforeEach + void createEngineBeforeTest() { + storageEngine = createEngine(); + + storageEngine.start(); + } + + @AfterEach + void stopEngineAfterTest() { + if (storageEngine != null) { + storageEngine.stop(); + } + } + + /** + * Creates a new storage engine instance.
For persistent engines, the instances within a single test method should point to the same + * directory. + */ + protected abstract StorageEngine createEngine(); + + /** + * Tests that explicitly flushed data remains persistent on the device, when the engine is restarted. + */ + @Test + void testRestartAfterFlush() throws Exception { + assumeFalse(storageEngine.isVolatile()); + + StorageTableDescriptor tableDescriptor = new StorageTableDescriptor(1, 1, DEFAULT_DATA_REGION); + StorageIndexDescriptorSupplier indexSupplier = mock(StorageIndexDescriptorSupplier.class); + + MvTableStorage mvTableStorage = storageEngine.createMvTable(tableDescriptor, indexSupplier); + + mvTableStorage.start(); + try (AutoCloseable ignored0 = mvTableStorage::stop) { + CompletableFuture<MvPartitionStorage> mvPartitionStorageFuture = mvTableStorage.createMvPartition(0); + + assertThat(mvPartitionStorageFuture, willCompleteSuccessfully()); + MvPartitionStorage mvPartitionStorage = mvPartitionStorageFuture.join(); + + try (AutoCloseable ignored1 = mvTableStorage::stop) { + // Flush. Persist the table itself, not the update. + assertThat(mvPartitionStorage.flush(), willCompleteSuccessfully()); + + mvPartitionStorage.runConsistently(locker -> { + // Update of basic storage data. + mvPartitionStorage.lastApplied(10, 20); + + return null; + }); + + // Flush. + assertThat(mvPartitionStorage.flush(), willCompleteSuccessfully()); + } + } + + // Restart. 
+ stopEngineAfterTest(); + createEngineBeforeTest(); + + mvTableStorage = storageEngine.createMvTable(tableDescriptor, indexSupplier); + + mvTableStorage.start(); + try (AutoCloseable ignored0 = mvTableStorage::close) { + CompletableFuture<MvPartitionStorage> mvPartitionStorageFuture = mvTableStorage.createMvPartition(0); + + assertThat(mvPartitionStorageFuture, willCompleteSuccessfully()); + MvPartitionStorage mvPartitionStorage = mvPartitionStorageFuture.join(); + + try (AutoCloseable ignored1 = mvTableStorage::stop) { + // Check that data has been persisted. + assertEquals(10, mvPartitionStorage.lastAppliedIndex()); + assertEquals(20, mvPartitionStorage.lastAppliedTerm()); + } + } + } +} diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java index dc94832078..c983c8bbf3 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java @@ -47,6 +47,11 @@ public class TestStorageEngine implements StorageEngine { // No-op. 
} + @Override + public boolean isVolatile() { + return true; + } + @Override public TestMvTableStorage createMvTable( StorageTableDescriptor tableDescriptor, diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java index 48f644bef2..43c19d2f27 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java @@ -184,6 +184,11 @@ public class PersistentPageMemoryStorageEngine implements StorageEngine { } } + @Override + public boolean isVolatile() { + return false; + } + @Override public PersistentPageMemoryTableStorage createMvTable( StorageTableDescriptor tableDescriptor, diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java index 98d13fb97b..0cd413e343 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java @@ -125,6 +125,11 @@ public class VolatilePageMemoryStorageEngine implements StorageEngine { } } + @Override + public boolean isVolatile() { + return true; + } + @Override public VolatilePageMemoryTableStorage createMvTable( StorageTableDescriptor tableDescriptor, diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java index a59e59cb73..a34bb3060d 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java @@ -212,13 +212,30 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv } private void lastAppliedBusy(long lastAppliedIndex, long lastAppliedTerm) throws StorageException { + updateMeta((lastCheckpointId, meta) -> meta.lastApplied(lastCheckpointId, lastAppliedIndex, lastAppliedTerm)); + } + + /** + * Closure interface for {@link #update(UUID, PartitionMeta)}. + */ + @FunctionalInterface + private interface MetaUpdateClosure { + void update(UUID lastCheckpointId, PartitionMeta meta); + } + + /** + * Updates partition meta. Hides all the necessary boilerplate in a single place. + */ + private void updateMeta(MetaUpdateClosure closure) { assert checkpointTimeoutLock.checkpointLockIsHeldByThread(); CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress(); UUID lastCheckpointId = lastCheckpoint == null ? null : lastCheckpoint.id(); - meta.lastApplied(lastCheckpointId, lastAppliedIndex, lastAppliedTerm); + closure.update(lastCheckpointId, meta); + + checkpointManager.markPartitionAsDirty(tableStorage.dataRegion(), tableStorage.getTableId(), partitionId); } @Override @@ -262,30 +279,27 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv } private void committedGroupConfigurationBusy(byte[] groupConfigBytes) { - assert checkpointTimeoutLock.checkpointLockIsHeldByThread(); - - CheckpointProgress lastCheckpoint = checkpointManager.lastCheckpointProgress(); - UUID lastCheckpointId = lastCheckpoint == null ?
null : lastCheckpoint.id(); + updateMeta((lastCheckpointId, meta) -> { + replicationProtocolGroupConfigReadWriteLock.writeLock().lock(); - replicationProtocolGroupConfigReadWriteLock.writeLock().lock(); - - try { - if (meta.lastReplicationProtocolGroupConfigFirstPageId() == BlobStorage.NO_PAGE_ID) { - long configPageId = blobStorage.addBlob(groupConfigBytes); + try { + if (meta.lastReplicationProtocolGroupConfigFirstPageId() == BlobStorage.NO_PAGE_ID) { + long configPageId = blobStorage.addBlob(groupConfigBytes); - meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId, configPageId); - } else { - blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(), groupConfigBytes); + meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId, configPageId); + } else { + blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(), groupConfigBytes); + } + } catch (IgniteInternalCheckedException e) { + throw new StorageException( + "Cannot save committed group configuration: [tableId={}, partitionId={}]", + e, + tableStorage.getTableId(), partitionId + ); + } finally { + replicationProtocolGroupConfigReadWriteLock.writeLock().unlock(); } - } catch (IgniteInternalCheckedException e) { - throw new StorageException( - "Cannot save committed group configuration: [tableId={}, partitionId={}]", - e, - tableStorage.getTableId(), partitionId - ); - } finally { - replicationProtocolGroupConfigReadWriteLock.writeLock().unlock(); - } + }); } @Override diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java new file mode 100644 index 0000000000..6a41918159 --- /dev/null +++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory.engine; + +import java.nio.file.Path; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.pagememory.io.PageIoRegistry; +import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine; +import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Implementation of the {@link AbstractStorageEngineTest} for the {@link PersistentPageMemoryStorageEngine#ENGINE_NAME} engine. 
+ */ +@ExtendWith(ConfigurationExtension.class) +@ExtendWith(WorkDirectoryExtension.class) +public class PersistentPageMemoryStorageEngineTest extends AbstractStorageEngineTest { + @InjectConfiguration("mock {checkpoint.checkpointDelayMillis = 0, defaultRegion.size = 1048576}") + private PersistentPageMemoryStorageEngineConfiguration engineConfiguration; + + @WorkDirectory + private Path workDir; + + @Override + protected StorageEngine createEngine() { + var ioRegistry = new PageIoRegistry(); + + ioRegistry.loadFromServiceLoader(); + + return new PersistentPageMemoryStorageEngine( + "test", + engineConfiguration, + ioRegistry, + workDir, + null + ); + } +} diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java index 16d30ae7bb..7aa9127437 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java @@ -167,6 +167,11 @@ public class RocksDbStorageEngine implements StorageEngine { IgniteUtils.shutdownAndAwaitTermination(scheduledPool, 10, TimeUnit.SECONDS); } + @Override + public boolean isVolatile() { + return false; + } + @Override public RocksDbTableStorage createMvTable( StorageTableDescriptor tableDescriptor, diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java new file mode 100644 index 0000000000..4faa412a08 --- /dev/null +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.rocksdb.engine; + +import java.nio.file.Path; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest; +import org.apache.ignite.internal.storage.engine.StorageEngine; +import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; +import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Implementation of the {@link AbstractStorageEngineTest} for the {@link RocksDbStorageEngine#ENGINE_NAME} engine. 
+ */ +@ExtendWith(ConfigurationExtension.class) +@ExtendWith(WorkDirectoryExtension.class) +public class RocksDbStorageEngineTest extends AbstractStorageEngineTest { + @InjectConfiguration("mock.flushDelayMillis = 0") + private RocksDbStorageEngineConfiguration engineConfiguration; + + @WorkDirectory + private Path workDir; + + @Override + protected StorageEngine createEngine() { + return new RocksDbStorageEngine( + "test", + engineConfiguration, + workDir + ); + } +}