This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4406ebf825 IGNITE-20256 Refuse to install Raft snapshots on partitions when not enough schemas are available (#2473) 4406ebf825 is described below commit 4406ebf825083e21ebd0a20d2fafed738acdff55 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Mon Sep 4 18:57:10 2023 +0400 IGNITE-20256 Refuse to install Raft snapshots on partitions when not enough schemas are available (#2473) --- .../internal/catalog/CatalogManagerSelfTest.java | 2 + .../internal/testframework/IgniteTestUtils.java | 4 +- .../ignite/raft/jraft/entity/RaftOutter.java | 3 + .../raftsnapshot/ItTableRaftSnapshotsTest.java | 202 ++++++++++++++-- .../storage/rocksdb/RocksDbMvPartitionStorage.java | 2 +- .../internal/table/distributed/TableManager.java | 1 + .../raft/snapshot/PartitionSnapshotStorage.java | 13 ++ .../snapshot/PartitionSnapshotStorageFactory.java | 13 +- .../snapshot/incoming/IncomingSnapshotCopier.java | 126 ++++++++-- .../raft/snapshot/outgoing/OutgoingSnapshot.java | 8 +- .../snapshot/outgoing/OutgoingSnapshotReader.java | 2 +- .../raft/snapshot/outgoing/SnapshotMetaUtils.java | 6 +- .../schema/CatalogVersionSufficiency.java | 2 +- .../PartitionSnapshotStorageFactoryTest.java | 3 + .../snapshot/PartitionSnapshotStorageTest.java | 2 + .../incoming/IncomingSnapshotCopierTest.java | 254 ++++++++++++++------- .../outgoing/OutgoingSnapshotCommonTest.java | 11 +- .../OutgoingSnapshotMvDataStreamingTest.java | 6 +- .../outgoing/OutgoingSnapshotReaderTest.java | 2 + .../OutgoingSnapshotTxDataStreamingTest.java | 6 +- .../outgoing/OutgoingSnapshotsManagerTest.java | 6 +- .../snapshot/outgoing/SnapshotMetaUtilsTest.java | 5 +- 22 files changed, 536 insertions(+), 143 deletions(-) diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java index 21f92469f9..23239c547f 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java @@ -173,6 +173,8 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertEquals(INFINITE_TIMER_VALUE, zone.dataNodesAutoAdjustScaleDown()); assertEquals(DEFAULT_STORAGE_ENGINE, zone.dataStorage().engine()); assertEquals(DEFAULT_DATA_REGION, zone.dataStorage().dataRegion()); + + assertThat(manager.latestCatalogVersion(), is(0)); } @Test diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java index a8c22d1eeb..c21b6e70e9 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java @@ -190,9 +190,9 @@ public final class IgniteTestUtils { * @param fieldName name of the field * @return field value */ - public static Object getFieldValue(@Nullable Object target, Class<?> declaredClass, String fieldName) { + public static <T> T getFieldValue(@Nullable Object target, Class<?> declaredClass, String fieldName) { try { - return getField(target, declaredClass, fieldName).get(target); + return (T) getField(target, declaredClass, fieldName).get(target); } catch (IllegalAccessException e) { throw new IgniteInternalException("Cannot get field value", e); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java index 8e36f3ec75..935b4fbda1 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java @@ -73,5 +73,8 @@ public final class RaftOutter { @Nullable Collection<String> oldLearnersList(); + + /** Minimum catalog version that is required for the snapshot to be accepted by a follower. */ + int requiredCatalogVersion(); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java index 88a2ec05e5..37299f5675 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java @@ -19,12 +19,14 @@ package org.apache.ignite.internal.raftsnapshot; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.SessionUtils.executeUpdate; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -33,9 +35,11 @@ import java.net.ConnectException; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -51,15 +55,21 @@ import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.ignite.internal.Cluster; import org.apache.ignite.internal.IgniteIntegrationTest; import org.apache.ignite.internal.ReplicationGroupsUtils; +import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; +import org.apache.ignite.internal.raft.server.RaftServer; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException; import org.apache.ignite.internal.storage.StorageRebalanceException; import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine; import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; +import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; +import org.apache.ignite.internal.test.WatchListenerInhibitor; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.log4j2.LogInspector; @@ -71,14 +81,27 @@ import org.apache.ignite.lang.IgniteInternalCheckedException; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.Status; import org.apache.ignite.raft.jraft.core.Replicator; +import org.apache.ignite.raft.jraft.error.RaftError; import org.apache.ignite.raft.jraft.rpc.ActionRequest; +import org.apache.ignite.raft.jraft.rpc.Message; +import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory; +import org.apache.ignite.raft.jraft.rpc.RaftServerService; +import org.apache.ignite.raft.jraft.rpc.RpcProcessor; +import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure; +import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; +import org.apache.ignite.raft.jraft.rpc.RpcServer; +import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer; +import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.SqlRow; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.Transaction; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -126,23 +149,21 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { private LogInspector replicatorLogInspector; - private @Nullable Handler replicaLoggerHandler; + private LogInspector copierLogInspector; @BeforeEach void createCluster(TestInfo testInfo) { cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG); replicatorLogInspector = LogInspector.create(Replicator.class, true); + copierLogInspector = LogInspector.create(IncomingSnapshotCopier.class, true); } @AfterEach @Timeout(60) void shutdownCluster() { - if (replicaLoggerHandler != null) { - replicatorLogInspector.removeHandler(replicaLoggerHandler); - } - replicatorLogInspector.stop(); + copierLogInspector.stop(); cluster.shutdown(); } @@ -426,21 +447,23 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { * on it for the sole table partition in the cluster. */ private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex) throws InterruptedException { + CountDownLatch snapshotInstalledLatch = snapshotInstalledLatch(nodeIndex); + + reanimateNode(nodeIndex); + + assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time"); + } + + private CountDownLatch snapshotInstalledLatch(int nodeIndex) { CountDownLatch snapshotInstalledLatch = new CountDownLatch(1); - Handler handler = replicatorLogInspector.addHandler( + replicatorLogInspector.addHandler( evt -> evt.getMessage().getFormattedMessage().matches( - "Node .+ received InstallSnapshotResponse from .+_" + nodeIndex + " .+ success=true"), - () -> snapshotInstalledLatch.countDown() + "Node \\S+ received InstallSnapshotResponse from \\S+_" + nodeIndex + " .+ success=true"), + snapshotInstalledLatch::countDown ); - try { - reanimateNode(nodeIndex); - - assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time"); - } finally { - replicatorLogInspector.removeHandler(handler); - } + return snapshotInstalledLatch; } private void reanimateNode(int nodeIndex) { @@ -724,11 +747,12 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { int nodeIndexTo, CompletableFuture<Void> snapshotInstallSuccessfullyFuture ) { - String regexp = "Node .+" + nodeIndexFrom + " received InstallSnapshotResponse from .+_" + nodeIndexTo + " .+ success=true"; + String regexp = "Node \\S+" + nodeIndexFrom + " received InstallSnapshotResponse from \\S+_" + nodeIndexTo + " .+ success=true"; - replicaLoggerHandler = replicatorLogInspector.addHandler( + replicatorLogInspector.addHandler( evt -> evt.getMessage().getFormattedMessage().matches(regexp), - () -> snapshotInstallSuccessfullyFuture.complete(null)); + () -> snapshotInstallSuccessfullyFuture.complete(null) + ); } /** @@ -782,6 +806,115 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one")))); } + /** + * The replication mechanism must not replicate commands for which schemas are not yet available on the node + * to which replication happens (in Raft, it means that followers/learners cannot receive commands that they + * cannot execute without waiting for schemas). This method tests that snapshots bringing such commands are + * rejected, and that, when metadata catches up, the snapshot gets successfully installed. + */ + @Test + void laggingSchemasOnFollowerPreventSnapshotInstallation() throws Exception { + cluster.startAndInit(3); + + createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE); + + // Prepare the scene: force node 0 to be a leader, and node 2 to be a follower. + final int leaderIndex = 0; + final int followerIndex = 2; + + transferLeadershipOnSolePartitionTo(leaderIndex); + cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE); + + // Block AppendEntries from being accepted on the follower so that the leader will have to use a snapshot. + blockIncomingAppendEntriesAt(followerIndex); + + // Inhibit the MetaStorage on the follower to make snapshots not eligible for installation. + WatchListenerInhibitor listenerInhibitor = inhibitMetastorageListenersAt(followerIndex); + + try { + // Add some data in a schema that is not yet available on the follower + updateTableSchemaAt(leaderIndex); + putToTableAt(leaderIndex); + + CountDownLatch installationRejected = installationRejectedLatch(); + CountDownLatch snapshotInstalled = snapshotInstalledLatch(followerIndex); + + // Force InstallSnapshot to be used. + causeLogTruncationOnSolePartitionLeader(leaderIndex); + + assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did not see snapshot installation rejection"); + + assertThat("Snapshot was installed before unblocking", snapshotInstalled.getCount(), is(not(0L))); + + listenerInhibitor.stopInhibit(); + + assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not see a snapshot installed"); + } finally { + listenerInhibitor.stopInhibit(); + } + } + + private void updateTableSchemaAt(int nodeIndex) { + cluster.doInSession(nodeIndex, session -> { + session.execute(null, "alter table test add column added int"); + }); + } + + private void putToTableAt(int nodeIndex) { + KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex) + .tables() + .table("test") + .keyValueView(); + kvView.put(null, Tuple.create().set("key", 1), Tuple.create().set("val", "one")); + } + + private void blockIncomingAppendEntriesAt(int nodeIndex) { + BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower = installBlockingAppendEntriesProcessor(nodeIndex); + + blockingProcessorOnFollower.startBlocking(); + } + + private WatchListenerInhibitor inhibitMetastorageListenersAt(int nodeIndex) { + IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex); + + WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage); + listenerInhibitor.startInhibit(); + + return listenerInhibitor; + } + + private CountDownLatch installationRejectedLatch() { + CountDownLatch installationRejected = new CountDownLatch(1); + + copierLogInspector.addHandler( + event -> event.getMessage().getFormattedMessage().startsWith("Metadata not yet available, rejecting snapshot installation"), + installationRejected::countDown + ); + + return installationRejected; + } + + private BlockingAppendEntriesRequestProcessor installBlockingAppendEntriesProcessor(int nodeIndex) { + RaftServer raftServer = cluster.node(nodeIndex).raftManager().server(); + RpcServer<?> rpcServer = getFieldValue(raftServer, JraftServerImpl.class, "rpcServer"); + Map<String, RpcProcessor<?>> processors = getFieldValue(rpcServer, IgniteRpcServer.class, "processors"); + + AppendEntriesRequestProcessor originalProcessor = + (AppendEntriesRequestProcessor) processors.get(AppendEntriesRequest.class.getName()); + Executor appenderExecutor = getFieldValue(originalProcessor, RpcRequestProcessor.class, "executor"); + RaftMessagesFactory raftMessagesFactory = getFieldValue(originalProcessor, RpcRequestProcessor.class, "msgFactory"); + + BlockingAppendEntriesRequestProcessor blockingProcessor = new BlockingAppendEntriesRequestProcessor( + appenderExecutor, + raftMessagesFactory, + cluster.solePartitionId().toString() + ); + + rpcServer.registerProcessor(blockingProcessor); + + return blockingProcessor; + } + /** * This exception is thrown to indicate that an operation can not possibly succeed after some error condition. * For example there is no reason to retry an operation that inserts a certain key after receiving a duplicate key error. @@ -794,4 +927,35 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { super(cause); } } + + /** + * {@link AppendEntriesRequestProcessor} that, when blocking is enabled, blocks all AppendEntriesRequests of + * the given group (that is, returns EBUSY error code, which makes JRaft repeat them). + */ + private static class BlockingAppendEntriesRequestProcessor extends AppendEntriesRequestProcessor { + private final String idOfGroupToBlock; + private volatile boolean block; + + public BlockingAppendEntriesRequestProcessor(Executor executor, + RaftMessagesFactory msgFactory, String idOfGroupToBlock) { + super(executor, msgFactory); + + this.idOfGroupToBlock = idOfGroupToBlock; + } + + @Override + public Message processRequest0(RaftServerService service, AppendEntriesRequest request, RpcRequestClosure done) { + if (block && idOfGroupToBlock.equals(request.groupId())) { + return RaftRpcFactory.DEFAULT // + .newResponse(done.getMsgFactory(), RaftError.EBUSY, + "Blocking AppendEntries on '%s'.", request.groupId()); + } + + return super.processRequest0(service, request, done); + } + + public void startBlocking() { + block = true; + } + } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index cb501fbd8a..c801e1b341 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -1431,7 +1431,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo()); } - // Changed storage states and expect all storage operations to stop soon. + // Change storage states and expect all storage operations to stop soon. busyLock.block(); try { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index aa70f553f6..ce64e38fb6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -1098,6 +1098,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp partitionUpdateHandlers.indexUpdateHandler, partitionUpdateHandlers.gcUpdateHandler ), + catalogService, incomingSnapshotsExecutor )); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java index 16d58e23cb..1d803c97cd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotReader; @@ -56,6 +57,8 @@ public class PartitionSnapshotStorage implements SnapshotStorage { /** Instance of partition. */ private final PartitionAccess partition; + private final CatalogService catalogService; + /** * Snapshot meta, constructed from the storage data and raft group configuration at startup. * {@code null} if the storage is empty. @@ -81,6 +84,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage { * @param snapshotUri Snapshot URI. * @param raftOptions RAFT options. * @param partition Partition. + * @param catalogService Catalog service. * @param startupSnapshotMeta Snapshot meta at startup. {@code null} if the storage is empty. * @param incomingSnapshotsExecutor Incoming snapshots executor. */ @@ -90,6 +94,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage { String snapshotUri, RaftOptions raftOptions, PartitionAccess partition, + CatalogService catalogService, @Nullable SnapshotMeta startupSnapshotMeta, Executor incomingSnapshotsExecutor ) { @@ -98,6 +103,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage { this.snapshotUri = snapshotUri; this.raftOptions = raftOptions; this.partition = partition; + this.catalogService = catalogService; this.startupSnapshotMeta = startupSnapshotMeta; this.incomingSnapshotsExecutor = incomingSnapshotsExecutor; } @@ -137,6 +143,13 @@ public class PartitionSnapshotStorage implements SnapshotStorage { return partition; } + /** + * Returns catalog service. + */ + public CatalogService catalogService() { + return catalogService; + } + /** * Returns a snapshot meta, constructed from the storage data and raft group configuration at startup. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java index 56641e34a2..99588bb5af 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot; import java.util.concurrent.Executor; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration; @@ -50,6 +51,8 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { /** Partition storage. */ private final PartitionAccess partition; + private final CatalogService catalogService; + /** * RAFT log index, min of {@link MvPartitionStorage#lastAppliedIndex()} and {@link TxStateStorage#lastAppliedIndex()} * at the moment of this factory instantiation. @@ -62,6 +65,8 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { /** RAFT configuration corresponding to {@link #lastIncludedRaftIndex}. */ private final RaftGroupConfiguration lastIncludedConfiguration; + private final int lastCatalogVersionAtStart; + /** Incoming snapshots executor. */ private final Executor incomingSnapshotsExecutor; @@ -71,6 +76,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { * @param topologyService Topology service. * @param outgoingSnapshotsManager Snapshot manager. * @param partition MV partition storage. + * @param catalogService Access to the Catalog. * @param incomingSnapshotsExecutor Incoming snapshots executor. * @see SnapshotMeta */ @@ -79,11 +85,13 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { TopologyService topologyService, OutgoingSnapshotsManager outgoingSnapshotsManager, PartitionAccess partition, + CatalogService catalogService, Executor incomingSnapshotsExecutor ) { this.topologyService = topologyService; this.outgoingSnapshotsManager = outgoingSnapshotsManager; this.partition = partition; + this.catalogService = catalogService; this.incomingSnapshotsExecutor = incomingSnapshotsExecutor; // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the @@ -92,12 +100,14 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { lastIncludedRaftTerm = partition.minLastAppliedTerm(); lastIncludedConfiguration = partition.committedGroupConfiguration(); + + lastCatalogVersionAtStart = catalogService.latestCatalogVersion(); } @Override public PartitionSnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) { SnapshotMeta startupSnapshotMeta = lastIncludedRaftIndex == 0 ? null : SnapshotMetaUtils.snapshotMetaAt( - lastIncludedRaftIndex, lastIncludedRaftTerm, lastIncludedConfiguration + lastIncludedRaftIndex, lastIncludedRaftTerm, lastIncludedConfiguration, lastCatalogVersionAtStart ); return new PartitionSnapshotStorage( @@ -106,6 +116,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { uri, raftOptions, partition, + catalogService, startupSnapshotMeta, incomingSnapshotsExecutor ); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java index fd5a9a3c37..be713c7918 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java @@ -17,16 +17,24 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming; +import static java.util.concurrent.CompletableFuture.anyOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor; +import java.util.List; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Stream; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.schema.BinaryRow; @@ -68,6 +76,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier { private static final int MAX_TX_DATA_BATCH_SIZE = 1000; + /** Number of milliseconds that the follower is allowed to try to catch up the required catalog version. */ + private static final int WAIT_FOR_METADATA_CATCHUP_MS = 3000; + private final PartitionSnapshotStorage partitionSnapshotStorage; private final SnapshotUri snapshotUri; @@ -87,7 +98,10 @@ public class IncomingSnapshotCopier extends SnapshotCopier { private volatile SnapshotMeta snapshotMeta; @Nullable - private volatile CompletableFuture<?> rebalanceFuture; + private volatile CompletableFuture<Boolean> metadataSufficiencyFuture; + + @Nullable + private volatile CompletableFuture<Void> rebalanceFuture; /** * Future is to wait in {@link #join()} because it is important for us to wait for the rebalance to finish or abort. @@ -112,20 +126,57 @@ public class IncomingSnapshotCopier extends SnapshotCopier { LOG.info("Copier is started for the partition [{}]", createPartitionInfo()); - rebalanceFuture = partitionSnapshotStorage.partition().startRebalance() - .thenCompose(unused -> { - ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName); + ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName); + + metadataSufficiencyFuture = snapshotSender == null + ? failedFuture(new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName)) + : loadSnapshotMeta(snapshotSender) + // Give metadata some time to catch up as it's very probable that the leader is ahead metadata-wise. + .thenCompose(unused -> waitForMetadataWithTimeout()) + .thenApply(unused -> metadataIsSufficientlyComplete()) + ; + + rebalanceFuture = metadataSufficiencyFuture.thenCompose(metadataSufficient -> { + if (metadataSufficient) { + return partitionSnapshotStorage.partition().startRebalance() + .thenCompose(unused -> { + assert snapshotSender != null; + + return loadSnapshotMvData(snapshotSender, executor) + .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor)); + }); + } else { + logMetadataInsufficiencyAndSetError(); + + return completedFuture(null); + } + }); - if (snapshotSender == null) { - throw new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName); - } + joinFuture = metadataSufficiencyFuture.thenCompose(metadataSufficient -> { + if (metadataSufficient) { + return rebalanceFuture.handle((unused, throwable) -> completeRebalance(throwable)).thenCompose(Function.identity()); + } else { + return completedFuture(null); + } + }); + } - return loadSnapshotMeta(snapshotSender) - .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor)) - .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor)); - }); + private CompletableFuture<?> waitForMetadataWithTimeout() { + CompletableFuture<?> metadataReadyFuture = partitionSnapshotStorage.catalogService() + .catalogReadyFuture(snapshotMeta.requiredCatalogVersion()); + CompletableFuture<?> readinessTimeoutFuture = completeOnMetadataReadinessTimeout(); - joinFuture = rebalanceFuture.handle((unused, throwable) -> completeRebalance(throwable)).thenCompose(Function.identity()); + return anyOf(metadataReadyFuture, readinessTimeoutFuture); + } + + private static CompletableFuture<?> completeOnMetadataReadinessTimeout() { + return new CompletableFuture<>() + .orTimeout(WAIT_FOR_METADATA_CATCHUP_MS, TimeUnit.MILLISECONDS) + .exceptionally(ex -> { + assert (ex instanceof TimeoutException); + + return null; + }); } @Override @@ -140,14 +191,16 @@ public class IncomingSnapshotCopier extends SnapshotCopier { } catch (ExecutionException e) { Throwable cause = e.getCause(); - LOG.error("Error when completing the copier", cause); + if (!(cause instanceof CancellationException)) { + LOG.error("Error when completing the copier", cause); - if (!isOk()) { - setError(RaftError.UNKNOWN, "Unknown error on completion the copier"); - } + if (isOk()) { + setError(RaftError.UNKNOWN, "Unknown error on completion the copier"); + } - // By analogy with LocalSnapshotCopier#join. - throw new IllegalStateException(cause); + // By analogy with LocalSnapshotCopier#join. + throw new IllegalStateException(cause); + } } } } @@ -163,11 +216,14 @@ public class IncomingSnapshotCopier extends SnapshotCopier { LOG.info("Copier is canceled for partition [{}]", createPartitionInfo()); - CompletableFuture<?> fut = rebalanceFuture; + // Cancel all futures that might be upstream wrt joinFuture. + List<CompletableFuture<?>> futuresToCancel = Stream.of(metadataSufficiencyFuture, rebalanceFuture) + .filter(Objects::nonNull) + .collect(toList()); - if (fut != null) { - fut.cancel(false); + futuresToCancel.forEach(future -> future.cancel(false)); + if (!futuresToCancel.isEmpty()) { try { // Because after the cancellation, no one waits for #join. join(); @@ -215,6 +271,28 @@ public class IncomingSnapshotCopier extends SnapshotCopier { } } + private boolean metadataIsSufficientlyComplete() { + return isMetadataAvailableFor(snapshotMeta.requiredCatalogVersion(), partitionSnapshotStorage.catalogService()); + } + + private void logMetadataInsufficiencyAndSetError() { + LOG.warn( + "Metadata not yet available, rejecting snapshot installation [uri={}, requiredVersion={}].", + this.snapshotUri, + snapshotMeta.requiredCatalogVersion() + ); + + String errorMessage = String.format( + "Metadata not yet available, URI '%s', required level %s; rejecting snapshot installation.", + this.snapshotUri, + snapshotMeta.requiredCatalogVersion() + ); + + if (isOk()) { + setError(RaftError.EBUSY, errorMessage); + } + } + /** * Requests and stores data into {@link MvPartitionStorage}. */ @@ -276,7 +354,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier { /** * Requests and stores data into {@link TxStateStorage}. */ - private CompletableFuture<?> loadSnapshotTxData(ClusterNode snapshotSender, Executor executor) { + private CompletableFuture<Void> loadSnapshotTxData(ClusterNode snapshotSender, Executor executor) { if (!busyLock.enterBusy()) { return completedFuture(null); } @@ -340,7 +418,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier { */ private CompletableFuture<Void> completeRebalance(@Nullable Throwable throwable) { if (!busyLock.enterBusy()) { - if (!isOk()) { + if (isOk()) { setError(RaftError.ECANCELED, "Copier is cancelled"); } @@ -351,7 +429,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier { if (throwable != null) { LOG.error("Partition rebalancing error [{}]", throwable, createPartitionInfo()); - if (!isOk()) { + if (isOk()) { setError(RaftError.UNKNOWN, throwable.getMessage()); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java index 82544f9f06..6cec945695 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java @@ -25,6 +25,7 @@ import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.schema.BinaryRow; @@ -63,6 +64,8 @@ public class OutgoingSnapshot { private final PartitionAccess partition; + private final CatalogService catalogService; + /** * Lock that is used for mutual exclusion of MV snapshot reading (by this class) and threads that write MV data to the same * partition (currently, via {@link SnapshotAwarePartitionDataStorage}). @@ -113,9 +116,10 @@ public class OutgoingSnapshot { /** * Creates a new instance. */ - public OutgoingSnapshot(UUID id, PartitionAccess partition) { + public OutgoingSnapshot(UUID id, PartitionAccess partition, CatalogService catalogService) { this.id = id; this.partition = partition; + this.catalogService = catalogService; lastRowId = RowId.lowestRowId(partition.partitionKey().partitionId()); } @@ -159,7 +163,7 @@ public class OutgoingSnapshot { assert config != null : "Configuration should never be null when installing a snapshot"; - return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex, lastAppliedTerm, config); + return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex, lastAppliedTerm, config, catalogService.latestCatalogVersion()); } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java index 24224f92c5..0c7ad9a585 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java @@ -50,7 +50,7 @@ public class OutgoingSnapshotReader extends SnapshotReader { public OutgoingSnapshotReader(PartitionSnapshotStorage snapshotStorage) { this.snapshotStorage = snapshotStorage; - snapshot = new OutgoingSnapshot(id, snapshotStorage.partition()); + snapshot = new OutgoingSnapshot(id, snapshotStorage.partition(), snapshotStorage.catalogService()); LOG.info("Starting snapshot reader for snapshot {}", id); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java index 0ed0b403a1..cff8692500 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java @@ -32,14 +32,16 @@ public class SnapshotMetaUtils { * @param logIndex RAFT log index. * @param term Term corresponding to the index. * @param config RAFT group configuration. + * @param requiredCatalogVersion Catalog version that a follower/learner must have to have ability to accept this snapshot. * @return SnapshotMeta corresponding to the given log index. */ - public static SnapshotMeta snapshotMetaAt(long logIndex, long term, RaftGroupConfiguration config) { + public static SnapshotMeta snapshotMetaAt(long logIndex, long term, RaftGroupConfiguration config, int requiredCatalogVersion) { SnapshotMetaBuilder metaBuilder = new RaftMessagesFactory().snapshotMeta() .lastIncludedIndex(logIndex) .lastIncludedTerm(term) .peersList(config.peers()) - .learnersList(config.learners()); + .learnersList(config.learners()) + .requiredCatalogVersion(requiredCatalogVersion); if (!config.isStable()) { //noinspection ConstantConditions diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java index 499249eeae..d61f3197ad 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java @@ -34,7 +34,7 @@ public class CatalogVersionSufficiency { * @param catalogService Catalog service. * @return {@code true} iff the local Catalog version is sufficient. */ - static boolean isMetadataAvailableFor(int requiredCatalogVersion, CatalogService catalogService) { + public static boolean isMetadataAvailableFor(int requiredCatalogVersion, CatalogService catalogService) { return requiredCatalogVersion <= catalogService.latestCatalogVersion(); } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java index 57a9a037f7..d8f6d0b504 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.concurrent.Executor; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; @@ -55,6 +56,7 @@ public class PartitionSnapshotStorageFactoryTest extends BaseIgniteAbstractTest mock(TopologyService.class), mock(OutgoingSnapshotsManager.class), partitionAccess, + mock(CatalogService.class), mock(Executor.class) ); @@ -70,6 +72,7 @@ public class PartitionSnapshotStorageFactoryTest extends BaseIgniteAbstractTest mock(TopologyService.class), mock(OutgoingSnapshotsManager.class), partitionAccess, + mock(CatalogService.class), mock(Executor.class) ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java index ed83f131e0..a51c75e468 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import java.util.concurrent.Executor; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.raft.snapshot.startup.StartupPartitionSnapshotReader; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; @@ -61,6 +62,7 @@ class PartitionSnapshotStorageTest extends BaseIgniteAbstractTest { "", mock(RaftOptions.class), mock(PartitionAccess.class), + mock(CatalogService.class), metaForCleanStorage, mock(Executor.class) ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java index 9bc08ce2cb..9574aa777a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java @@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -39,6 +40,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -54,8 +56,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; @@ -80,6 +84,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage; import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; +import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest; @@ -98,6 +103,7 @@ import org.apache.ignite.network.TopologyService; import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.Status; import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta; +import org.apache.ignite.raft.jraft.error.RaftError; import org.apache.ignite.raft.jraft.option.RaftOptions; import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier; @@ -137,13 +143,28 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { private final RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter(); - private MvGc mvGc; + private final MvGc mvGc = mock(MvGc.class); + + private final CatalogService catalogService = mock(CatalogService.class); + + private final MvPartitionStorage outgoingMvPartitionStorage = new TestMvPartitionStorage(TEST_PARTITION); + private final TxStateStorage outgoingTxStatePartitionStorage = new TestTxStateStorage(); + + private final MvTableStorage incomingMvTableStorage = spy(new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT)); + private final TxStateTableStorage incomingTxStateTableStorage = spy(new TestTxStateTableStorage()); + + private final long expLastAppliedIndex = 100500L; + private final long expLastAppliedTerm = 100L; + private final RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig(); + + private final List<RowId> rowIds = generateRowIds(); + private final List<UUID> txIds = generateTxIds(); @BeforeEach void setUp() { - mvGc = mock(MvGc.class); - when(mvGc.removeStorage(any(TablePartitionId.class))).then(invocation -> completedFuture(null)); + + when(catalogService.catalogReadyFuture(anyInt())).thenReturn(completedFuture(null)); } @AfterEach @@ -153,27 +174,12 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { @Test void test() { - MvPartitionStorage outgoingMvPartitionStorage = new TestMvPartitionStorage(TEST_PARTITION); - TxStateStorage outgoingTxStatePartitionStorage = new TestTxStateStorage(); - - long expLastAppliedIndex = 100500L; - long expLastAppliedTerm = 100L; - RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig(); + fillOriginalStorages(); - List<RowId> rowIds = generateRowIds(); - List<UUID> txIds = generateTxIds(); - - fillMvPartitionStorage(outgoingMvPartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds); - fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, txIds); - - MvTableStorage incomingMvTableStorage = spy(new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT)); - TxStateTableStorage incomingTxStateTableStorage = spy(new TestTxStateTableStorage()); - - assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), willCompleteSuccessfully()); - incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION); + createTargetStorages(); MessagingService messagingService = messagingServiceForSuccessScenario(outgoingMvPartitionStorage, - outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId); + outgoingTxStatePartitionStorage, rowIds, txIds); PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage( snapshotId, @@ -215,31 +221,21 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { verify(incomingTxStatePartitionStorage, times(1)).startRebalance(); } - private MessagingService messagingServiceForSuccessScenario(MvPartitionStorage outgoingMvPartitionStorage, - TxStateStorage outgoingTxStatePartitionStorage, long expLastAppliedIndex, long expLastAppliedTerm, - RaftGroupConfiguration expLastGroupConfig, List<RowId> rowIds, List<UUID> txIds, UUID snapshotId) { - MessagingService messagingService = mock(MessagingService.class); + private void createTargetStorages() { + assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), willCompleteSuccessfully()); + incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION); + } - when(messagingService.invoke(eq(clusterNode), any(SnapshotMetaRequest.class), anyLong())).then(answer -> { - SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1); + private void fillOriginalStorages() { + fillMvPartitionStorage(outgoingMvPartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds); + fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, txIds); + } - assertEquals(snapshotId, snapshotMetaRequest.id()); + private MessagingService messagingServiceForSuccessScenario(MvPartitionStorage outgoingMvPartitionStorage, + TxStateStorage outgoingTxStatePartitionStorage, List<RowId> rowIds, List<UUID> txIds) { + MessagingService messagingService = mock(MessagingService.class); - return completedFuture( - TABLE_MSG_FACTORY.snapshotMetaResponse() - .meta( - RAFT_MSG_FACTORY.snapshotMeta() - .lastIncludedIndex(expLastAppliedIndex) - .lastIncludedTerm(expLastAppliedTerm) - .peersList(expLastGroupConfig.peers()) - .learnersList(expLastGroupConfig.learners()) - .oldPeersList(expLastGroupConfig.oldPeers()) - .oldLearnersList(expLastGroupConfig.oldLearners()) - .build() - ) - .build() - ); - }); + returnSnapshotMetaWhenAskedForIt(messagingService); when(messagingService.invoke(eq(clusterNode), any(SnapshotMvDataRequest.class), anyLong())).then(answer -> { SnapshotMvDataRequest snapshotMvDataRequest = answer.getArgument(1); @@ -266,6 +262,32 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { return messagingService; } + private void returnSnapshotMetaWhenAskedForIt(MessagingService messagingService) { + when(messagingService.invoke(eq(clusterNode), any(SnapshotMetaRequest.class), anyLong())).then(answer -> { + SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1); + + assertEquals(snapshotId, snapshotMetaRequest.id()); + + return completedFuture(snapshotMetaResponse(0)); + }); + } + + private SnapshotMetaResponse snapshotMetaResponse(int requiredCatalogVersion) { + return TABLE_MSG_FACTORY.snapshotMetaResponse() + .meta( + RAFT_MSG_FACTORY.snapshotMeta() + .lastIncludedIndex(expLastAppliedIndex) + .lastIncludedTerm(expLastAppliedTerm) + .peersList(expLastGroupConfig.peers()) + .learnersList(expLastGroupConfig.learners()) + .oldPeersList(expLastGroupConfig.oldPeers()) + .oldLearnersList(expLastGroupConfig.oldLearners()) + .requiredCatalogVersion(requiredCatalogVersion) + .build() + ) + .build(); + } + private PartitionSnapshotStorage createPartitionSnapshotStorage( UUID snapshotId, MvTableStorage incomingTableStorage, @@ -293,6 +315,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { mock(IndexUpdateHandler.class), mock(GcUpdateHandler.class) )), + catalogService, mock(SnapshotMeta.class), executorService ); @@ -435,18 +458,14 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { } @Test - void cancellationMakesJoinFinishIfHangingOnNetworkCall() throws Exception { - MvTableStorage incomingMvTableStorage = spy(new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT)); - TxStateTableStorage incomingTxStateTableStorage = spy(new TestTxStateTableStorage()); - - assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), willCompleteSuccessfully()); - incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION); + void cancellationMakesJoinFinishIfHangingOnNetworkCallToSnapshotMetadata() throws Exception { + createTargetStorages(); CountDownLatch networkInvokeLatch = new CountDownLatch(1); MessagingService messagingService = mock(MessagingService.class); - when(messagingService.invoke(any(ClusterNode.class), any(), anyLong())).then(invocation -> { + when(messagingService.invoke(any(ClusterNode.class), any(SnapshotMetaRequest.class), anyLong())).then(invocation -> { networkInvokeLatch.countDown(); return new CompletableFuture<>(); @@ -474,32 +493,58 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS)); - verify(partitionSnapshotStorage.partition()).abortRebalance(); + verify(partitionSnapshotStorage.partition(), never()).startRebalance(); + verify(partitionSnapshotStorage.partition(), never()).abortRebalance(); } @Test - void testCancelOnMiddleRebalance() { - MvPartitionStorage outgoingMvPartitionStorage = new TestMvPartitionStorage(TEST_PARTITION); - TxStateStorage outgoingTxStatePartitionStorage = new TestTxStateStorage(); + void cancellationMakesJoinFinishIfHangingOnNetworkCallWhenGettingData() throws Exception { + createTargetStorages(); - long expLastAppliedIndex = 100500L; - long expLastAppliedTerm = 100L; - RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig(); + CountDownLatch networkInvokeLatch = new CountDownLatch(1); - List<RowId> rowIds = generateRowIds(); - List<UUID> txIds = generateTxIds(); + MessagingService messagingService = mock(MessagingService.class); - fillMvPartitionStorage(outgoingMvPartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds); - fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, txIds); + returnSnapshotMetaWhenAskedForIt(messagingService); + when(messagingService.invoke(any(ClusterNode.class), any(SnapshotMvDataRequest.class), anyLong())).then(invocation -> { + networkInvokeLatch.countDown(); - MvTableStorage incomingMvTableStorage = spy(new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT)); - TxStateTableStorage incomingTxStateTableStorage = spy(new TestTxStateTableStorage()); + return new CompletableFuture<>(); + }); - assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), willCompleteSuccessfully()); - incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION); + PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage( + snapshotId, + incomingMvTableStorage, + incomingTxStateTableStorage, + messagingService + ); + + SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom( + SnapshotUri.toStringUri(snapshotId, NODE_NAME), + mock(SnapshotCopierOptions.class) + ); + + networkInvokeLatch.await(1, TimeUnit.SECONDS); + + CompletableFuture<?> cancelAndJoinFuture = runAsync(() -> { + snapshotCopier.cancel(); + + snapshotCopier.join(); + }); + + assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS)); + + verify(partitionSnapshotStorage.partition()).abortRebalance(); + } + + @Test + void testCancelOnMiddleRebalance() { + fillOriginalStorages(); + + createTargetStorages(); MessagingService messagingService = messagingServiceForSuccessScenario(outgoingMvPartitionStorage, - outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId); + outgoingTxStatePartitionStorage, rowIds, txIds); PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage( snapshotId, @@ -547,27 +592,12 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { @Test void testErrorInProcessOfRebalance() { - MvPartitionStorage outgoingMvPartitionStorage = new TestMvPartitionStorage(TEST_PARTITION); - TxStateStorage outgoingTxStatePartitionStorage = new TestTxStateStorage(); - - long expLastAppliedIndex = 100500L; - long expLastAppliedTerm = 100L; - RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig(); - - List<RowId> rowIds = generateRowIds(); - List<UUID> txIds = generateTxIds(); - - fillMvPartitionStorage(outgoingMvPartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds); - fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, txIds); + fillOriginalStorages(); - MvTableStorage incomingMvTableStorage = spy(new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT)); - TxStateTableStorage incomingTxStateTableStorage = spy(new TestTxStateTableStorage()); - - assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION), willCompleteSuccessfully()); - incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION); + createTargetStorages(); MessagingService messagingService = messagingServiceForSuccessScenario(outgoingMvPartitionStorage, - outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId); + outgoingTxStatePartitionStorage, rowIds, txIds); PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage( snapshotId, @@ -642,4 +672,60 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { List.of("old-learner") ); } + + @Test + void laggingSchemasPreventSnapshotInstallation() { + fillOriginalStorages(); + + createTargetStorages(); + + MessagingService messagingService = mock(MessagingService.class); + + when(messagingService.invoke(eq(clusterNode), any(SnapshotMetaRequest.class), anyLong())) + .thenReturn(completedFuture(snapshotMetaResponse(42))); + + PartitionSnapshotStorage partitionSnapshotStorage = createPartitionSnapshotStorage( + snapshotId, + incomingMvTableStorage, + incomingTxStateTableStorage, + messagingService + ); + + SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom( + SnapshotUri.toStringUri(snapshotId, NODE_NAME), + mock(SnapshotCopierOptions.class) + ); + + assertThat(runAsync(snapshotCopier::join), willSucceedIn(1, TimeUnit.SECONDS)); + + assertEquals(RaftError.EBUSY.getNumber(), snapshotCopier.getCode()); + + verify(messagingService, never()).invoke(any(ClusterNode.class), any(SnapshotMvDataRequest.class), anyLong()); + verify(messagingService, never()).invoke(any(ClusterNode.class), any(SnapshotTxDataRequest.class), anyLong()); + + verify(partitionSnapshotStorage.partition(), never()).startRebalance(); + verify(partitionSnapshotStorage.partition(), never()).abortRebalance(); + + assertThatTargetStoragesAreEmpty(incomingMvTableStorage, incomingTxStateTableStorage); + } + + private static void assertThatTargetStoragesAreEmpty( + MvTableStorage incomingMvTableStorage, + TxStateTableStorage incomingTxStateTableStorage + ) { + MvPartitionStorage incomingMvPartitionStorage = incomingMvTableStorage.getMvPartition(TEST_PARTITION); + TxStateStorage incomingTxStatePartitionStorage = incomingTxStateTableStorage.getTxStateStorage(TEST_PARTITION); + + assertEquals(0L, incomingMvPartitionStorage.lastAppliedIndex()); + assertEquals(0L, incomingMvPartitionStorage.lastAppliedTerm()); + assertArrayEquals( + null, + incomingMvPartitionStorage.committedGroupConfiguration() + ); + assertEquals(0L, incomingTxStatePartitionStorage.lastAppliedIndex()); + assertEquals(0L, incomingTxStatePartitionStorage.lastAppliedTerm()); + + assertFalse(incomingMvPartitionStorage.scan(HybridTimestamp.MAX_VALUE).hasNext()); + assertFalse(incomingTxStatePartitionStorage.scan().hasNext()); + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java index c4bf8a1b5a..74a19e9471 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration; import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; @@ -44,17 +45,22 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest { @Mock private PartitionAccess partitionAccess; + @Mock + private CatalogService catalogService; + private OutgoingSnapshot snapshot; private final TableMessagesFactory messagesFactory = new TableMessagesFactory(); private final PartitionKey partitionKey = new PartitionKey(1, 1); + private static final int REQUIRED_CATALOG_VERSION = 42; + @BeforeEach void createTestInstance() { when(partitionAccess.partitionKey()).thenReturn(partitionKey); - snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess); + snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, catalogService); } @Test @@ -75,6 +81,8 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest { List.of("learner1:3000") )); + when(catalogService.latestCatalogVersion()).thenReturn(REQUIRED_CATALOG_VERSION); + snapshot.freezeScopeUnderMvLock(); SnapshotMetaResponse response = getSnapshotMetaResponse(); @@ -85,6 +93,7 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest { assertThat(response.meta().learnersList(), is(List.of("learner1:3000", "learner2:3000"))); assertThat(response.meta().oldPeersList(), is(List.of("peer1:3000"))); assertThat(response.meta().oldLearnersList(), is(List.of("learner1:3000"))); + assertThat(response.meta().requiredCatalogVersion(), is(REQUIRED_CATALOG_VERSION)); } private SnapshotMetaResponse getSnapshotMetaResponse() { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java index 1e868a3190..4615eec80a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java @@ -34,6 +34,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.schema.BinaryRow; @@ -61,6 +62,9 @@ class OutgoingSnapshotMvDataStreamingTest extends BaseIgniteAbstractTest { @Mock private PartitionAccess partitionAccess; + @Mock + private CatalogService catalogService; + private OutgoingSnapshot snapshot; private final TableMessagesFactory messagesFactory = new TableMessagesFactory(); @@ -83,7 +87,7 @@ class OutgoingSnapshotMvDataStreamingTest extends BaseIgniteAbstractTest { void createTestInstance() { when(partitionAccess.partitionKey()).thenReturn(partitionKey); - snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess); + snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, catalogService); } @BeforeEach diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java index 82a598f4ca..c1ed183e0e 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.concurrent.Executor; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration; import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; @@ -60,6 +61,7 @@ public class OutgoingSnapshotReaderTest extends BaseIgniteAbstractTest { "", mock(RaftOptions.class), partitionAccess, + mock(CatalogService.class), mock(SnapshotMeta.class), mock(Executor.class) ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java index 3e25c9b1e1..998bfd41c2 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -59,6 +60,9 @@ class OutgoingSnapshotTxDataStreamingTest extends BaseIgniteAbstractTest { @Mock private PartitionAccess partitionAccess; + @Mock + private CatalogService catalogService; + private OutgoingSnapshot snapshot; private final TableMessagesFactory messagesFactory = new TableMessagesFactory(); @@ -82,7 +86,7 @@ class OutgoingSnapshotTxDataStreamingTest extends BaseIgniteAbstractTest { lenient().when(partitionAccess.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class)); - snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess); + snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, catalogService); } @Test diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java index e01f7c9a25..6b57d31250 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration; import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess; import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; @@ -44,6 +45,9 @@ class OutgoingSnapshotsManagerTest extends BaseIgniteAbstractTest { @Mock private PartitionAccess partitionAccess; + @Mock + private CatalogService catalogService; + private final PartitionKey partitionKey = new PartitionKey(1, 1); @SuppressWarnings("EmptyTryBlock") @@ -68,7 +72,7 @@ class OutgoingSnapshotsManagerTest extends BaseIgniteAbstractTest { when(partitionAccess.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class)); - OutgoingSnapshot snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess); + OutgoingSnapshot snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess, catalogService); assertDoesNotThrow(() -> manager.startOutgoingSnapshot(UUID.randomUUID(), snapshot)); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java index f29cae8d0a..054ebe7e7d 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java @@ -38,7 +38,7 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest { List.of("peer1:3000"), List.of("learner1:3000") ); - SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, config); + SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, config, 42); assertThat(meta.lastIncludedIndex(), is(100L)); assertThat(meta.lastIncludedTerm(), is(3L)); @@ -46,11 +46,12 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest { assertThat(meta.learnersList(), is(List.of("learner1:3000", "learner2:3000"))); assertThat(meta.oldPeersList(), is(List.of("peer1:3000"))); assertThat(meta.oldLearnersList(), is(List.of("learner1:3000"))); + assertThat(meta.requiredCatalogVersion(), is(42)); } @Test void doesNotIncludeOldConfigWhenItIsNotThere() { - SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, new RaftGroupConfiguration(List.of(), List.of(), null, null)); + SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, new RaftGroupConfiguration(List.of(), List.of(), null, null), 42); assertThat(meta.oldPeersList(), is(nullValue())); assertThat(meta.oldLearnersList(), is(nullValue()));