This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ba9c1f88d1 IGNITE-20419 Fix partition meta loss after idle cluster restart. (#2594)
ba9c1f88d1 is described below

commit ba9c1f88d13a79d069e1ab6798b8932e8367a01d
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Mon Sep 18 13:10:08 2023 +0300

    IGNITE-20419 Fix partition meta loss after idle cluster restart. (#2594)
---
 .../persistence/PersistentPageMemory.java          |   1 +
 .../persistence/checkpoint/CheckpointManager.java  |   7 ++
 .../checkpoint/CheckpointPagesWriter.java          |   5 +
 .../persistence/checkpoint/CheckpointWorkflow.java |  45 ++++++++
 .../persistence/checkpoint/Checkpointer.java       |   8 ++
 .../internal/pagememory/util/PageIdUtils.java      |   6 +-
 .../checkpoint/CheckpointWorkflowTest.java         |  90 +++++++++++++++-
 .../internal/storage/engine/StorageEngine.java     |   5 +
 .../storage/AbstractMvTableStorageTest.java        |  19 ++--
 .../storage/engine/AbstractStorageEngineTest.java  | 119 +++++++++++++++++++++
 .../internal/storage/impl/TestStorageEngine.java   |   5 +
 .../PersistentPageMemoryStorageEngine.java         |   5 +
 .../VolatilePageMemoryStorageEngine.java           |   5 +
 .../mv/PersistentPageMemoryMvPartitionStorage.java |  58 ++++++----
 .../PersistentPageMemoryStorageEngineTest.java     |  58 ++++++++++
 .../storage/rocksdb/RocksDbStorageEngine.java      |   5 +
 .../rocksdb/engine/RocksDbStorageEngineTest.java   |  51 +++++++++
 17 files changed, 456 insertions(+), 36 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 4f11ab1454..e160645826 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -1230,6 +1230,7 @@ public class PersistentPageMemory implements PageMemory {
 
         if (dirty) {
             assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
+            assert pageIndex(pageId.pageId()) != 0 : "Partition meta should 
only be updated via the instance of PartitionMeta.";
 
             if (!wasDirty || forceAdd) {
                 Segment seg = segment(pageId.groupId(), pageId.pageId());
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index f2dac1a081..74bfbad288 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -236,6 +236,13 @@ public class CheckpointManager {
         return checkpointer.lastCheckpointProgress();
     }
 
+    /**
+     * Marks partition as dirty, forcing partition's meta-page to be written 
on disk during next checkpoint.
+     */
+    public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, 
int partitionId) {
+        checkpointer.markPartitionAsDirty(dataRegion, groupId, partitionId);
+    }
+
     /**
      * Returns {@link true} if it is safe for all {@link DataRegion data 
regions} to update their {@link PageMemory}.
      *
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index f133eb4aa0..093cdba516 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -229,6 +229,11 @@ public class CheckpointPagesWriter implements Runnable {
                         pm -> createPageStoreWriter(pm, pageIdsToRetry)
                 );
 
+                if (fullId.pageIdx() == 0) {
+                    // Skip meta-pages, they are written by 
"writePartitionMeta".
+                    continue;
+                }
+
                 // Should also be done for partitions that will be destroyed 
to remove their pages from the data region.
                 pageMemory.checkpointWritePage(fullId, tmpWriteBuf, 
pageStoreWriter, tracker);
             }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index d5f0a71afc..129532caf5 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static java.util.concurrent.ConcurrentHashMap.newKeySet;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static 
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
@@ -36,7 +38,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
@@ -48,8 +54,10 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.jetbrains.annotations.Nullable;
@@ -97,6 +105,13 @@ class CheckpointWorkflow {
      */
     private final @Nullable ThreadPoolExecutor callbackListenerThreadPool;
 
+    /**
+     * Contains meta-page IDs for all partitions, that were explicitly marked 
dirty by {@link #markPartitionAsDirty(DataRegion, int, int)}.
+     * Not required to be volatile, read/write is protected by a {@link 
#checkpointReadWriteLock}. Publication of the initial value should
+     * be guaranteed by external user. {@link CheckpointManager}, in 
particular.
+     */
+    private Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = new 
ConcurrentHashMap<>();
+
     /**
      * Constructor.
      *
@@ -161,6 +176,15 @@ class CheckpointWorkflow {
         }
     }
 
+    /**
+     * Marks partition as dirty, forcing partition's meta-page to be written 
on disk during next checkpoint.
+     */
+    public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, 
int partitionId) {
+        Set<FullPageId> dirtyMetaPageIds = 
dirtyPartitionsMap.computeIfAbsent(dataRegion, unused -> newKeySet());
+
+        dirtyMetaPageIds.add(new FullPageId(partitionMetaPageId(partitionId), 
groupId));
+    }
+
     /**
      * First stage of checkpoint which collects demanded information (dirty 
pages mostly).
      *
@@ -343,14 +367,35 @@ class CheckpointWorkflow {
             Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
             CompletableFuture<?> allowToReplace
     ) {
+        Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = 
this.dirtyPartitionsMap;
+
+        this.dirtyPartitionsMap = new ConcurrentHashMap<>();
+
         Collection<DataRegionDirtyPages<Collection<FullPageId>>> 
dataRegionsDirtyPages = new ArrayList<>(dataRegions.size());
 
+        // First, we iterate all regions that have dirty pages.
         for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
             Collection<FullPageId> dirtyPages = 
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
 
+            Set<FullPageId> dirtyMetaPageIds = 
dirtyPartitionsMap.remove(dataRegion);
+
+            if (dirtyMetaPageIds != null) {
+                // Merge these two collections. There should be no 
intersections.
+                dirtyPages = CollectionUtils.concat(dirtyMetaPageIds, 
dirtyPages);
+            }
+
             dataRegionsDirtyPages.add(new 
DataRegionDirtyPages<>(dataRegion.pageMemory(), dirtyPages));
         }
 
+        // Then we iterate regions that don't have dirty pages, but somehow 
have dirty partitions.
+        for (Entry<DataRegion<?>, Set<FullPageId>> entry : 
dirtyPartitionsMap.entrySet()) {
+            PageMemory pageMemory = entry.getKey().pageMemory();
+
+            assert pageMemory instanceof PersistentPageMemory;
+
+            dataRegionsDirtyPages.add(new 
DataRegionDirtyPages<>((PersistentPageMemory) pageMemory, entry.getValue()));
+        }
+
         return new DataRegionsDirtyPages(dataRegionsDirtyPages);
     }
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index fe750b6271..786b29aee8 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -45,6 +45,7 @@ import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
@@ -263,6 +264,13 @@ public class Checkpointer extends IgniteWorker {
         return current;
     }
 
+    /**
+     * Marks partition as dirty, forcing partition's meta-page to be written 
on disk during next checkpoint.
+     */
+    void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, int 
partitionId) {
+        checkpointWorkflow.markPartitionAsDirty(dataRegion, groupId, 
partitionId);
+    }
+
     /**
      * Executes a checkpoint.
      *
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
index 6e290a0226..20338302fb 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.pagememory.util;
 
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.PageIdAllocator;
 import org.apache.ignite.internal.util.HexStringUtils;
+import org.intellij.lang.annotations.MagicConstant;
 
 /**
  * Utility class for page ID parts manipulation.
@@ -115,11 +117,11 @@ public final class PageIdUtils {
      * Creates page ID from its components.
      *
      * @param partitionId Partition ID.
-     * @param flag Flag: {@link PageIdAllocator#FLAG_DATA} of {@link 
PageIdAllocator#FLAG_AUX}.
+     * @param flag Flag: {@link PageIdAllocator#FLAG_DATA} or {@link 
PageIdAllocator#FLAG_AUX}.
      * @param pageIdx Page index, monotonically growing number within each 
partition.
      * @return Page ID constructed from the given pageIdx and partition ID, 
see {@link FullPageId}.
      */
-    public static long pageId(int partitionId, byte flag, int pageIdx) {
+    public static long pageId(int partitionId, @MagicConstant(intValues = 
{FLAG_DATA, FLAG_AUX}) byte flag, int pageIdx) {
         long pageId = flag & FLAG_MASK;
 
         pageId = (pageId << PART_ID_SIZE) | (partitionId & PART_ID_MASK);
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 0a648dd03a..b9a176652b 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -20,6 +20,8 @@ package 
org.apache.ignite.internal.pagememory.persistence.checkpoint;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static 
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
@@ -32,12 +34,14 @@ import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.Check
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -67,7 +71,6 @@ import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
-import org.apache.ignite.internal.pagememory.util.PageIdUtils;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.jetbrains.annotations.Nullable;
@@ -496,6 +499,89 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
         );
     }
 
+    /**
+     * Tests that dirty partition with no dirty pages will be checkpointed.
+     */
+    @Test
+    void testDirtyPartitionWithoutDirtyPages() throws Exception {
+        PersistentPageMemory pageMemory = mock(PersistentPageMemory.class);
+
+        DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
+
+        workflow = new CheckpointWorkflow(
+                "test",
+                newReadWriteLock(log),
+                List.of(dataRegion),
+                1
+        );
+
+        workflow.start();
+
+        int groupId = 10;
+        int partitionId = 20;
+
+        FullPageId metaPageId = new 
FullPageId(partitionMetaPageId(partitionId), groupId);
+        workflow.markPartitionAsDirty(dataRegion, groupId, partitionId);
+
+        Checkpoint checkpoint = workflow.markCheckpointBegin(
+                coarseCurrentTimeMillis(),
+                mock(CheckpointProgressImpl.class),
+                mock(CheckpointMetricsTracker.class),
+                () -> {},
+                () -> {}
+        );
+
+        assertEquals(1, checkpoint.dirtyPagesSize);
+
+        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.nextPartitionView(null);
+
+        assertNotNull(dirtyPagesView);
+        assertThat(toListDirtyPageIds(dirtyPagesView), 
is(List.of(metaPageId)));
+    }
+
+    /**
+     * Tests that dirty partition with dirty pages will be checkpointed.
+     */
+    @Test
+    void testDirtyPartitionWithDirtyPages() throws Exception {
+        PersistentPageMemory pageMemory = mock(PersistentPageMemory.class);
+
+        DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
+
+        workflow = new CheckpointWorkflow(
+                "test",
+                newReadWriteLock(log),
+                List.of(dataRegion),
+                1
+        );
+
+        workflow.start();
+
+        int groupId = 10;
+        int partitionId = 20;
+
+        FullPageId metaPageId = new 
FullPageId(partitionMetaPageId(partitionId), groupId);
+        FullPageId dataPageId = new FullPageId(pageId(partitionId, FLAG_DATA, 
1), groupId);
+
+        workflow.markPartitionAsDirty(dataRegion, groupId, partitionId);
+        
when(pageMemory.beginCheckpoint(any())).thenReturn(List.of(dataPageId));
+
+        Checkpoint checkpoint = workflow.markCheckpointBegin(
+                coarseCurrentTimeMillis(),
+                mock(CheckpointProgressImpl.class),
+                mock(CheckpointMetricsTracker.class),
+                () -> {},
+                () -> {}
+        );
+
+        assertEquals(2, checkpoint.dirtyPagesSize);
+
+        CheckpointDirtyPagesView dirtyPagesView = 
checkpoint.dirtyPages.nextPartitionView(null);
+
+        assertNotNull(dirtyPagesView);
+        assertThat(toListDirtyPageIds(dirtyPagesView), is(List.of(metaPageId, 
dataPageId)));
+    }
+
     @Test
     void testAwaitPendingTasksOfListenerCallback() {
         workflow = new CheckpointWorkflow(
@@ -592,7 +678,7 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
     }
 
     private static FullPageId of(int grpId, int partId, int pageIdx) {
-        return new FullPageId(PageIdUtils.pageId(partId, (byte) 0, pageIdx), 
grpId);
+        return new FullPageId(pageId(partId, (byte) 0, pageIdx), grpId);
     }
 
     /**
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index a1e4252f24..908d62a9c1 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -43,6 +43,11 @@ public interface StorageEngine {
      */
     void stop() throws StorageException;
 
+    /**
+     * Whether the data is lost upon engine restart or not.
+     */
+    boolean isVolatile();
+
     /**
      * Creates new table storage.
      *
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 765c4f7e9f..3728e9f41d 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -434,7 +434,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsOnRebalance);
 
-        checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
         assertNull(mvPartitionStorage.committedGroupConfiguration());
 
         // Let's finish rebalancing.
@@ -459,7 +459,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsBeforeRebalanceStart);
         checkForPresenceRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsOnRebalance);
 
-        checkLastApplied(mvPartitionStorage, 10, 10, 20);
+        checkLastApplied(mvPartitionStorage, 10, 20);
         checkRaftGroupConfigs(raftGroupConfig, 
mvPartitionStorage.committedGroupConfiguration());
     }
 
@@ -493,7 +493,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
         fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsOnRebalance);
 
-        checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
 
         // Let's abort rebalancing.
 
@@ -506,7 +506,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsBeforeRebalanceStart);
         checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsOnRebalance);
 
-        checkLastApplied(mvPartitionStorage, 0, 0, 0);
+        checkLastApplied(mvPartitionStorage, 0, 0);
         assertNull(mvPartitionStorage.committedGroupConfiguration());
     }
 
@@ -609,11 +609,11 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
             // Let's check the repositories: they should be empty.
             checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rows);
 
-            checkLastApplied(mvPartitionStorage, 0, 0, 0);
+            checkLastApplied(mvPartitionStorage, 0, 0);
         } else {
             checkForPresenceRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rows);
 
-            checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+            checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
         }
     }
 
@@ -631,7 +631,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         // Let's check the cleanup for an empty partition.
         assertThat(tableStorage.clearPartition(PARTITION_ID), 
willCompleteSuccessfully());
 
-        checkLastApplied(mvPartitionStorage, 0, 0, 0);
+        checkLastApplied(mvPartitionStorage, 0, 0);
         assertNull(mvPartitionStorage.committedGroupConfiguration());
 
         // Let's fill the storages and clean them.
@@ -655,7 +655,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         // Let's clear the storages and check them out.
         assertThat(tableStorage.clearPartition(PARTITION_ID), 
willCompleteSuccessfully());
 
-        checkLastApplied(mvPartitionStorage, 0, 0, 0);
+        checkLastApplied(mvPartitionStorage, 0, 0);
         assertNull(mvPartitionStorage.committedGroupConfiguration());
 
         checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rows);
@@ -912,7 +912,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     }
 
     private void 
checkMvPartitionStorageMethodsAfterStartRebalance(MvPartitionStorage storage) {
-        checkLastApplied(storage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+        checkLastApplied(storage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
 
         assertNull(storage.committedGroupConfiguration());
 
@@ -1056,7 +1056,6 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     private static void checkLastApplied(
             MvPartitionStorage storage,
             long expLastAppliedIndex,
-            long expPersistentIndex,
             long expLastAppliedTerm
     ) {
         assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
new file mode 100644
index 0000000000..ce4249927e
--- /dev/null
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests basic functionality of storage engines. Allows for more complex 
scenarios than {@link AbstractMvTableStorageTest}, because it
+ * doesn't limit the usage of the engine with a single table.
+ */
+public abstract class AbstractStorageEngineTest extends BaseMvStoragesTest {
+    /** Engine instance. */
+    private StorageEngine storageEngine;
+
+    @BeforeEach
+    void createEngineBeforeTest() {
+        storageEngine = createEngine();
+
+        storageEngine.start();
+    }
+
+    @AfterEach
+    void stopEngineAfterTest() {
+        if (storageEngine != null) {
+            storageEngine.stop();
+        }
+    }
+
+    /**
+     * Creates a new storage engine instance. For persistent engines, the 
instances within a single test method should point to the same
+     * directory.
+     */
+    protected abstract StorageEngine createEngine();
+
+    /**
+     * Tests that explicitly flushed data remains persistent on the device, 
when the engine is restarted.
+     */
+    @Test
+    void testRestartAfterFlush() throws Exception {
+        assumeFalse(storageEngine.isVolatile());
+
+        StorageTableDescriptor tableDescriptor = new StorageTableDescriptor(1, 
1, DEFAULT_DATA_REGION);
+        StorageIndexDescriptorSupplier indexSupplier = 
mock(StorageIndexDescriptorSupplier.class);
+
+        MvTableStorage mvTableStorage = 
storageEngine.createMvTable(tableDescriptor, indexSupplier);
+
+        mvTableStorage.start();
+        try (AutoCloseable ignored0 = mvTableStorage::stop) {
+            CompletableFuture<MvPartitionStorage> mvPartitionStorageFuture = 
mvTableStorage.createMvPartition(0);
+
+            assertThat(mvPartitionStorageFuture, willCompleteSuccessfully());
+            MvPartitionStorage mvPartitionStorage = 
mvPartitionStorageFuture.join();
+
+            try (AutoCloseable ignored1 = mvTableStorage::stop) {
+                // Flush. Persist the table itself, not the update.
+                assertThat(mvPartitionStorage.flush(), 
willCompleteSuccessfully());
+
+                mvPartitionStorage.runConsistently(locker -> {
+                    // Update of basic storage data.
+                    mvPartitionStorage.lastApplied(10, 20);
+
+                    return null;
+                });
+
+                // Flush.
+                assertThat(mvPartitionStorage.flush(), 
willCompleteSuccessfully());
+            }
+        }
+
+        // Restart.
+        stopEngineAfterTest();
+        createEngineBeforeTest();
+
+        mvTableStorage = storageEngine.createMvTable(tableDescriptor, 
indexSupplier);
+
+        mvTableStorage.start();
+        try (AutoCloseable ignored0 = mvTableStorage::close) {
+            CompletableFuture<MvPartitionStorage> mvPartitionStorageFuture = 
mvTableStorage.createMvPartition(0);
+
+            assertThat(mvPartitionStorageFuture, willCompleteSuccessfully());
+            MvPartitionStorage mvPartitionStorage = 
mvPartitionStorageFuture.join();
+
+            try (AutoCloseable ignored1 = mvTableStorage::stop) {
+                // Check that data has been persisted.
+                assertEquals(10, mvPartitionStorage.lastAppliedIndex());
+                assertEquals(20, mvPartitionStorage.lastAppliedTerm());
+            }
+        }
+    }
+}
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
index dc94832078..c983c8bbf3 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
@@ -47,6 +47,11 @@ public class TestStorageEngine implements StorageEngine {
         // No-op.
     }
 
+    @Override
+    public boolean isVolatile() {
+        return true;
+    }
+
     @Override
     public TestMvTableStorage createMvTable(
             StorageTableDescriptor tableDescriptor,
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 48f644bef2..43c19d2f27 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -184,6 +184,11 @@ public class PersistentPageMemoryStorageEngine implements 
StorageEngine {
         }
     }
 
+    @Override
+    public boolean isVolatile() {
+        return false;
+    }
+
     @Override
     public PersistentPageMemoryTableStorage createMvTable(
             StorageTableDescriptor tableDescriptor,
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index 98d13fb97b..0cd413e343 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -125,6 +125,11 @@ public class VolatilePageMemoryStorageEngine implements 
StorageEngine {
         }
     }
 
+    @Override
+    public boolean isVolatile() {
+        return true;
+    }
+
     @Override
     public VolatilePageMemoryTableStorage createMvTable(
             StorageTableDescriptor tableDescriptor,
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index a59e59cb73..a34bb3060d 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -212,13 +212,30 @@ public class PersistentPageMemoryMvPartitionStorage 
extends AbstractPageMemoryMv
     }
 
     private void lastAppliedBusy(long lastAppliedIndex, long lastAppliedTerm) 
throws StorageException {
+        updateMeta((lastCheckpointId, meta) -> 
meta.lastApplied(lastCheckpointId, lastAppliedIndex, lastAppliedTerm));
+    }
+
+    /**
+     * Closure interface for {@link #updateMeta(MetaUpdateClosure)}.
+     */
+    @FunctionalInterface
+    private interface MetaUpdateClosure {
+        void update(UUID lastCheckpointId, PartitionMeta meta);
+    }
+
+    /**
+     * Updates partition meta. Hides all the necessary boilerplate in a 
single place.
+     */
+    private void updateMeta(MetaUpdateClosure closure) {
         assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
 
         CheckpointProgress lastCheckpoint = 
checkpointManager.lastCheckpointProgress();
 
         UUID lastCheckpointId = lastCheckpoint == null ? null : 
lastCheckpoint.id();
 
-        meta.lastApplied(lastCheckpointId, lastAppliedIndex, lastAppliedTerm);
+        closure.update(lastCheckpointId, meta);
+
+        checkpointManager.markPartitionAsDirty(tableStorage.dataRegion(), 
tableStorage.getTableId(), partitionId);
     }
 
     @Override
@@ -262,30 +279,27 @@ public class PersistentPageMemoryMvPartitionStorage 
extends AbstractPageMemoryMv
     }
 
     private void committedGroupConfigurationBusy(byte[] groupConfigBytes) {
-        assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
-
-        CheckpointProgress lastCheckpoint = 
checkpointManager.lastCheckpointProgress();
-        UUID lastCheckpointId = lastCheckpoint == null ? null : 
lastCheckpoint.id();
+        updateMeta((lastCheckpointId, meta) -> {
+            replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
 
-        replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
-
-        try {
-            if (meta.lastReplicationProtocolGroupConfigFirstPageId() == 
BlobStorage.NO_PAGE_ID) {
-                long configPageId = blobStorage.addBlob(groupConfigBytes);
+            try {
+                if (meta.lastReplicationProtocolGroupConfigFirstPageId() == 
BlobStorage.NO_PAGE_ID) {
+                    long configPageId = blobStorage.addBlob(groupConfigBytes);
 
-                
meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId, 
configPageId);
-            } else {
-                
blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(), 
groupConfigBytes);
+                    
meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId, 
configPageId);
+                } else {
+                    
blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(), 
groupConfigBytes);
+                }
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException(
+                        "Cannot save committed group configuration: 
[tableId={}, partitionId={}]",
+                        e,
+                        tableStorage.getTableId(), partitionId
+                );
+            } finally {
+                
replicationProtocolGroupConfigReadWriteLock.writeLock().unlock();
             }
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException(
-                    "Cannot save committed group configuration: [tableId={}, 
partitionId={}]",
-                    e,
-                    tableStorage.getTableId(), partitionId
-            );
-        } finally {
-            replicationProtocolGroupConfigReadWriteLock.writeLock().unlock();
-        }
+        });
     }
 
     @Override
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
new file mode 100644
index 0000000000..6a41918159
--- /dev/null
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.engine;
+
+import java.nio.file.Path;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Implementation of the {@link AbstractStorageEngineTest} for the {@link 
PersistentPageMemoryStorageEngine#ENGINE_NAME} engine.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class PersistentPageMemoryStorageEngineTest extends 
AbstractStorageEngineTest {
+    @InjectConfiguration("mock {checkpoint.checkpointDelayMillis = 0, 
defaultRegion.size = 1048576}")
+    private PersistentPageMemoryStorageEngineConfiguration engineConfiguration;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @Override
+    protected StorageEngine createEngine() {
+        var ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        return new PersistentPageMemoryStorageEngine(
+                "test",
+                engineConfiguration,
+                ioRegistry,
+                workDir,
+                null
+        );
+    }
+}
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 16d30ae7bb..7aa9127437 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -167,6 +167,11 @@ public class RocksDbStorageEngine implements StorageEngine 
{
         IgniteUtils.shutdownAndAwaitTermination(scheduledPool, 10, 
TimeUnit.SECONDS);
     }
 
+    @Override
+    public boolean isVolatile() {
+        return false;
+    }
+
     @Override
     public RocksDbTableStorage createMvTable(
             StorageTableDescriptor tableDescriptor,
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
new file mode 100644
index 0000000000..4faa412a08
--- /dev/null
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.engine;
+
+import java.nio.file.Path;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Implementation of the {@link AbstractStorageEngineTest} for the {@link 
RocksDbStorageEngine#ENGINE_NAME} engine.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbStorageEngineTest extends AbstractStorageEngineTest {
+    @InjectConfiguration("mock.flushDelayMillis = 0")
+    private RocksDbStorageEngineConfiguration engineConfiguration;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @Override
+    protected StorageEngine createEngine() {
+        return new RocksDbStorageEngine(
+                "test",
+                engineConfiguration,
+                workDir
+        );
+    }
+}


Reply via email to