sashapolo commented on a change in pull request #285:
URL: https://github.com/apache/ignite-3/pull/285#discussion_r692312221



##########
File path: 
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -309,12 +337,108 @@ public RocksDbStorage(Path dbPath, 
Comparator<ByteBuffer> comparator) throws Sto
         return new ScanCursor(db.newIterator(), filter);
     }
 
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> snapshot(Path 
snapshotPath) {
+        Path tempPath = Paths.get(snapshotPath.toString() + TMP_SUFFIX);
+
+        // Create a RocksDB point-in-time snapshot
+        Snapshot snapshot = db.getSnapshot();
+
+        return CompletableFuture.runAsync(() -> {
+            // (Re)create the temporary directory
+            IgniteUtils.deleteIfExists(tempPath);
+
+            try {
+                Files.createDirectories(tempPath);
+            }
+            catch (IOException e) {
+                throw new IgniteInternalException("Failed to create directory: 
" + tempPath, e);
+            }
+        }, snapshotExecutor)
+        .thenCompose(aVoid -> createSstFile(db, snapshot, tempPath))

Review comment:
       Your formatting is off.

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class ITAbstractListenerSnapshotTest<T extends 
RaftGroupListener> {
+    /** Starting server port. */
+    private static final int PORT = 5003;
+
+    /** Starting client port. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Peers list.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Factory. */
+    private static final RaftClientMessagesFactory FACTORY = new 
RaftClientMessagesFactory();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Servers. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Clients. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shutdown raft server and stop all cluster nodes.
+     *
+     * @throws Exception If failed to shutdown raft server,
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.stop();
+
+        for (ClusterService service : cluster)
+            service.stop();
+    }
+
+    /**
+     * Test parameters for {@link #testSnapshot}.
+     */
+    private static class TestData {
+        /** Delete raft group folder. */
+        private final boolean deleteFolder;
+
+        /** Interact with the raft group after a snapshot. */
+        private final boolean interactAfterSnapshot;
+
+        /** */
+        private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
+            this.deleteFolder = deleteFolder;
+            this.interactAfterSnapshot = interactAfterSnapshot;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.format("deleteFolder=%s, interactAfterSnapshot=%s", 
deleteFolder, interactAfterSnapshot);
+        }
+    }
+
+    /**
+     * @return {@link #testSnapshot} parameters.
+     */
+    private static List<TestData> testSnapshotData() {
+        return List.of(
+            new TestData(false, false),
+            new TestData(true, true),
+            new TestData(false, true),
+            new TestData(true, false)
+        );
+    }
+
+    /**
+     * Tests that a joining raft node successfully restores a snapshot.
+     *
+     * @param testData Test parameters.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("testSnapshotData")
+    public void testSnapshot(TestData testData) throws Exception {
+        // Set up a raft group service
+        RaftGroupService service = prepareRaftGroup();
+
+        doBeforeStop(service);
+
+        // Select any node that is not the leader of the group
+        JRaftServerImpl toStop = servers.stream()
+            .filter(server -> 
!server.localPeer(raftGroupId()).equals(service.leader()))
+            .findAny()
+            .orElseThrow();
+
+        // Get the path to that node's raft directory
+        Path serverDataPath = toStop.getServerDataPath(raftGroupId());
+
+        // Get the path to that node's RocksDB key-value storage
+        Path dbPath = getListenerPersistencePath(getListener(toStop, 
raftGroupId()));
+
+        int stopIdx = servers.indexOf(toStop);
+
+        // Remove that node from the list of servers
+        servers.remove(stopIdx);
+
+        // Shutdown that node
+        toStop.stop();
+
+        // Create a snapshot of the raft group
+        service.snapshot(service.leader()).get();
+
+        doAfterStop(service);
+
+        // Create another raft snapshot
+        service.snapshot(service.leader()).get();
+
+        if (testData.deleteFolder) {
+            // Delete a stopped node's raft directory and key-value storage 
directory
+            // to check if snapshot could be restored by the restarted node
+            IgniteUtils.deleteIfExists(dbPath);
+            IgniteUtils.deleteIfExists(serverDataPath);
+        }
+
+        if (testData.interactAfterSnapshot) {
+            // Interact with the raft group after the second snapshot to check 
if the restarted node would see these
+            // interactions after restoring a snapshot and raft logs
+            doAfterSnapshot(service);
+        }
+
+        // Restart the node
+        JRaftServerImpl restarted = startServer(stopIdx);
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        BooleanSupplier closure = snapshotCheckClosure(restarted, 
testData.interactAfterSnapshot);
+
+        boolean success = waitForCondition(closure, 10_000);
+
+        assertTrue(success);
+    }
+
+    /**
+     * Interacts with raft group before the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doBeforeStop(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Interacts with raft group after the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doAfterStop(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Interacts with raft group after the leader has captured a snapshot.

Review comment:
       ```suggestion
        * Interacts with a raft group after the leader has captured a snapshot.
   ```

##########
File path: 
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.client.service.ITAbstractListenerSnapshotTest;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Persistent (rocksdb-based) partitions raft group snapshots tests.
+ */
+public class ITTablePersistenceTest extends 
ITAbstractListenerSnapshotTest<PartitionListener> {
+    /** */
+    public static SchemaDescriptor SCHEMA = new 
SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INT64, false)},
+        new Column[] {new Column("value", NativeTypes.INT64, false)}
+    );
+
+    /** */
+    private static final Row firstKey = createKeyRow(0);

