This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4406ebf825 IGNITE-20256 Refuse to install Raft snapshots on partitions when not enough schemas are available (#2473)
4406ebf825 is described below

commit 4406ebf825083e21ebd0a20d2fafed738acdff55
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Mon Sep 4 18:57:10 2023 +0400

    IGNITE-20256 Refuse to install Raft snapshots on partitions when not enough schemas are available (#2473)
---
 .../internal/catalog/CatalogManagerSelfTest.java   |   2 +
 .../internal/testframework/IgniteTestUtils.java    |   4 +-
 .../ignite/raft/jraft/entity/RaftOutter.java       |   3 +
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     | 202 ++++++++++++++--
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |   2 +-
 .../internal/table/distributed/TableManager.java   |   1 +
 .../raft/snapshot/PartitionSnapshotStorage.java    |  13 ++
 .../snapshot/PartitionSnapshotStorageFactory.java  |  13 +-
 .../snapshot/incoming/IncomingSnapshotCopier.java  | 126 ++++++++--
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   |   8 +-
 .../snapshot/outgoing/OutgoingSnapshotReader.java  |   2 +-
 .../raft/snapshot/outgoing/SnapshotMetaUtils.java  |   6 +-
 .../schema/CatalogVersionSufficiency.java          |   2 +-
 .../PartitionSnapshotStorageFactoryTest.java       |   3 +
 .../snapshot/PartitionSnapshotStorageTest.java     |   2 +
 .../incoming/IncomingSnapshotCopierTest.java       | 254 ++++++++++++++-------
 .../outgoing/OutgoingSnapshotCommonTest.java       |  11 +-
 .../OutgoingSnapshotMvDataStreamingTest.java       |   6 +-
 .../outgoing/OutgoingSnapshotReaderTest.java       |   2 +
 .../OutgoingSnapshotTxDataStreamingTest.java       |   6 +-
 .../outgoing/OutgoingSnapshotsManagerTest.java     |   6 +-
 .../snapshot/outgoing/SnapshotMetaUtilsTest.java   |   5 +-
 22 files changed, 536 insertions(+), 143 deletions(-)

diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 21f92469f9..23239c547f 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -173,6 +173,8 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         assertEquals(INFINITE_TIMER_VALUE, 
zone.dataNodesAutoAdjustScaleDown());
         assertEquals(DEFAULT_STORAGE_ENGINE, zone.dataStorage().engine());
         assertEquals(DEFAULT_DATA_REGION, zone.dataStorage().dataRegion());
+
+        assertThat(manager.latestCatalogVersion(), is(0));
     }
 
     @Test
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index a8c22d1eeb..c21b6e70e9 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -190,9 +190,9 @@ public final class IgniteTestUtils {
      * @param fieldName     name of the field
      * @return field value
      */
-    public static Object getFieldValue(@Nullable Object target, Class<?> 
declaredClass, String fieldName) {
+    public static <T> T getFieldValue(@Nullable Object target, Class<?> 
declaredClass, String fieldName) {
         try {
-            return getField(target, declaredClass, fieldName).get(target);
+            return (T) getField(target, declaredClass, fieldName).get(target);
         } catch (IllegalAccessException e) {
             throw new IgniteInternalException("Cannot get field value", e);
         }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
index 8e36f3ec75..935b4fbda1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
@@ -73,5 +73,8 @@ public final class RaftOutter {
 
         @Nullable
         Collection<String> oldLearnersList();
+
+        /** Minimum catalog version that is required for the snapshot to be accepted by a follower. */
+        int requiredCatalogVersion();
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 88a2ec05e5..37299f5675 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.raftsnapshot;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -33,9 +35,11 @@ import java.net.ConnectException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -51,15 +55,21 @@ import 
org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.ReplicationGroupsUtils;
+import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.log4j2.LogInspector;
@@ -71,14 +81,27 @@ import 
org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
+import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import 
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -126,23 +149,21 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
 
     private LogInspector replicatorLogInspector;
 
-    private @Nullable Handler replicaLoggerHandler;
+    private LogInspector copierLogInspector;
 
     @BeforeEach
     void createCluster(TestInfo testInfo) {
         cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
 
         replicatorLogInspector = LogInspector.create(Replicator.class, true);
+        copierLogInspector = LogInspector.create(IncomingSnapshotCopier.class, 
true);
     }
 
     @AfterEach
     @Timeout(60)
     void shutdownCluster() {
-        if (replicaLoggerHandler != null) {
-            replicatorLogInspector.removeHandler(replicaLoggerHandler);
-        }
-
         replicatorLogInspector.stop();
+        copierLogInspector.stop();
 
         cluster.shutdown();
     }
@@ -426,21 +447,23 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
      * on it for the sole table partition in the cluster.
      */
     private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex) 
throws InterruptedException {
+        CountDownLatch snapshotInstalledLatch = 
snapshotInstalledLatch(nodeIndex);
+
+        reanimateNode(nodeIndex);
+
+        assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did 
not install a snapshot in time");
+    }
+
+    private CountDownLatch snapshotInstalledLatch(int nodeIndex) {
         CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
 
-        Handler handler = replicatorLogInspector.addHandler(
+        replicatorLogInspector.addHandler(
                 evt -> evt.getMessage().getFormattedMessage().matches(
-                        "Node .+ received InstallSnapshotResponse from .+_" + 
nodeIndex + " .+ success=true"),
-                () -> snapshotInstalledLatch.countDown()
+                        "Node \\S+ received InstallSnapshotResponse from 
\\S+_" + nodeIndex + " .+ success=true"),
+                snapshotInstalledLatch::countDown
         );
 
-        try {
-            reanimateNode(nodeIndex);
-
-            assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), 
"Did not install a snapshot in time");
-        } finally {
-            replicatorLogInspector.removeHandler(handler);
-        }
+        return snapshotInstalledLatch;
     }
 
     private void reanimateNode(int nodeIndex) {
@@ -724,11 +747,12 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
             int nodeIndexTo,
             CompletableFuture<Void> snapshotInstallSuccessfullyFuture
     ) {
-        String regexp = "Node .+" + nodeIndexFrom + " received 
InstallSnapshotResponse from .+_" + nodeIndexTo + " .+ success=true";
+        String regexp = "Node \\S+" + nodeIndexFrom + " received 
InstallSnapshotResponse from \\S+_" + nodeIndexTo + " .+ success=true";
 
-        replicaLoggerHandler = replicatorLogInspector.addHandler(
+        replicatorLogInspector.addHandler(
                 evt -> evt.getMessage().getFormattedMessage().matches(regexp),
-                () -> snapshotInstallSuccessfullyFuture.complete(null));
+                () -> snapshotInstallSuccessfullyFuture.complete(null)
+        );
     }
 
     /**
@@ -782,6 +806,115 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
         assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
     }
 
+    /**
+     * The replication mechanism must not replicate commands for which schemas are not yet available on the node
+     * to which replication happens (in Raft, it means that followers/learners cannot receive commands that they
+     * cannot execute without waiting for schemas). This method tests that snapshots bringing such commands are
+     * rejected, and that, when metadata catches up, the snapshot gets successfully installed.
+     */
+    @Test
+    void laggingSchemasOnFollowerPreventSnapshotInstallation() throws 
Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        // Prepare the scene: force node 0 to be a leader, and node 2 to be a 
follower.
+        final int leaderIndex = 0;
+        final int followerIndex = 2;
+
+        transferLeadershipOnSolePartitionTo(leaderIndex);
+        cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+        // Block AppendEntries from being accepted on the follower so that the 
leader will have to use a snapshot.
+        blockIncomingAppendEntriesAt(followerIndex);
+
+        // Inhibit the MetaStorage on the follower to make snapshots not 
eligible for installation.
+        WatchListenerInhibitor listenerInhibitor = 
inhibitMetastorageListenersAt(followerIndex);
+
+        try {
+            // Add some data in a schema that is not yet available on the 
follower
+            updateTableSchemaAt(leaderIndex);
+            putToTableAt(leaderIndex);
+
+            CountDownLatch installationRejected = installationRejectedLatch();
+            CountDownLatch snapshotInstalled = 
snapshotInstalledLatch(followerIndex);
+
+            // Force InstallSnapshot to be used.
+            causeLogTruncationOnSolePartitionLeader(leaderIndex);
+
+            assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did 
not see snapshot installation rejection");
+
+            assertThat("Snapshot was installed before unblocking", 
snapshotInstalled.getCount(), is(not(0L)));
+
+            listenerInhibitor.stopInhibit();
+
+            assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not 
see a snapshot installed");
+        } finally {
+            listenerInhibitor.stopInhibit();
+        }
+    }
+
+    private void updateTableSchemaAt(int nodeIndex) {
+        cluster.doInSession(nodeIndex, session -> {
+            session.execute(null, "alter table test add column added int");
+        });
+    }
+
+    private void putToTableAt(int nodeIndex) {
+        KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex)
+                .tables()
+                .table("test")
+                .keyValueView();
+        kvView.put(null, Tuple.create().set("key", 1), 
Tuple.create().set("val", "one"));
+    }
+
+    private void blockIncomingAppendEntriesAt(int nodeIndex) {
+        BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower = 
installBlockingAppendEntriesProcessor(nodeIndex);
+
+        blockingProcessorOnFollower.startBlocking();
+    }
+
+    private WatchListenerInhibitor inhibitMetastorageListenersAt(int 
nodeIndex) {
+        IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex);
+
+        WatchListenerInhibitor listenerInhibitor = 
WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
+        listenerInhibitor.startInhibit();
+
+        return listenerInhibitor;
+    }
+
+    private CountDownLatch installationRejectedLatch() {
+        CountDownLatch installationRejected = new CountDownLatch(1);
+
+        copierLogInspector.addHandler(
+                event -> 
event.getMessage().getFormattedMessage().startsWith("Metadata not yet 
available, rejecting snapshot installation"),
+                installationRejected::countDown
+        );
+
+        return installationRejected;
+    }
+
+    private BlockingAppendEntriesRequestProcessor 
installBlockingAppendEntriesProcessor(int nodeIndex) {
+        RaftServer raftServer = cluster.node(nodeIndex).raftManager().server();
+        RpcServer<?> rpcServer = getFieldValue(raftServer, 
JraftServerImpl.class, "rpcServer");
+        Map<String, RpcProcessor<?>> processors = getFieldValue(rpcServer, 
IgniteRpcServer.class, "processors");
+
+        AppendEntriesRequestProcessor originalProcessor =
+                (AppendEntriesRequestProcessor) 
processors.get(AppendEntriesRequest.class.getName());
+        Executor appenderExecutor = getFieldValue(originalProcessor, 
RpcRequestProcessor.class, "executor");
+        RaftMessagesFactory raftMessagesFactory = 
getFieldValue(originalProcessor, RpcRequestProcessor.class, "msgFactory");
+
+        BlockingAppendEntriesRequestProcessor blockingProcessor = new 
BlockingAppendEntriesRequestProcessor(
+                appenderExecutor,
+                raftMessagesFactory,
+                cluster.solePartitionId().toString()
+        );
+
+        rpcServer.registerProcessor(blockingProcessor);
+
+        return blockingProcessor;
+    }
+
     /**
      * This exception is thrown to indicate that an operation can not possibly 
succeed after some error condition.
      * For example there is no reason to retry an operation that inserts a 
certain key after receiving a duplicate key error.
@@ -794,4 +927,35 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
             super(cause);
         }
     }
+
+    /**
+     * {@link AppendEntriesRequestProcessor} that, when blocking is enabled, blocks all AppendEntriesRequests of
+     * the given group (that is, returns EBUSY error code, which makes JRaft repeat them).
+     */
+    private static class BlockingAppendEntriesRequestProcessor extends 
AppendEntriesRequestProcessor {
+        private final String idOfGroupToBlock;
+        private volatile boolean block;
+
+        public BlockingAppendEntriesRequestProcessor(Executor executor,
+                RaftMessagesFactory msgFactory, String idOfGroupToBlock) {
+            super(executor, msgFactory);
+
+            this.idOfGroupToBlock = idOfGroupToBlock;
+        }
+
+        @Override
+        public Message processRequest0(RaftServerService service, 
AppendEntriesRequest request, RpcRequestClosure done) {
+            if (block && idOfGroupToBlock.equals(request.groupId())) {
+                return RaftRpcFactory.DEFAULT //
+                    .newResponse(done.getMsgFactory(), RaftError.EBUSY,
+                            "Blocking AppendEntries on '%s'.", 
request.groupId());
+            }
+
+            return super.processRequest0(service, request, done);
+        }
+
+        public void startBlocking() {
+            block = true;
+        }
+    }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index cb501fbd8a..c801e1b341 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1431,7 +1431,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
         }
 
