This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 27ce7f278d IGNITE-22808 Migrate running node missed during CMG reparation to new cluster (#4315)
27ce7f278d is described below
commit 27ce7f278d10cf92a2b20b46737b37822ebbbcd9
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Sat Aug 31 22:05:57 2024 +0400
IGNITE-22808 Migrate running node missed during CMG reparation to new cluster (#4315)
---
.../management/ClusterManagementGroupManager.java | 18 ++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 4 +-
.../system/message/ResetClusterMessage.java | 5 -
.../disaster/system/ItCmgDisasterRecoveryTest.java | 132 ++++++++++++++++++++-
.../system/SystemDisasterRecoveryManager.java | 9 ++
.../system/SystemDisasterRecoveryManagerImpl.java | 66 +++++++++--
.../SystemDisasterRecoveryManagerImplTest.java | 81 +++++++++++--
7 files changed, 277 insertions(+), 38 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 9d8affae0b..e9781a74d9 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -33,6 +33,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
@@ -392,7 +393,13 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
cmgMessageHandler.onRecoveryComplete();
return serviceFuture
- .thenCompose(service -> doInit(service,
cmgInitMessageFromResetClusterMessage(resetClusterMessage)))
+ .thenCompose(
+ service -> doInit(
+ service,
+
cmgInitMessageFromResetClusterMessage(resetClusterMessage),
+ resetClusterMessage.formerClusterIds()
+ )
+ )
.thenApply(unused -> null);
}
@@ -482,7 +489,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
// handle this case by applying only the first attempt and
returning the actual cluster state for all other
// attempts.
raftService = serviceFuture
- .thenCompose(service -> doInit(service, msg)
+ .thenCompose(service -> doInit(service, msg, null)
.handle((v, e) -> {
NetworkMessage response;
@@ -513,8 +520,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
}
}
- private CompletableFuture<CmgRaftService> doInit(CmgRaftService service,
CmgInitMessage msg) {
- return service.initClusterState(createClusterState(msg))
+ private CompletableFuture<CmgRaftService> doInit(CmgRaftService service,
CmgInitMessage msg, @Nullable List<UUID> formerClusterIds) {
+ return service.initClusterState(createClusterState(msg,
formerClusterIds))
.thenCompose(state -> {
var localState = new LocalState(state.cmgNodes(),
state.clusterTag());
@@ -526,13 +533,14 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
});
}
- private ClusterState createClusterState(CmgInitMessage msg) {
+ private ClusterState createClusterState(CmgInitMessage msg, @Nullable
List<UUID> formerClusterIds) {
return msgFactory.clusterState()
.cmgNodes(Set.copyOf(msg.cmgNodes()))
.metaStorageNodes(Set.copyOf(msg.metaStorageNodes()))
.version(IgniteProductVersion.CURRENT_VERSION.toString())
.clusterTag(clusterTag(msgFactory, msg.clusterName(),
msg.clusterId()))
.initialClusterConfiguration(msg.initialClusterConfiguration())
+ .formerClusterIds(formerClusterIds)
.build();
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a095b99611..d610ce3697 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1444,8 +1444,8 @@ public class IgniteImpl implements Ignite {
}
@TestOnly
- public ClusterStateStorage clusterStateStorage() {
- return clusterStateStorage;
+ public ClusterManagementGroupManager clusterManagementGroupManager() {
+ return cmgMgr;
}
@TestOnly
diff --git
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
index 7acc6bff0f..2cce9a9663 100644
---
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
+++
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
@@ -53,9 +53,4 @@ public interface ResetClusterMessage extends NetworkMessage,
Serializable {
* IDs that the cluster had before (including the current incarnation by
which this message is sent).
*/
List<UUID> formerClusterIds();
-
- /**
- * Consistent ID of the node that conducts the cluster reset.
- */
- String conductor();
}
diff --git
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index 3cdf5bd9ee..7f3b0a2cd6 100644
---
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -18,22 +18,30 @@
package org.apache.ignite.internal.disaster.system;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -52,6 +60,8 @@ class ItCmgDisasterRecoveryTest extends
ClusterPerTestIntegrationTest {
});
waitTillClusterStateIsSavedToVaultOnConductor(1);
+ UUID originalClusterId =
clusterState(igniteImpl(0)).clusterTag().clusterId();
+
// This makes the CMG majority go away.
cluster.stopNode(0);
@@ -64,6 +74,10 @@ class ItCmgDisasterRecoveryTest extends
ClusterPerTestIntegrationTest {
IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
waitTillCmgHasMajority(restartedIgniteImpl1);
+ ClusterState newClusterState = clusterState(restartedIgniteImpl1);
+ assertThat(newClusterState.clusterTag().clusterId(),
is(not(originalClusterId)));
+ assertThat(newClusterState.formerClusterIds(),
contains(originalClusterId));
+
assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl1);
assertResetClusterMessageIsNotPresentAt(waitTillNodeRestartsInternally(2));
}
@@ -100,7 +114,10 @@ class ItCmgDisasterRecoveryTest extends
ClusterPerTestIntegrationTest {
private IgniteImpl waitTillNodeRestartsInternally(int nodeIndex) throws
InterruptedException {
// restartOrShutdownFuture() becomes non-null when restart or shutdown
is initiated; we know it's restart.
- assertTrue(waitForCondition(() -> restartOrShutdownFuture(nodeIndex)
!= null, SECONDS.toMillis(20)));
+ assertTrue(
+ waitForCondition(() -> restartOrShutdownFuture(nodeIndex) !=
null, SECONDS.toMillis(20)),
+ "Node did not attempt to be restarted (or shut down) in time"
+ );
assertThat(restartOrShutdownFuture(nodeIndex),
willCompleteSuccessfully());
return unwrapIgniteImpl(cluster.server(nodeIndex).api());
@@ -111,6 +128,11 @@ class ItCmgDisasterRecoveryTest extends
ClusterPerTestIntegrationTest {
return ((IgniteServerImpl)
cluster.server(nodeIndex)).restartOrShutdownFuture();
}
+ private static ClusterState clusterState(IgniteImpl restartedIgniteImpl1)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return
restartedIgniteImpl1.clusterManagementGroupManager().clusterState().get(10,
SECONDS);
+ }
+
private static void assertResetClusterMessageIsNotPresentAt(IgniteImpl
ignite) {
assertThat(new
SystemDisasterRecoveryStorage(ignite.vault()).readResetClusterMessage(),
is(nullValue()));
}
@@ -200,7 +222,7 @@ class ItCmgDisasterRecoveryTest extends
ClusterPerTestIntegrationTest {
}
@Test
- void nodesThatSawNoReparationHaveSeparatePhysicalTopology() throws
Exception {
+ void nodesThatSawNoReparationHaveSeparatePhysicalTopologies() throws
Exception {
cluster.startAndInit(2, paramsBuilder -> {
paramsBuilder.cmgNodeNames(nodeNames(0));
paramsBuilder.metaStorageNodeNames(nodeNames(1));
@@ -223,4 +245,110 @@ class ItCmgDisasterRecoveryTest extends
ClusterPerTestIntegrationTest {
"Nodes from different clusters were able to establish a
connection"
);
}
+
+ @Test
+ void migratesNodesThatSawNoReparationToNewCluster() throws Exception {
+ cluster.startAndInit(2, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0));
+ paramsBuilder.metaStorageNodeNames(nodeNames(1));
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+ // This makes the CMG majority go away.
+ cluster.stopNode(0);
+
+ // Repair CMG with just node 1.
+ initiateCmgRepairVia(igniteImpl(1), 1);
+ IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+ waitTillCmgHasMajority(restartedIgniteImpl1);
+
+ migrate(0, 1);
+
+ LogicalTopologySnapshot topologySnapshot =
igniteImpl(1).logicalTopologyService().logicalTopologyOnLeader().get(10,
SECONDS);
+ assertTopologyContainsNode(0, topologySnapshot);
+ }
+
+ private void assertTopologyContainsNode(int nodeIndex,
LogicalTopologySnapshot topologySnapshot) {
+ assertTrue(topologySnapshot.nodes().stream().anyMatch(node ->
node.name().equals(cluster.nodeName(nodeIndex))));
+ }
+
+ private void migrate(int oldClusterNodeIndex, int newClusterNodeIndex)
throws Exception {
+ // Starting the node that did not see the repair.
+ IgniteImpl nodeMissingRepair = ((IgniteServerImpl)
cluster.startEmbeddedNode(oldClusterNodeIndex).server()).igniteImpl();
+
+ initiateMigrationToNewCluster(nodeMissingRepair,
igniteImpl(newClusterNodeIndex));
+
+ waitTillNodeRestartsInternally(oldClusterNodeIndex);
+ }
+
+ private static void initiateMigrationToNewCluster(IgniteImpl
nodeMissingRepair, IgniteImpl repairedNode) throws Exception {
+ // TODO: IGNITE-22879 - initiate migration via CLI.
+
+ ClusterState newClusterState = clusterState(repairedNode);
+
+ CompletableFuture<Void> migrationFuture =
nodeMissingRepair.systemDisasterRecoveryManager().migrate(newClusterState);
+ assertThat(migrationFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void migratesManyNodesThatSawNoReparationToNewCluster() throws Exception {
+ cluster.startAndInit(5, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0, 1, 2));
+ paramsBuilder.metaStorageNodeNames(nodeNames(2, 3, 4));
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(2);
+
+ // Stop the majority of CMG.
+ IntStream.of(0, 1).parallel().forEach(cluster::stopNode);
+
+ // Repair CMG with nodes 2, 3, 4.
+ initiateCmgRepairVia(igniteImpl(2), 2, 3, 4);
+ IgniteImpl restartedIgniteImpl2 = waitTillNodeRestartsInternally(2);
+ waitTillCmgHasMajority(restartedIgniteImpl2);
+
+ // Starting the nodes that did not see the repair (in parallel, to
save time).
+ List<IgniteImpl> partialNodes = IntStream.of(0, 1).parallel()
+ .mapToObj(index -> ((IgniteServerImpl)
cluster.startEmbeddedNode(index).server()).igniteImpl())
+ .collect(toList());
+
+ initiateMigrationToNewCluster(partialNodes.get(0), igniteImpl(2));
+
+ waitTillNodeRestartsInternally(0);
+ waitTillNodeRestartsInternally(1);
+
+ LogicalTopologySnapshot topologySnapshot =
igniteImpl(2).logicalTopologyService().logicalTopologyOnLeader().get(10,
SECONDS);
+ assertTopologyContainsNode(0, topologySnapshot);
+ assertTopologyContainsNode(1, topologySnapshot);
+
+ // TODO: IGNITE-23096 - remove after the hang is fixed.
+ waitTillNodesRestartInProcess(3, 4);
+ }
+
+ @Test
+ void repeatedRepairWorks() throws Exception {
+ cluster.startAndInit(2, paramsBuilder -> {
+ paramsBuilder.cmgNodeNames(nodeNames(0));
+ paramsBuilder.metaStorageNodeNames(nodeNames(1));
+ });
+ waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+ // This makes the CMG majority go away.
+ cluster.stopNode(0);
+
+ // Repair CMG with just node 1.
+ initiateCmgRepairVia(igniteImpl(1), 1);
+ IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+ waitTillCmgHasMajority(restartedIgniteImpl1);
+
+ // Starting the node that did not see the repair.
+ migrate(0, 1);
+
+ // Second repair.
+ initiateCmgRepairVia(igniteImpl(1), 1);
+ IgniteImpl igniteImpl1RestartedSecondTime =
waitTillNodeRestartsInternally(1);
+ waitTillCmgHasMajority(igniteImpl1RestartedSecondTime);
+
+ // TODO: IGNITE-23096 - remove after the hang is fixed.
+ waitTillNodesRestartInProcess(0, 1);
+ }
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
index ea2779a244..6521760ccd 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
@@ -46,4 +46,13 @@ public interface SystemDisasterRecoveryManager {
* @return Future completing with the result of the operation ({@link
ResetClusterMessage} in case of error related to reset logic).
*/
CompletableFuture<Void> resetCluster(List<String>
proposedCmgConsistentIds);
+
+ /**
+ * Migrates nodes missed during CMG repair to the new cluster (which is
the result of the repair). To do so, sends the
+ * corresponding {@link ResetClusterMessage} to all nodes that are in the
physical topology (including itself).
+ *
+ * @param targetClusterState State of the new cluster.
+ * @return Future completing with the result of the operation.
+ */
+ CompletableFuture<Void> migrate(ClusterState targetClusterState);
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index 90299e3272..fd65b03c3f 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -103,7 +103,7 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
messagingService.respond(sender, successResponseMessage(),
correlationId)
.thenRunAsync(() -> {
- if (!thisNodeName.equals(message.conductor())) {
+ if (!thisNodeName.equals(sender.name())) {
restarter.initiateRestart();
}
}, restartExecutor);
@@ -148,19 +148,14 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
ensureInitConfigApplied();
ClusterState clusterState = ensureClusterStateIsPresent();
- ResetClusterMessage message = buildResetClusterMessage(
- proposedCmgConsistentIds,
- clusterState
- );
+ ResetClusterMessage message =
buildResetClusterMessageForReset(proposedCmgConsistentIds, clusterState);
- Map<String, CompletableFuture<NetworkMessage>> responseFutures =
sendResetClusterMessageTo(
- nodesInTopology,
- message
- );
+ Map<String, CompletableFuture<NetworkMessage>> responseFutures =
sendResetClusterMessageTo(nodesInTopology, message);
return allOf(responseFutures.values())
.handleAsync((res, ex) -> {
// We ignore upstream exceptions on purpose.
+ rethrowIfError(ex);
if (isMajorityOfCmgAreSuccesses(proposedCmgConsistentIds,
responseFutures)) {
restarter.initiateRestart();
@@ -172,8 +167,10 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
}, restartExecutor);
}
- private Map<String, CompletableFuture<NetworkMessage>>
sendResetClusterMessageTo(Collection<ClusterNode> nodesInTopology,
- ResetClusterMessage message) {
+ private Map<String, CompletableFuture<NetworkMessage>>
sendResetClusterMessageTo(
+ Collection<ClusterNode> nodesInTopology,
+ ResetClusterMessage message
+ ) {
Map<String, CompletableFuture<NetworkMessage>> responseFutures = new
HashMap<>();
for (ClusterNode node : nodesInTopology) {
responseFutures.put(node.name(), messagingService.invoke(node,
message, 10_000));
@@ -221,12 +218,11 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
return clusterState;
}
- private ResetClusterMessage buildResetClusterMessage(List<String>
proposedCmgConsistentIds, ClusterState clusterState) {
+ private ResetClusterMessage
buildResetClusterMessageForReset(Collection<String> proposedCmgConsistentIds,
ClusterState clusterState) {
List<UUID> formerClusterIds = new
ArrayList<>(requireNonNullElse(clusterState.formerClusterIds(), new
ArrayList<>()));
formerClusterIds.add(clusterState.clusterTag().clusterId());
return messagesFactory.resetClusterMessage()
- .conductor(thisNodeName)
.cmgNodes(new HashSet<>(proposedCmgConsistentIds))
.metaStorageNodes(clusterState.metaStorageNodes())
.clusterName(clusterState.clusterTag().clusterName())
@@ -235,6 +231,12 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
.build();
}
+ private static void rethrowIfError(Throwable ex) {
+ if (ex instanceof Error) {
+ throw (Error) ex;
+ }
+ }
+
private static boolean isMajorityOfCmgAreSuccesses(
List<String> proposedCmgConsistentIds,
Map<String, CompletableFuture<NetworkMessage>> responseFutures
@@ -255,6 +257,44 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
return successes >= (futuresFromNewCmg.size() + 1) / 2;
}
+ @Override
+ public CompletableFuture<Void> migrate(ClusterState targetClusterState) {
+ if (targetClusterState.formerClusterIds() == null) {
+ return failedFuture(
+ new ClusterResetException("Migration can only happen using
cluster state from a node that saw a cluster reset")
+ );
+ }
+
+ Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
+
+ ResetClusterMessage message =
buildResetClusterMessageForMigrate(targetClusterState);
+
+ Map<String, CompletableFuture<NetworkMessage>> responseFutures =
sendResetClusterMessageTo(nodesInTopology, message);
+
+ return allOf(responseFutures.values())
+ .handleAsync((res, ex) -> {
+ // We ignore upstream exceptions on purpose.
+ rethrowIfError(ex);
+
+ restarter.initiateRestart();
+ return null;
+ }, restartExecutor);
+ }
+
+ private ResetClusterMessage
buildResetClusterMessageForMigrate(ClusterState clusterState) {
+ List<UUID> formerClusterIds = clusterState.formerClusterIds();
+ assert formerClusterIds != null : "formerClusterIds is null, but it
must never be here as it's from a node that saw a CMG reset; "
+ + "current node is " + thisNodeName;
+
+ return messagesFactory.resetClusterMessage()
+ .cmgNodes(clusterState.cmgNodes())
+ .metaStorageNodes(clusterState.metaStorageNodes())
+ .clusterName(clusterState.clusterTag().clusterName())
+ .clusterId(clusterState.clusterTag().clusterId())
+ .formerClusterIds(formerClusterIds)
+ .build();
+ }
+
/**
* An executor that spawns a thread per task. This is very inefficient for
scenarios with many tasks,
* but we use it here for very rare tasks; it has an advantage that it
doesn't need to be shut down
diff --git
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index 84cbd7c7d6..ad5a7d1999 100644
---
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -284,17 +284,16 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
private void assertThatResetClusterMessageIsAsExpected(ResetClusterMessage
message) {
- assertThatResetClusterMessageContentIsAsExpected(message,
thisNodeName);
+ assertThatResetClusterMessageContentIsAsExpected(message);
}
- private void assertThatResetClusterMessageContentIsAsExpected(@Nullable
ResetClusterMessage message, String expectedConductor) {
+ private void assertThatResetClusterMessageContentIsAsExpected(@Nullable
ResetClusterMessage message) {
assertThat(message, is(notNullValue()));
assertThat(message.cmgNodes(), containsInAnyOrder(thisNodeName,
node2.name()));
assertThat(message.metaStorageNodes(),
is(usualClusterState.metaStorageNodes()));
assertThat(message.clusterName(), is(CLUSTER_NAME));
assertThat(message.clusterId(),
is(not(usualClusterState.clusterTag().clusterId())));
assertThat(message.formerClusterIds(),
contains(usualClusterState.clusterTag().clusterId()));
- assertThat(message.conductor(), is(expectedConductor));
}
@Test
@@ -369,7 +368,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
NetworkMessageHandler handler = extractMessageHandler();
ClusterNode conductor = fromSelf ? thisNode : node2;
- handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()),
conductor, 0L);
+ handler.onReceived(resetClusterMessageOn2Nodes(), conductor, 0L);
waitTillResetClusterMessageGetsSavedToVault();
VaultEntry entry = vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
@@ -377,7 +376,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
ResetClusterMessage savedMessage = fromBytes(entry.value());
- assertThatResetClusterMessageContentIsAsExpected(savedMessage,
conductor.name());
+ assertThatResetClusterMessageContentIsAsExpected(savedMessage);
}
private void waitTillResetClusterMessageGetsSavedToVault() throws
InterruptedException {
@@ -402,14 +401,13 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
return handler;
}
- private ResetClusterMessage resetClusterMessageOn2Nodes(String
conductorName) {
+ private ResetClusterMessage resetClusterMessageOn2Nodes() {
return messagesFactory.resetClusterMessage()
.cmgNodes(Set.of(thisNodeName, node2.name()))
.metaStorageNodes(usualClusterState.metaStorageNodes())
.clusterName(CLUSTER_NAME)
.clusterId(randomUUID())
.formerClusterIds(List.of(usualClusterState.clusterTag().clusterId()))
- .conductor(conductorName)
.build();
}
@@ -420,7 +418,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
NetworkMessageHandler handler = extractMessageHandler();
ClusterNode conductor = thisNode;
- handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()),
conductor, 123L);
+ handler.onReceived(resetClusterMessageOn2Nodes(), conductor, 123L);
InOrder inOrder = inOrder(messagingService, vaultManager);
@@ -437,7 +435,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
NetworkMessageHandler handler = extractMessageHandler();
ClusterNode conductor = node2;
- handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()),
conductor, 123L);
+ handler.onReceived(resetClusterMessageOn2Nodes(), conductor, 123L);
InOrder inOrder = inOrder(messagingService, vaultManager, restarter);
@@ -452,7 +450,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
void initiatesRestartWhenGetsMessageFromOtherNode() throws Exception {
NetworkMessageHandler handler = extractMessageHandler();
- handler.onReceived(resetClusterMessageOn2Nodes(node2.name()), node2,
0L);
+ handler.onReceived(resetClusterMessageOn2Nodes(), node2, 0L);
verify(restarter, timeout(SECONDS.toMillis(10))).initiateRestart();
@@ -464,11 +462,72 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
void doesNotInitiateRestartWhenGetsMessageFromSelf() throws Exception {
NetworkMessageHandler handler = extractMessageHandler();
- handler.onReceived(resetClusterMessageOn2Nodes(thisNodeName),
thisNode, 0L);
+ handler.onReceived(resetClusterMessageOn2Nodes(), thisNode, 0L);
verify(restarter, never()).initiateRestart();
// Wait till it gets saved to Vault to avoid an attempt to write to it
after the after-each method stops the Vault.
waitTillResetClusterMessageGetsSavedToVault();
}
+
+ @Test
+ void migrateSendsMessagesToAllNodes() {
+ ClusterState newState = newClusterState();
+
+ ArgumentCaptor<ResetClusterMessage> messageCaptor =
ArgumentCaptor.forClass(ResetClusterMessage.class);
+
+ when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2,
node3));
+ when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+ .thenReturn(completedFuture(successResponseMessage));
+
+ assertThat(manager.migrate(newState), willCompleteSuccessfully());
+
+ verify(messagingService).invoke(eq(thisNode), messageCaptor.capture(),
anyLong());
+ ResetClusterMessage messageToSelf = messageCaptor.getValue();
+
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(messageToSelf,
newState);
+
+ verify(messagingService).invoke(eq(node2), messageCaptor.capture(),
anyLong());
+ ResetClusterMessage messageToOtherNewCmgNode =
messageCaptor.getValue();
+
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(messageToOtherNewCmgNode,
newState);
+
+ verify(messagingService).invoke(eq(node3), messageCaptor.capture(),
anyLong());
+ ResetClusterMessage messageToOtherNonCmgNode =
messageCaptor.getValue();
+
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(messageToOtherNonCmgNode,
newState);
+ }
+
+ private ClusterState newClusterState() {
+ return cmgMessagesFactory.clusterState()
+ .cmgNodes(Set.of("node5"))
+ .metaStorageNodes(Set.of("node6"))
+ .version(IgniteProductVersion.CURRENT_VERSION.toString())
+ .clusterTag(randomClusterTag(cmgMessagesFactory, CLUSTER_NAME))
+ .formerClusterIds(List.of(randomUUID(), randomUUID()))
+ .build();
+ }
+
+ private static void
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(
+ ResetClusterMessage message,
+ ClusterState clusterState
+ ) {
+ assertThat(message, is(notNullValue()));
+
+ assertThat(message.cmgNodes(), is(clusterState.cmgNodes()));
+ assertThat(message.metaStorageNodes(),
is(clusterState.metaStorageNodes()));
+ assertThat(message.clusterName(),
is(clusterState.clusterTag().clusterName()));
+ assertThat(message.clusterId(),
is(clusterState.clusterTag().clusterId()));
+ assertThat(message.formerClusterIds(),
is(clusterState.formerClusterIds()));
+ }
+
+ @Test
+ void migrateInitiatesRestart() {
+ ClusterState newState = newClusterState();
+
+ when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2,
node3));
+ respondSuccessfullyFrom(thisNode, node2);
+ respondWithExceptionFrom(node3);
+
+ assertThat(manager.migrate(newState), willCompleteSuccessfully());
+
+ verify(restarter).initiateRestart();
+ }
}