Review comment:
       Shouldn't these variables be named in upper case?

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class ITAbstractListenerSnapshotTest<T extends 
RaftGroupListener> {
+    /** Starting server port. */
+    private static final int PORT = 5003;
+
+    /** Starting client port. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Peers list.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Factory. */
+    private static final RaftClientMessagesFactory FACTORY = new 
RaftClientMessagesFactory();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Servers. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Clients. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shutdown raft server and stop all cluster nodes.
+     *
+     * @throws Exception If failed to shutdown raft server,
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.stop();
+
+        for (ClusterService service : cluster)
+            service.stop();
+    }
+
+    /**
+     * Test parameters for {@link #testSnapshot}.
+     */
+    private static class TestData {
+        /** Delete raft group folder. */
+        private final boolean deleteFolder;
+
+        /** Interact with the raft group after a snapshot. */
+        private final boolean interactAfterSnapshot;
+
+        /** */
+        private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
+            this.deleteFolder = deleteFolder;
+            this.interactAfterSnapshot = interactAfterSnapshot;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.format("deleteFolder=%s, interactAfterSnapshot=%s", 
deleteFolder, interactAfterSnapshot);
+        }
+    }
+
+    /**
+     * @return {@link #testSnapshot} parameters.
+     */
+    private static List<TestData> testSnapshotData() {
+        return List.of(
+            new TestData(false, false),
+            new TestData(true, true),
+            new TestData(false, true),
+            new TestData(true, false)
+        );
+    }
+
+    /**
+     * Tests that a joining raft node successfully restores a snapshot.
+     *
+     * @param testData Test parameters.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("testSnapshotData")
+    public void testSnapshot(TestData testData) throws Exception {
+        // Set up a raft group service
+        RaftGroupService service = prepareRaftGroup();
+
+        doBeforeStop(service);
+
+        // Select any node that is not the leader of the group
+        JRaftServerImpl toStop = servers.stream()
+            .filter(server -> 
!server.localPeer(raftGroupId()).equals(service.leader()))
+            .findAny()
+            .orElseThrow();
+
+        // Get the path to that node's raft directory
+        Path serverDataPath = toStop.getServerDataPath(raftGroupId());
+
+        // Get the path to that node's RocksDB key-value storage
+        Path dbPath = getListenerPersistencePath(getListener(toStop, 
raftGroupId()));
+
+        int stopIdx = servers.indexOf(toStop);
+
+        // Remove that node from the list of servers
+        servers.remove(stopIdx);
+
+        // Shutdown that node
+        toStop.stop();
+
+        // Create a snapshot of the raft group
+        service.snapshot(service.leader()).get();
+
+        doAfterStop(service);
+
+        // Create another raft snapshot
+        service.snapshot(service.leader()).get();
+
+        if (testData.deleteFolder) {
+            // Delete a stopped node's raft directory and key-value storage 
directory
+            // to check if snapshot could be restored by the restarted node
+            IgniteUtils.deleteIfExists(dbPath);
+            IgniteUtils.deleteIfExists(serverDataPath);
+        }
+
+        if (testData.interactAfterSnapshot) {
+            // Interact with the raft group after the second snapshot to check 
if the restarted node would see these
+            // interactions after restoring a snapshot and raft logs
+            doAfterSnapshot(service);
+        }
+
+        // Restart the node
+        JRaftServerImpl restarted = startServer(stopIdx);
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        BooleanSupplier closure = snapshotCheckClosure(restarted, 
testData.interactAfterSnapshot);
+
+        boolean success = waitForCondition(closure, 10_000);
+
+        assertTrue(success);
+    }
+
+    /**
+     * Interacts with raft group before the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doBeforeStop(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Interacts with raft group after the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doAfterStop(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Interacts with raft group after the leader has captured a snapshot.

Review comment:
       What do you mean by "interacts"?

##########
File path: 
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -309,12 +337,108 @@ public RocksDbStorage(Path dbPath, 
Comparator<ByteBuffer> comparator) throws Sto
         return new ScanCursor(db.newIterator(), filter);
     }
 
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> snapshot(Path 
snapshotPath) {
+        Path tempPath = Paths.get(snapshotPath.toString() + TMP_SUFFIX);
+
+        // Create a RocksDB point-in-time snapshot
+        Snapshot snapshot = db.getSnapshot();
+
+        return CompletableFuture.runAsync(() -> {
+            // (Re)create the temporary directory
+            IgniteUtils.deleteIfExists(tempPath);
+
+            try {
+                Files.createDirectories(tempPath);
+            }
+            catch (IOException e) {
+                throw new IgniteInternalException("Failed to create directory: 
" + tempPath, e);
+            }
+        }, snapshotExecutor)
+        .thenCompose(aVoid -> createSstFile(db, snapshot, tempPath))
+        .whenComplete((aVoid, throwable) -> {
+            // Release a snapshot
+            db.releaseSnapshot(snapshot);
+
+            // Snapshot is not actually closed here, because a Snapshot 
instance doesn't own a pointer, the
+            // database does. Calling close to maintain the AutoCloseable 
semantics
+            snapshot.close();
+
+            if (throwable != null)
+                return;
+
+            // Delete snapshot directory if it already exists
+            IgniteUtils.deleteIfExists(snapshotPath);
+
+            try {
+                // Rename the temporary directory
+                Files.move(tempPath, snapshotPath);
+            }
+            catch (IOException e) {
+                throw new IgniteInternalException("Failed to rename: " + 
tempPath + " to " + snapshotPath, e);
+            }
+        });
+    }
+
+    /**
+     * Creates an SST file for the database.
+     *
+     * @param db RocksDB instance.
+     * @param snapshot Point-in-time snapshot.
+     * @param path Directory to put the SST file in.
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<Void> createSstFile(RocksDB db, Snapshot 
snapshot, Path path) {

Review comment:
       Should we extract this code into a common place? This looks like a 
copy-paste from `RocksDBKeyValueStorage`

##########
File path: 
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.client.service.ITAbstractListenerSnapshotTest;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Persistent (rocksdb-based) partitions raft group snapshots tests.
+ */
+public class ITTablePersistenceTest extends 
ITAbstractListenerSnapshotTest<PartitionListener> {
+    /** */
+    public static SchemaDescriptor SCHEMA = new 
SchemaDescriptor(UUID.randomUUID(),

Review comment:
       ```suggestion
       private static final SchemaDescriptor SCHEMA = new 
SchemaDescriptor(UUID.randomUUID(),
   ```

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class ITAbstractListenerSnapshotTest<T extends 
RaftGroupListener> {
+    /** Starting server port. */
+    private static final int PORT = 5003;
+
+    /** Starting client port. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Peers list.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Factory. */
+    private static final RaftClientMessagesFactory FACTORY = new 
RaftClientMessagesFactory();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Servers. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Clients. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shutdown raft server and stop all cluster nodes.
+     *
+     * @throws Exception If failed to shutdown raft server,
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.stop();
+
+        for (ClusterService service : cluster)
+            service.stop();
+    }
+
+    /**
+     * Test parameters for {@link #testSnapshot}.
+     */
+    private static class TestData {
+        /** Delete raft group folder. */

Review comment:
       Please add a more verbose description; it is not clear how these flags 
influence the tests.

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class ITAbstractListenerSnapshotTest<T extends 
RaftGroupListener> {
+    /** Starting server port. */
+    private static final int PORT = 5003;
+
+    /** Starting client port. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Peers list.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Factory. */
+    private static final RaftClientMessagesFactory FACTORY = new 
RaftClientMessagesFactory();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Servers. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Clients. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shutdown raft server and stop all cluster nodes.
+     *
+     * @throws Exception If failed to shutdown raft server,
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.stop();
+
+        for (ClusterService service : cluster)
+            service.stop();
+    }
+
+    /**
+     * Test parameters for {@link #testSnapshot}.
+     */
+    private static class TestData {
+        /** Delete raft group folder. */
+        private final boolean deleteFolder;
+
+        /** Interact with the raft group after a snapshot. */
+        private final boolean interactAfterSnapshot;
+
+        /** */
+        private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
+            this.deleteFolder = deleteFolder;
+            this.interactAfterSnapshot = interactAfterSnapshot;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.format("deleteFolder=%s, interactAfterSnapshot=%s", 
deleteFolder, interactAfterSnapshot);
+        }
+    }
+
+    /**
+     * @return {@link #testSnapshot} parameters.
+     */
+    private static List<TestData> testSnapshotData() {
+        return List.of(
+            new TestData(false, false),
+            new TestData(true, true),
+            new TestData(false, true),
+            new TestData(true, false)
+        );
+    }
+
+    /**
+     * Tests that a joining raft node successfully restores a snapshot.
+     *
+     * @param testData Test parameters.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("testSnapshotData")
+    public void testSnapshot(TestData testData) throws Exception {
+        // Set up a raft group service
+        RaftGroupService service = prepareRaftGroup();
+
+        doBeforeStop(service);
+
+        // Select any node that is not the leader of the group
+        JRaftServerImpl toStop = servers.stream()
+            .filter(server -> 
!server.localPeer(raftGroupId()).equals(service.leader()))
+            .findAny()
+            .orElseThrow();
+
+        // Get the path to that node's raft directory
+        Path serverDataPath = toStop.getServerDataPath(raftGroupId());
+
+        // Get the path to that node's RocksDB key-value storage
+        Path dbPath = getListenerPersistencePath(getListener(toStop, 
raftGroupId()));
+
+        int stopIdx = servers.indexOf(toStop);
+
+        // Remove that node from the list of servers
+        servers.remove(stopIdx);
+
+        // Shutdown that node
+        toStop.stop();
+
+        // Create a snapshot of the raft group
+        service.snapshot(service.leader()).get();
+
+        doAfterStop(service);
+
+        // Create another raft snapshot
+        service.snapshot(service.leader()).get();
+
+        if (testData.deleteFolder) {
+            // Delete a stopped node's raft directory and key-value storage 
directory
+            // to check if snapshot could be restored by the restarted node
+            IgniteUtils.deleteIfExists(dbPath);
+            IgniteUtils.deleteIfExists(serverDataPath);
+        }
+
+        if (testData.interactAfterSnapshot) {
+            // Interact with the raft group after the second snapshot to check 
if the restarted node would see these
+            // interactions after restoring a snapshot and raft logs
+            doAfterSnapshot(service);
+        }
+
+        // Restart the node
+        JRaftServerImpl restarted = startServer(stopIdx);
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        BooleanSupplier closure = snapshotCheckClosure(restarted, 
testData.interactAfterSnapshot);
+
+        boolean success = waitForCondition(closure, 10_000);
+
+        assertTrue(success);
+    }
+
+    /**
+     * Interacts with raft group before the follower is stopped.

Review comment:
       ```suggestion
        * Interacts with the raft group before a follower is stopped.
   ```

##########
File path: 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -117,5 +120,21 @@
      * @throws StorageException If failed to read data or storage is already 
stopped.
      */
     public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws 
StorageException;
+
+    /**
+     * Creates a snapshot of the storage's current state in the specified 
directory.
+     *
+     * @param snapshotPath Directory to store a snapshot.
+     * @return Future representing pending completion of the operation. Could 
not be {@code null}.

Review comment:
       ```suggestion
        * @return Future representing pending completion of the operation. Can 
not be {@code null}.
   ```

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class ITAbstractListenerSnapshotTest<T extends 
RaftGroupListener> {
+    /** Starting server port. */
+    private static final int PORT = 5003;
+
+    /** Starting client port. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Peers list.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Factory. */
+    private static final RaftClientMessagesFactory FACTORY = new 
RaftClientMessagesFactory();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Servers. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Clients. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shutdown raft server and stop all cluster nodes.
+     *
+     * @throws Exception If failed to shutdown raft server,
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.stop();
+
+        for (ClusterService service : cluster)
+            service.stop();
+    }
+
+    /**
+     * Test parameters for {@link #testSnapshot}.
+     */
+    private static class TestData {
+        /** Delete raft group folder. */
+        private final boolean deleteFolder;
+
+        /** Interact with the raft group after a snapshot. */
+        private final boolean interactAfterSnapshot;
+
+        /** */
+        private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
+            this.deleteFolder = deleteFolder;
+            this.interactAfterSnapshot = interactAfterSnapshot;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.format("deleteFolder=%s, interactAfterSnapshot=%s", 
deleteFolder, interactAfterSnapshot);
+        }
+    }
+
+    /**
+     * @return {@link #testSnapshot} parameters.
+     */
+    private static List<TestData> testSnapshotData() {
+        return List.of(
+            new TestData(false, false),
+            new TestData(true, true),
+            new TestData(false, true),
+            new TestData(true, false)
+        );
+    }
+
+    /**
+     * Tests that a joining raft node successfully restores a snapshot.
+     *
+     * @param testData Test parameters.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("testSnapshotData")
+    public void testSnapshot(TestData testData) throws Exception {
+        // Set up a raft group service
+        RaftGroupService service = prepareRaftGroup();
+
+        doBeforeStop(service);
+
+        // Select any node that is not the leader of the group
+        JRaftServerImpl toStop = servers.stream()
+            .filter(server -> 
!server.localPeer(raftGroupId()).equals(service.leader()))
+            .findAny()
+            .orElseThrow();
+
+        // Get the path to that node's raft directory
+        Path serverDataPath = toStop.getServerDataPath(raftGroupId());
+
+        // Get the path to that node's RocksDB key-value storage
+        Path dbPath = getListenerPersistencePath(getListener(toStop, 
raftGroupId()));
+
+        int stopIdx = servers.indexOf(toStop);
+
+        // Remove that node from the list of servers
+        servers.remove(stopIdx);
+
+        // Shutdown that node
+        toStop.stop();
+
+        // Create a snapshot of the raft group
+        service.snapshot(service.leader()).get();
+
+        doAfterStop(service);
+
+        // Create another raft snapshot
+        service.snapshot(service.leader()).get();
+
+        if (testData.deleteFolder) {
+            // Delete a stopped node's raft directory and key-value storage 
directory
+            // to check if snapshot could be restored by the restarted node
+            IgniteUtils.deleteIfExists(dbPath);
+            IgniteUtils.deleteIfExists(serverDataPath);
+        }
+
+        if (testData.interactAfterSnapshot) {
+            // Interact with the raft group after the second snapshot to check 
if the restarted node would see these
+            // interactions after restoring a snapshot and raft logs
+            doAfterSnapshot(service);
+        }
+
+        // Restart the node
+        JRaftServerImpl restarted = startServer(stopIdx);
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        BooleanSupplier closure = snapshotCheckClosure(restarted, 
testData.interactAfterSnapshot);
+
+        boolean success = waitForCondition(closure, 10_000);
+
+        assertTrue(success);
+    }
+
+    /**
+     * Interacts with raft group before the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doBeforeStop(RaftGroupService service) throws 
Exception;

Review comment:
       I think that `beforeFollowerStop`, `afterFollowerStop`, `afterSnapshot` 
might be better names for these methods

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class ITAbstractListenerSnapshotTest<T extends 
RaftGroupListener> {
+    /** Starting server port. */
+    private static final int PORT = 5003;
+
+    /** Starting client port. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Peers list.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Factory. */
+    private static final RaftClientMessagesFactory FACTORY = new 
RaftClientMessagesFactory();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Servers. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Clients. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shutdown raft server and stop all cluster nodes.
+     *
+     * @throws Exception If failed to shutdown raft server,
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.stop();
+
+        for (ClusterService service : cluster)
+            service.stop();
+    }
+
+    /**
+     * Test parameters for {@link #testSnapshot}.
+     */
+    private static class TestData {
+        /** Delete raft group folder. */
+        private final boolean deleteFolder;
+
+        /** Interact with the raft group after a snapshot. */
+        private final boolean interactAfterSnapshot;
+
+        /** */
+        private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
+            this.deleteFolder = deleteFolder;
+            this.interactAfterSnapshot = interactAfterSnapshot;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.format("deleteFolder=%s, interactAfterSnapshot=%s", 
deleteFolder, interactAfterSnapshot);
+        }
+    }
+
+    /**
+     * @return {@link #testSnapshot} parameters.
+     */
+    private static List<TestData> testSnapshotData() {
+        return List.of(
+            new TestData(false, false),
+            new TestData(true, true),
+            new TestData(false, true),
+            new TestData(true, false)
+        );
+    }
+
+    /**
+     * Tests that a joining raft node successfully restores a snapshot.
+     *
+     * @param testData Test parameters.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("testSnapshotData")
+    public void testSnapshot(TestData testData) throws Exception {
+        // Set up a raft group service
+        RaftGroupService service = prepareRaftGroup();
+
+        doBeforeStop(service);
+
+        // Select any node that is not the leader of the group
+        JRaftServerImpl toStop = servers.stream()
+            .filter(server -> 
!server.localPeer(raftGroupId()).equals(service.leader()))
+            .findAny()
+            .orElseThrow();
+
+        // Get the path to that node's raft directory
+        Path serverDataPath = toStop.getServerDataPath(raftGroupId());
+
+        // Get the path to that node's RocksDB key-value storage
+        Path dbPath = getListenerPersistencePath(getListener(toStop, 
raftGroupId()));
+
+        int stopIdx = servers.indexOf(toStop);
+
+        // Remove that node from the list of servers
+        servers.remove(stopIdx);
+
+        // Shutdown that node
+        toStop.stop();
+
+        // Create a snapshot of the raft group
+        service.snapshot(service.leader()).get();
+
+        doAfterStop(service);
+
+        // Create another raft snapshot
+        service.snapshot(service.leader()).get();
+
+        if (testData.deleteFolder) {
+            // Delete a stopped node's raft directory and key-value storage 
directory
+            // to check if snapshot could be restored by the restarted node
+            IgniteUtils.deleteIfExists(dbPath);
+            IgniteUtils.deleteIfExists(serverDataPath);
+        }
+
+        if (testData.interactAfterSnapshot) {
+            // Interact with the raft group after the second snapshot to check 
if the restarted node would see these
+            // interactions after restoring a snapshot and raft logs
+            doAfterSnapshot(service);
+        }
+
+        // Restart the node
+        JRaftServerImpl restarted = startServer(stopIdx);
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        BooleanSupplier closure = snapshotCheckClosure(restarted, 
testData.interactAfterSnapshot);
+
+        boolean success = waitForCondition(closure, 10_000);
+
+        assertTrue(success);
+    }
+
+    /**
+     * Interacts with raft group before the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doBeforeStop(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Interacts with raft group after the follower is stopped.

Review comment:
       ```suggestion
        * Interacts with the raft group after a follower is stopped.
   ```

##########
File path: 
modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.client.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for persistent raft group snapshot tests.
+ *
+ * @param <T> Type of the raft group listener.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class ITAbstractListenerSnapshotTest<T extends 
RaftGroupListener> {
+    /** Starting server port. */
+    private static final int PORT = 5003;
+
+    /** Starting client port. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Peers list.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Factory. */
+    private static final RaftClientMessagesFactory FACTORY = new 
RaftClientMessagesFactory();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new 
TestScaleCubeClusterServiceFactory();
+
+    /** */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = 
new MessageSerializationRegistryImpl();
+
+    /** */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Servers. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Clients. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shut down raft server and stop all cluster nodes.
+     *
+     * @throws Exception If failed to shut down raft server.
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.stop();
+
+        for (ClusterService service : cluster)
+            service.stop();
+    }
+
+    /**
+     * Test parameters for {@link #testSnapshot}.
+     */
+    private static class TestData {
+        /** Delete raft group folder. */
+        private final boolean deleteFolder;
+
+        /** Interact with the raft group after a snapshot. */
+        private final boolean interactAfterSnapshot;
+
+        /** */
+        private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
+            this.deleteFolder = deleteFolder;
+            this.interactAfterSnapshot = interactAfterSnapshot;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.format("deleteFolder=%s, interactAfterSnapshot=%s", 
deleteFolder, interactAfterSnapshot);
+        }
+    }
+
+    /**
+     * @return {@link #testSnapshot} parameters.
+     */
+    private static List<TestData> testSnapshotData() {
+        return List.of(
+            new TestData(false, false),
+            new TestData(true, true),
+            new TestData(false, true),
+            new TestData(true, false)
+        );
+    }
+
+    /**
+     * Tests that a joining raft node successfully restores a snapshot.
+     *
+     * @param testData Test parameters.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("testSnapshotData")
+    public void testSnapshot(TestData testData) throws Exception {
+        // Set up a raft group service
+        RaftGroupService service = prepareRaftGroup();
+
+        doBeforeStop(service);
+
+        // Select any node that is not the leader of the group
+        JRaftServerImpl toStop = servers.stream()
+            .filter(server -> 
!server.localPeer(raftGroupId()).equals(service.leader()))
+            .findAny()
+            .orElseThrow();
+
+        // Get the path to that node's raft directory
+        Path serverDataPath = toStop.getServerDataPath(raftGroupId());
+
+        // Get the path to that node's RocksDB key-value storage
+        Path dbPath = getListenerPersistencePath(getListener(toStop, 
raftGroupId()));
+
+        int stopIdx = servers.indexOf(toStop);
+
+        // Remove that node from the list of servers
+        servers.remove(stopIdx);
+
+        // Shutdown that node
+        toStop.stop();
+
+        // Create a snapshot of the raft group
+        service.snapshot(service.leader()).get();
+
+        doAfterStop(service);
+
+        // Create another raft snapshot
+        service.snapshot(service.leader()).get();
+
+        if (testData.deleteFolder) {
+            // Delete a stopped node's raft directory and key-value storage 
directory
+            // to check if snapshot could be restored by the restarted node
+            IgniteUtils.deleteIfExists(dbPath);
+            IgniteUtils.deleteIfExists(serverDataPath);
+        }
+
+        if (testData.interactAfterSnapshot) {
+            // Interact with the raft group after the second snapshot to check 
if the restarted node would see these
+            // interactions after restoring a snapshot and raft logs
+            doAfterSnapshot(service);
+        }
+
+        // Restart the node
+        JRaftServerImpl restarted = startServer(stopIdx);
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        BooleanSupplier closure = snapshotCheckClosure(restarted, 
testData.interactAfterSnapshot);
+
+        boolean success = waitForCondition(closure, 10_000);
+
+        assertTrue(success);
+    }
+
+    /**
+     * Interacts with raft group before the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doBeforeStop(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Interacts with raft group after the follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doAfterStop(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Interacts with raft group after the leader has captured a snapshot.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void doAfterSnapshot(RaftGroupService service) throws 
Exception;
+
+    /**
+     * Creates a closure that will be executed periodically to check if the 
snapshot and (conditionally on the
+     * {@link TestData#interactAfterSnapshot}) the raft log was successfully 
restored by the follower node.
+     *
+     * @param restarted Restarted follower node.
+     * @param interactedAfterSnapshot {@code true} if the raft group was 
interacted with after the snapshot operation.
+     * @return Closure.
+     */
+    public abstract BooleanSupplier snapshotCheckClosure(JRaftServerImpl 
restarted, boolean interactedAfterSnapshot);

Review comment:
       Actually, I think that all these abstract methods are a sign of 
over-engineering. Do we really need an abstract class for these two tests? I 
think it would be easier to copy-paste the test itself.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to