-        // Changed storage states and expect all storage operations to stop soon.
+        // Change storage states and expect all storage operations to stop soon.
         busyLock.block();
 
         try {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index aa70f553f6..ce64e38fb6 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1098,6 +1098,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         partitionUpdateHandlers.indexUpdateHandler,
                         partitionUpdateHandlers.gcUpdateHandler
                 ),
+                catalogService,
                 incomingSnapshotsExecutor
         ));
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
index 16d58e23cb..1d803c97cd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.table.distributed.raft.snapshot;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotReader;
@@ -56,6 +57,8 @@ public class PartitionSnapshotStorage implements 
SnapshotStorage {
     /** Instance of partition. */
     private final PartitionAccess partition;
 
+    private final CatalogService catalogService;
+
     /**
      *  Snapshot meta, constructed from the storage data and raft group 
configuration at startup.
      *  {@code null} if the storage is empty.
@@ -81,6 +84,7 @@ public class PartitionSnapshotStorage implements 
SnapshotStorage {
      * @param snapshotUri Snapshot URI.
      * @param raftOptions RAFT options.
      * @param partition Partition.
+     * @param catalogService Catalog service.
      * @param startupSnapshotMeta Snapshot meta at startup. {@code null} if 
the storage is empty.
      * @param incomingSnapshotsExecutor Incoming snapshots executor.
      */
@@ -90,6 +94,7 @@ public class PartitionSnapshotStorage implements 
SnapshotStorage {
             String snapshotUri,
             RaftOptions raftOptions,
             PartitionAccess partition,
+            CatalogService catalogService,
             @Nullable SnapshotMeta startupSnapshotMeta,
             Executor incomingSnapshotsExecutor
     ) {
@@ -98,6 +103,7 @@ public class PartitionSnapshotStorage implements 
SnapshotStorage {
         this.snapshotUri = snapshotUri;
         this.raftOptions = raftOptions;
         this.partition = partition;
+        this.catalogService = catalogService;
         this.startupSnapshotMeta = startupSnapshotMeta;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
     }
@@ -137,6 +143,13 @@ public class PartitionSnapshotStorage implements 
SnapshotStorage {
         return partition;
     }
 
+    /**
+     * Returns catalog service.
+     */
+    public CatalogService catalogService() {
+        return catalogService;
+    }
+
     /**
      * Returns a snapshot meta, constructed from the storage data and raft 
group configuration at startup.
      */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 56641e34a2..99588bb5af 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
 import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
@@ -50,6 +51,8 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
     /** Partition storage. */
     private final PartitionAccess partition;
 
+    private final CatalogService catalogService;
+
     /**
      * RAFT log index, min of {@link MvPartitionStorage#lastAppliedIndex()} 
and {@link TxStateStorage#lastAppliedIndex()}
      * at the moment of this factory instantiation.
@@ -62,6 +65,8 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
     /** RAFT configuration corresponding to {@link #lastIncludedRaftIndex}. */
     private final RaftGroupConfiguration lastIncludedConfiguration;
 
+    private final int lastCatalogVersionAtStart;
+
     /** Incoming snapshots executor. */
     private final Executor incomingSnapshotsExecutor;
 
@@ -71,6 +76,7 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
      * @param topologyService Topology service.
      * @param outgoingSnapshotsManager Snapshot manager.
      * @param partition MV partition storage.
+     * @param catalogService Access to the Catalog.
      * @param incomingSnapshotsExecutor Incoming snapshots executor.
      * @see SnapshotMeta
      */
@@ -79,11 +85,13 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             PartitionAccess partition,
+            CatalogService catalogService,
             Executor incomingSnapshotsExecutor
     ) {
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
         this.partition = partition;
+        this.catalogService = catalogService;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
 
         // We must choose the minimum applied index for local recovery so that 
we don't skip the raft commands for the storage with the
@@ -92,12 +100,14 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
         lastIncludedRaftTerm = partition.minLastAppliedTerm();
 
         lastIncludedConfiguration = partition.committedGroupConfiguration();
+
+        lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
     }
 
     @Override
     public PartitionSnapshotStorage createSnapshotStorage(String uri, 
RaftOptions raftOptions) {
         SnapshotMeta startupSnapshotMeta = lastIncludedRaftIndex == 0 ? null : 
SnapshotMetaUtils.snapshotMetaAt(
-                lastIncludedRaftIndex, lastIncludedRaftTerm, 
lastIncludedConfiguration
+                lastIncludedRaftIndex, lastIncludedRaftTerm, 
lastIncludedConfiguration, lastCatalogVersionAtStart
         );
 
         return new PartitionSnapshotStorage(
@@ -106,6 +116,7 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
                 uri,
                 raftOptions,
                 partition,
+                catalogService,
                 startupSnapshotMeta,
                 incomingSnapshotsExecutor
         );
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index fd5a9a3c37..be713c7918 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -17,16 +17,24 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
 
+import static java.util.concurrent.CompletableFuture.anyOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
 
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -68,6 +76,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
     private static final int MAX_TX_DATA_BATCH_SIZE = 1000;
 
+    /** Number of milliseconds that the follower is allowed to try to catch up the required catalog version. */
+    private static final int WAIT_FOR_METADATA_CATCHUP_MS = 3000;
+
     private final PartitionSnapshotStorage partitionSnapshotStorage;
 
     private final SnapshotUri snapshotUri;
@@ -87,7 +98,10 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
     private volatile SnapshotMeta snapshotMeta;
 
     @Nullable
-    private volatile CompletableFuture<?> rebalanceFuture;
+    private volatile CompletableFuture<Boolean> metadataSufficiencyFuture;
+
+    @Nullable
+    private volatile CompletableFuture<Void> rebalanceFuture;
 
     /**
      * Future is to wait in {@link #join()} because it is important for us to 
wait for the rebalance to finish or abort.
@@ -112,20 +126,57 @@ public class IncomingSnapshotCopier extends 
SnapshotCopier {
 
         LOG.info("Copier is started for the partition [{}]", 
createPartitionInfo());
 
-        rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
-                .thenCompose(unused -> {
-                    ClusterNode snapshotSender = 
getSnapshotSender(snapshotUri.nodeName);
+        ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
+
+        metadataSufficiencyFuture = snapshotSender == null
+                ? failedFuture(new StorageRebalanceException("Snapshot sender 
not found: " + snapshotUri.nodeName))
+                : loadSnapshotMeta(snapshotSender)
+                        // Give metadata some time to catch up as it's very 
probable that the leader is ahead metadata-wise.
+                        .thenCompose(unused -> waitForMetadataWithTimeout())
+                        .thenApply(unused -> metadataIsSufficientlyComplete())
+                ;
+
+        rebalanceFuture = 
metadataSufficiencyFuture.thenCompose(metadataSufficient -> {
+            if (metadataSufficient) {
+                return partitionSnapshotStorage.partition().startRebalance()
+                        .thenCompose(unused -> {
+                            assert snapshotSender != null;
+
+                            return loadSnapshotMvData(snapshotSender, executor)
+                                    .thenCompose(unused1 -> 
loadSnapshotTxData(snapshotSender, executor));
+                        });
+            } else {
+                logMetadataInsufficiencyAndSetError();
+
+                return completedFuture(null);
+            }
+        });
 
-                    if (snapshotSender == null) {
-                        throw new StorageRebalanceException("Snapshot sender 
not found: " + snapshotUri.nodeName);
-                    }
+        joinFuture = metadataSufficiencyFuture.thenCompose(metadataSufficient 
-> {
+            if (metadataSufficient) {
+                return rebalanceFuture.handle((unused, throwable) -> 
completeRebalance(throwable)).thenCompose(Function.identity());
+            } else {
+                return completedFuture(null);
+            }
+        });
+    }
 
-                    return loadSnapshotMeta(snapshotSender)
-                            .thenCompose(unused1 -> 
loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> 
loadSnapshotTxData(snapshotSender, executor));
-                });
+    private CompletableFuture<?> waitForMetadataWithTimeout() {
+        CompletableFuture<?> metadataReadyFuture = 
partitionSnapshotStorage.catalogService()
+                .catalogReadyFuture(snapshotMeta.requiredCatalogVersion());
+        CompletableFuture<?> readinessTimeoutFuture = 
completeOnMetadataReadinessTimeout();
 
-        joinFuture = rebalanceFuture.handle((unused, throwable) -> 
completeRebalance(throwable)).thenCompose(Function.identity());
+        return anyOf(metadataReadyFuture, readinessTimeoutFuture);
+    }
+
+    private static CompletableFuture<?> completeOnMetadataReadinessTimeout() {
+        return new CompletableFuture<>()
+                .orTimeout(WAIT_FOR_METADATA_CATCHUP_MS, TimeUnit.MILLISECONDS)
+                .exceptionally(ex -> {
+                    assert (ex instanceof TimeoutException);
+
+                    return null;
+                });
     }
 
     @Override
@@ -140,14 +191,16 @@ public class IncomingSnapshotCopier extends 
SnapshotCopier {
             } catch (ExecutionException e) {
                 Throwable cause = e.getCause();
 
-                LOG.error("Error when completing the copier", cause);
+                if (!(cause instanceof CancellationException)) {
+                    LOG.error("Error when completing the copier", cause);
 
-                if (!isOk()) {
-                    setError(RaftError.UNKNOWN, "Unknown error on completion 
the copier");
-                }
+                    if (isOk()) {
+                        setError(RaftError.UNKNOWN, "Unknown error on 
completion the copier");
+                    }
 
-                // By analogy with LocalSnapshotCopier#join.
-                throw new IllegalStateException(cause);
+                    // By analogy with LocalSnapshotCopier#join.
+                    throw new IllegalStateException(cause);
+                }
             }
         }
     }
@@ -163,11 +216,14 @@ public class IncomingSnapshotCopier extends 
SnapshotCopier {
 
         LOG.info("Copier is canceled for partition [{}]", 
createPartitionInfo());
 
-        CompletableFuture<?> fut = rebalanceFuture;
+        // Cancel all futures that might be upstream of joinFuture.
+        List<CompletableFuture<?>> futuresToCancel = 
Stream.of(metadataSufficiencyFuture, rebalanceFuture)
+                .filter(Objects::nonNull)
+                .collect(toList());
 
-        if (fut != null) {
-            fut.cancel(false);
+        futuresToCancel.forEach(future -> future.cancel(false));
 
+        if (!futuresToCancel.isEmpty()) {
             try {
                 // Because after the cancellation, no one waits for #join.
                 join();
@@ -215,6 +271,28 @@ public class IncomingSnapshotCopier extends SnapshotCopier 
{
         }
     }
 
+    private boolean metadataIsSufficientlyComplete() {
+        return isMetadataAvailableFor(snapshotMeta.requiredCatalogVersion(), 
partitionSnapshotStorage.catalogService());
+    }
+
+    private void logMetadataInsufficiencyAndSetError() {
+        LOG.warn(
+                "Metadata not yet available, rejecting snapshot installation 
[uri={}, requiredVersion={}].",
+                this.snapshotUri,
+                snapshotMeta.requiredCatalogVersion()
+        );
+
+        String errorMessage = String.format(
+                "Metadata not yet available, URI '%s', required level %s; 
rejecting snapshot installation.",
+                this.snapshotUri,
+                snapshotMeta.requiredCatalogVersion()
+        );
+
+        if (isOk()) {
+            setError(RaftError.EBUSY, errorMessage);
+        }
+    }
+
     /**
      * Requests and stores data into {@link MvPartitionStorage}.
      */
@@ -276,7 +354,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
     /**
      * Requests and stores data into {@link TxStateStorage}.
      */
-    private CompletableFuture<?> loadSnapshotTxData(ClusterNode 
snapshotSender, Executor executor) {
+    private CompletableFuture<Void> loadSnapshotTxData(ClusterNode 
snapshotSender, Executor executor) {
         if (!busyLock.enterBusy()) {
             return completedFuture(null);
         }
@@ -340,7 +418,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
      */
     private CompletableFuture<Void> completeRebalance(@Nullable Throwable 
throwable) {
         if (!busyLock.enterBusy()) {
-            if (!isOk()) {
+            if (isOk()) {
                 setError(RaftError.ECANCELED, "Copier is cancelled");
             }
 
@@ -351,7 +429,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
             if (throwable != null) {
                 LOG.error("Partition rebalancing error [{}]", throwable, 
createPartitionInfo());
 
-                if (!isOk()) {
+                if (isOk()) {
                     setError(RaftError.UNKNOWN, throwable.getMessage());
                 }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index 82544f9f06..6cec945695 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -25,6 +25,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -63,6 +64,8 @@ public class OutgoingSnapshot {
 
     private final PartitionAccess partition;
 
+    private final CatalogService catalogService;
+
     /**
      * Lock that is used for mutual exclusion of MV snapshot reading (by this 
class) and threads that write MV data to the same
      * partition (currently, via {@link SnapshotAwarePartitionDataStorage}).
@@ -113,9 +116,10 @@ public class OutgoingSnapshot {
     /**
      * Creates a new instance.
      */
-    public OutgoingSnapshot(UUID id, PartitionAccess partition) {
+    public OutgoingSnapshot(UUID id, PartitionAccess partition, CatalogService 
catalogService) {
         this.id = id;
         this.partition = partition;
+        this.catalogService = catalogService;
 
         lastRowId = RowId.lowestRowId(partition.partitionKey().partitionId());
     }
@@ -159,7 +163,7 @@ public class OutgoingSnapshot {
 
         assert config != null : "Configuration should never be null when 
installing a snapshot";
 
-        return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex, 
lastAppliedTerm, config);
+        return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex, 
lastAppliedTerm, config, catalogService.latestCatalogVersion());
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index 24224f92c5..0c7ad9a585 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -50,7 +50,7 @@ public class OutgoingSnapshotReader extends SnapshotReader {
     public OutgoingSnapshotReader(PartitionSnapshotStorage snapshotStorage) {
         this.snapshotStorage = snapshotStorage;
 
-        snapshot = new OutgoingSnapshot(id, snapshotStorage.partition());
+        snapshot = new OutgoingSnapshot(id, snapshotStorage.partition(), 
snapshotStorage.catalogService());
 
         LOG.info("Starting snapshot reader for snapshot {}", id);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
index 0ed0b403a1..cff8692500 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
@@ -32,14 +32,16 @@ public class SnapshotMetaUtils {
      * @param logIndex RAFT log index.
      * @param term Term corresponding to the index.
      * @param config RAFT group configuration.
+     * @param requiredCatalogVersion Catalog version that a follower/learner 
must have to be able to accept this snapshot.
      * @return SnapshotMeta corresponding to the given log index.
      */
-    public static SnapshotMeta snapshotMetaAt(long logIndex, long term, 
RaftGroupConfiguration config) {
+    public static SnapshotMeta snapshotMetaAt(long logIndex, long term, 
RaftGroupConfiguration config, int requiredCatalogVersion) {
         SnapshotMetaBuilder metaBuilder = new 
RaftMessagesFactory().snapshotMeta()
                 .lastIncludedIndex(logIndex)
                 .lastIncludedTerm(term)
                 .peersList(config.peers())
-                .learnersList(config.learners());
+                .learnersList(config.learners())
+                .requiredCatalogVersion(requiredCatalogVersion);
 
         if (!config.isStable()) {
             //noinspection ConstantConditions
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
index 499249eeae..d61f3197ad 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
@@ -34,7 +34,7 @@ public class CatalogVersionSufficiency {
      * @param catalogService Catalog service.
      * @return {@code true} iff the local Catalog version is sufficient.
      */
-    static boolean isMetadataAvailableFor(int requiredCatalogVersion, 
CatalogService catalogService) {
+    public static boolean isMetadataAvailableFor(int requiredCatalogVersion, 
CatalogService catalogService) {
         return requiredCatalogVersion <= catalogService.latestCatalogVersion();
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
index 57a9a037f7..d8f6d0b504 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -55,6 +56,7 @@ public class PartitionSnapshotStorageFactoryTest extends 
BaseIgniteAbstractTest
                 mock(TopologyService.class),
                 mock(OutgoingSnapshotsManager.class),
                 partitionAccess,
+                mock(CatalogService.class),
                 mock(Executor.class)
         );
 
@@ -70,6 +72,7 @@ public class PartitionSnapshotStorageFactoryTest extends 
BaseIgniteAbstractTest
                 mock(TopologyService.class),
                 mock(OutgoingSnapshotsManager.class),
                 partitionAccess,
+                mock(CatalogService.class),
                 mock(Executor.class)
         );
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
index ed83f131e0..a51c75e468 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.startup.StartupPartitionSnapshotReader;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -61,6 +62,7 @@ class PartitionSnapshotStorageTest extends 
BaseIgniteAbstractTest {
                 "",
                 mock(RaftOptions.class),
                 mock(PartitionAccess.class),
+                mock(CatalogService.class),
                 metaForCleanStorage,
                 mock(Executor.class)
         );
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 9bc08ce2cb..9574aa777a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -39,6 +40,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -54,8 +56,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
@@ -80,6 +84,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
@@ -98,6 +103,7 @@ import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
@@ -137,13 +143,28 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
     private final RaftGroupConfigurationConverter 
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
 
-    private MvGc mvGc;
+    private final MvGc mvGc = mock(MvGc.class);
+
+    private final CatalogService catalogService = mock(CatalogService.class);
+
+    private final MvPartitionStorage outgoingMvPartitionStorage = new 
TestMvPartitionStorage(TEST_PARTITION);
+    private final TxStateStorage outgoingTxStatePartitionStorage = new 
TestTxStateStorage();
+
+    private final MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
+    private final TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
+
+    private final long expLastAppliedIndex = 100500L;
+    private final long expLastAppliedTerm = 100L;
+    private final RaftGroupConfiguration expLastGroupConfig = 
generateRaftGroupConfig();
+
+    private final List<RowId> rowIds = generateRowIds();
+    private final List<UUID> txIds = generateTxIds();
 
     @BeforeEach
     void setUp() {
-        mvGc = mock(MvGc.class);
-
         when(mvGc.removeStorage(any(TablePartitionId.class))).then(invocation 
-> completedFuture(null));
+
+        
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(completedFuture(null));
     }
 
     @AfterEach
@@ -153,27 +174,12 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
     @Test
     void test() {
-        MvPartitionStorage outgoingMvPartitionStorage = new 
TestMvPartitionStorage(TEST_PARTITION);
-        TxStateStorage outgoingTxStatePartitionStorage = new 
TestTxStateStorage();
-
-        long expLastAppliedIndex = 100500L;
-        long expLastAppliedTerm = 100L;
-        RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+        fillOriginalStorages();
 
-        List<RowId> rowIds = generateRowIds();
-        List<UUID> txIds = generateTxIds();
-
-        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
-        fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
-
-        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
-        TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
-
-        assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), 
willCompleteSuccessfully());
-        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+        createTargetStorages();
 
         MessagingService messagingService = 
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
-                outgoingTxStatePartitionStorage, expLastAppliedIndex, 
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+                outgoingTxStatePartitionStorage, rowIds, txIds);
 
         PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
                 snapshotId,
@@ -215,31 +221,21 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
         verify(incomingTxStatePartitionStorage, times(1)).startRebalance();
     }
 
-    private MessagingService 
messagingServiceForSuccessScenario(MvPartitionStorage 
outgoingMvPartitionStorage,
-            TxStateStorage outgoingTxStatePartitionStorage, long 
expLastAppliedIndex, long expLastAppliedTerm,
-            RaftGroupConfiguration expLastGroupConfig, List<RowId> rowIds, 
List<UUID> txIds, UUID snapshotId) {
-        MessagingService messagingService = mock(MessagingService.class);
+    private void createTargetStorages() {
+        assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), 
willCompleteSuccessfully());
+        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+    }
 
-        when(messagingService.invoke(eq(clusterNode), 
any(SnapshotMetaRequest.class), anyLong())).then(answer -> {
-            SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1);
+    private void fillOriginalStorages() {
+        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+        fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
+    }
 
-            assertEquals(snapshotId, snapshotMetaRequest.id());
+    private MessagingService 
messagingServiceForSuccessScenario(MvPartitionStorage 
outgoingMvPartitionStorage,
+            TxStateStorage outgoingTxStatePartitionStorage, List<RowId> 
rowIds, List<UUID> txIds) {
+        MessagingService messagingService = mock(MessagingService.class);
 
-            return completedFuture(
-                    TABLE_MSG_FACTORY.snapshotMetaResponse()
-                            .meta(
-                                    RAFT_MSG_FACTORY.snapshotMeta()
-                                            
.lastIncludedIndex(expLastAppliedIndex)
-                                            
.lastIncludedTerm(expLastAppliedTerm)
-                                            
.peersList(expLastGroupConfig.peers())
-                                            
.learnersList(expLastGroupConfig.learners())
-                                            
.oldPeersList(expLastGroupConfig.oldPeers())
-                                            
.oldLearnersList(expLastGroupConfig.oldLearners())
-                                            .build()
-                            )
-                            .build()
-            );
-        });
+        returnSnapshotMetaWhenAskedForIt(messagingService);
 
         when(messagingService.invoke(eq(clusterNode), 
any(SnapshotMvDataRequest.class), anyLong())).then(answer -> {
             SnapshotMvDataRequest snapshotMvDataRequest = 
answer.getArgument(1);
@@ -266,6 +262,32 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
         return messagingService;
     }
 
+    private void returnSnapshotMetaWhenAskedForIt(MessagingService 
messagingService) {
+        when(messagingService.invoke(eq(clusterNode), 
any(SnapshotMetaRequest.class), anyLong())).then(answer -> {
+            SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1);
+
+            assertEquals(snapshotId, snapshotMetaRequest.id());
+
+            return completedFuture(snapshotMetaResponse(0));
+        });
+    }
+
+    private SnapshotMetaResponse snapshotMetaResponse(int 
requiredCatalogVersion) {
+        return TABLE_MSG_FACTORY.snapshotMetaResponse()
+                .meta(
+                        RAFT_MSG_FACTORY.snapshotMeta()
+                                .lastIncludedIndex(expLastAppliedIndex)
+                                .lastIncludedTerm(expLastAppliedTerm)
+                                .peersList(expLastGroupConfig.peers())
+                                .learnersList(expLastGroupConfig.learners())
+                                .oldPeersList(expLastGroupConfig.oldPeers())
+                                
.oldLearnersList(expLastGroupConfig.oldLearners())
+                                .requiredCatalogVersion(requiredCatalogVersion)
+                                .build()
+                )
+                .build();
+    }
+
     private PartitionSnapshotStorage createPartitionSnapshotStorage(
             UUID snapshotId,
             MvTableStorage incomingTableStorage,
@@ -293,6 +315,7 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                         mock(IndexUpdateHandler.class),
                         mock(GcUpdateHandler.class)
                 )),
+                catalogService,
                 mock(SnapshotMeta.class),
                 executorService
         );
@@ -435,18 +458,14 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void cancellationMakesJoinFinishIfHangingOnNetworkCall() throws Exception {
-        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
-        TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
-
-        assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), 
willCompleteSuccessfully());
-        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+    void cancellationMakesJoinFinishIfHangingOnNetworkCallToSnapshotMetadata() 
throws Exception {
+        createTargetStorages();
 
         CountDownLatch networkInvokeLatch = new CountDownLatch(1);
 
         MessagingService messagingService = mock(MessagingService.class);
 
-        when(messagingService.invoke(any(ClusterNode.class), any(), 
anyLong())).then(invocation -> {
+        when(messagingService.invoke(any(ClusterNode.class), 
any(SnapshotMetaRequest.class), anyLong())).then(invocation -> {
             networkInvokeLatch.countDown();
 
             return new CompletableFuture<>();
@@ -474,32 +493,58 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
         assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS));
 
-        verify(partitionSnapshotStorage.partition()).abortRebalance();
+        verify(partitionSnapshotStorage.partition(), never()).startRebalance();
+        verify(partitionSnapshotStorage.partition(), never()).abortRebalance();
     }
 
     @Test
-    void testCancelOnMiddleRebalance() {
-        MvPartitionStorage outgoingMvPartitionStorage = new 
TestMvPartitionStorage(TEST_PARTITION);
-        TxStateStorage outgoingTxStatePartitionStorage = new 
TestTxStateStorage();
+    void cancellationMakesJoinFinishIfHangingOnNetworkCallWhenGettingData() 
throws Exception {
+        createTargetStorages();
 
-        long expLastAppliedIndex = 100500L;
-        long expLastAppliedTerm = 100L;
-        RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+        CountDownLatch networkInvokeLatch = new CountDownLatch(1);
 
-        List<RowId> rowIds = generateRowIds();
-        List<UUID> txIds = generateTxIds();
+        MessagingService messagingService = mock(MessagingService.class);
 
-        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
-        fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
+        returnSnapshotMetaWhenAskedForIt(messagingService);
+        when(messagingService.invoke(any(ClusterNode.class), 
any(SnapshotMvDataRequest.class), anyLong())).then(invocation -> {
+            networkInvokeLatch.countDown();
 
-        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
-        TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
+            return new CompletableFuture<>();
+        });
 
-        assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), 
willCompleteSuccessfully());
-        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+        PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
+                snapshotId,
+                incomingMvTableStorage,
+                incomingTxStateTableStorage,
+                messagingService
+        );
+
+        SnapshotCopier snapshotCopier = 
partitionSnapshotStorage.startToCopyFrom(
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(SnapshotCopierOptions.class)
+        );
+
+        networkInvokeLatch.await(1, TimeUnit.SECONDS);
+
+        CompletableFuture<?> cancelAndJoinFuture = runAsync(() -> {
+            snapshotCopier.cancel();
+
+            snapshotCopier.join();
+        });
+
+        assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS));
+
+        verify(partitionSnapshotStorage.partition()).abortRebalance();
+    }
+
+    @Test
+    void testCancelOnMiddleRebalance() {
+        fillOriginalStorages();
+
+        createTargetStorages();
 
         MessagingService messagingService = 
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
-                outgoingTxStatePartitionStorage, expLastAppliedIndex, 
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+                outgoingTxStatePartitionStorage, rowIds, txIds);
 
         PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
                 snapshotId,
@@ -547,27 +592,12 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
     @Test
     void testErrorInProcessOfRebalance() {
-        MvPartitionStorage outgoingMvPartitionStorage = new 
TestMvPartitionStorage(TEST_PARTITION);
-        TxStateStorage outgoingTxStatePartitionStorage = new 
TestTxStateStorage();
-
-        long expLastAppliedIndex = 100500L;
-        long expLastAppliedTerm = 100L;
-        RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
-
-        List<RowId> rowIds = generateRowIds();
-        List<UUID> txIds = generateTxIds();
-
-        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
-        fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
+        fillOriginalStorages();
 
-        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
-        TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
-
-        assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), 
willCompleteSuccessfully());
-        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+        createTargetStorages();
 
         MessagingService messagingService = 
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
-                outgoingTxStatePartitionStorage, expLastAppliedIndex, 
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+                outgoingTxStatePartitionStorage, rowIds, txIds);
 
         PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
                 snapshotId,
@@ -642,4 +672,60 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                 List.of("old-learner")
         );
     }
+
+    @Test
+    void laggingSchemasPreventSnapshotInstallation() {
+        fillOriginalStorages();
+
+        createTargetStorages();
+
+        MessagingService messagingService = mock(MessagingService.class);
+
+        when(messagingService.invoke(eq(clusterNode), 
any(SnapshotMetaRequest.class), anyLong()))
+                .thenReturn(completedFuture(snapshotMetaResponse(42)));
+
+        PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
+                snapshotId,
+                incomingMvTableStorage,
+                incomingTxStateTableStorage,
+                messagingService
+        );
+
+        SnapshotCopier snapshotCopier = 
partitionSnapshotStorage.startToCopyFrom(
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(SnapshotCopierOptions.class)
+        );
+
+        assertThat(runAsync(snapshotCopier::join), willSucceedIn(1, 
TimeUnit.SECONDS));
+
+        assertEquals(RaftError.EBUSY.getNumber(), snapshotCopier.getCode());
+
+        verify(messagingService, never()).invoke(any(ClusterNode.class), 
any(SnapshotMvDataRequest.class), anyLong());
+        verify(messagingService, never()).invoke(any(ClusterNode.class), 
any(SnapshotTxDataRequest.class), anyLong());
+
+        verify(partitionSnapshotStorage.partition(), never()).startRebalance();
+        verify(partitionSnapshotStorage.partition(), never()).abortRebalance();
+
+        assertThatTargetStoragesAreEmpty(incomingMvTableStorage, 
incomingTxStateTableStorage);
+    }
+
+    private static void assertThatTargetStoragesAreEmpty(
+            MvTableStorage incomingMvTableStorage,
+            TxStateTableStorage incomingTxStateTableStorage
+    ) {
+        MvPartitionStorage incomingMvPartitionStorage = 
incomingMvTableStorage.getMvPartition(TEST_PARTITION);
+        TxStateStorage incomingTxStatePartitionStorage = 
incomingTxStateTableStorage.getTxStateStorage(TEST_PARTITION);
+
+        assertEquals(0L, incomingMvPartitionStorage.lastAppliedIndex());
+        assertEquals(0L, incomingMvPartitionStorage.lastAppliedTerm());
+        assertArrayEquals(
+                null,
+                incomingMvPartitionStorage.committedGroupConfiguration()
+        );
+        assertEquals(0L, incomingTxStatePartitionStorage.lastAppliedIndex());
+        assertEquals(0L, incomingTxStatePartitionStorage.lastAppliedTerm());
+
+        
assertFalse(incomingMvPartitionStorage.scan(HybridTimestamp.MAX_VALUE).hasNext());
+        assertFalse(incomingTxStatePartitionStorage.scan().hasNext());
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index c4bf8a1b5a..74a19e9471 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
@@ -44,17 +45,22 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
     @Mock
     private PartitionAccess partitionAccess;
 
+    @Mock
+    private CatalogService catalogService;
+
     private OutgoingSnapshot snapshot;
 
     private final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
 
     private final PartitionKey partitionKey = new PartitionKey(1, 1);
 
+    private static final int REQUIRED_CATALOG_VERSION = 42;
+
     @BeforeEach
     void createTestInstance() {
         when(partitionAccess.partitionKey()).thenReturn(partitionKey);
 
-        snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess);
+        snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, 
catalogService);
     }
 
     @Test
@@ -75,6 +81,8 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
                 List.of("learner1:3000")
         ));
 
+        
when(catalogService.latestCatalogVersion()).thenReturn(REQUIRED_CATALOG_VERSION);
+
         snapshot.freezeScopeUnderMvLock();
 
         SnapshotMetaResponse response = getSnapshotMetaResponse();
@@ -85,6 +93,7 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
         assertThat(response.meta().learnersList(), is(List.of("learner1:3000", 
"learner2:3000")));
         assertThat(response.meta().oldPeersList(), is(List.of("peer1:3000")));
         assertThat(response.meta().oldLearnersList(), 
is(List.of("learner1:3000")));
+        assertThat(response.meta().requiredCatalogVersion(), 
is(REQUIRED_CATALOG_VERSION));
     }
 
     private SnapshotMetaResponse getSnapshotMetaResponse() {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 1e868a3190..4615eec80a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -61,6 +62,9 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
     @Mock
     private PartitionAccess partitionAccess;
 
+    @Mock
+    private CatalogService catalogService;
+
     private OutgoingSnapshot snapshot;
 
     private final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
@@ -83,7 +87,7 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
     void createTestInstance() {
         when(partitionAccess.partitionKey()).thenReturn(partitionKey);
 
-        snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess);
+        snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, 
catalogService);
     }
 
     @BeforeEach
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index 82a598f4ca..c1ed183e0e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
@@ -60,6 +61,7 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
                 "",
                 mock(RaftOptions.class),
                 partitionAccess,
+                mock(CatalogService.class),
                 mock(SnapshotMeta.class),
                 mock(Executor.class)
         );
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
index 3e25c9b1e1..998bfd41c2 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -59,6 +60,9 @@ class OutgoingSnapshotTxDataStreamingTest extends 
BaseIgniteAbstractTest {
     @Mock
     private PartitionAccess partitionAccess;
 
+    @Mock
+    private CatalogService catalogService;
+
     private OutgoingSnapshot snapshot;
 
     private final TableMessagesFactory messagesFactory = new 
TableMessagesFactory();
@@ -82,7 +86,7 @@ class OutgoingSnapshotTxDataStreamingTest extends 
BaseIgniteAbstractTest {
 
         
lenient().when(partitionAccess.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class));
 
-        snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess);
+        snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, 
catalogService);
     }
 
     @Test
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
index e01f7c9a25..6b57d31250 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
@@ -44,6 +45,9 @@ class OutgoingSnapshotsManagerTest extends 
BaseIgniteAbstractTest {
     @Mock
     private PartitionAccess partitionAccess;
 
+    @Mock
+    private CatalogService catalogService;
+
     private final PartitionKey partitionKey = new PartitionKey(1, 1);
 
     @SuppressWarnings("EmptyTryBlock")
@@ -68,7 +72,7 @@ class OutgoingSnapshotsManagerTest extends 
BaseIgniteAbstractTest {
 
         
when(partitionAccess.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class));
 
-        OutgoingSnapshot snapshot = new OutgoingSnapshot(UUID.randomUUID(), 
partitionAccess);
+        OutgoingSnapshot snapshot = new OutgoingSnapshot(UUID.randomUUID(), 
partitionAccess, catalogService);
 
         assertDoesNotThrow(() -> 
manager.startOutgoingSnapshot(UUID.randomUUID(), snapshot));
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
index f29cae8d0a..054ebe7e7d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
@@ -38,7 +38,7 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
                 List.of("peer1:3000"), List.of("learner1:3000")
         );
 
-        SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, config);
+        SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, config, 
42);
 
         assertThat(meta.lastIncludedIndex(), is(100L));
         assertThat(meta.lastIncludedTerm(), is(3L));
@@ -46,11 +46,12 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
         assertThat(meta.learnersList(), is(List.of("learner1:3000", 
"learner2:3000")));
         assertThat(meta.oldPeersList(), is(List.of("peer1:3000")));
         assertThat(meta.oldLearnersList(), is(List.of("learner1:3000")));
+        assertThat(meta.requiredCatalogVersion(), is(42));
     }
 
     @Test
     void doesNotIncludeOldConfigWhenItIsNotThere() {
-        SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, new 
RaftGroupConfiguration(List.of(), List.of(), null, null));
+        SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, new 
RaftGroupConfiguration(List.of(), List.of(), null, null), 42);
 
         assertThat(meta.oldPeersList(), is(nullValue()));
         assertThat(meta.oldLearnersList(), is(nullValue()));

Reply via email to