This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 27ce7f278d IGNITE-22808 Migrate running node missed during CMG 
reparation to new cluster (#4315)
27ce7f278d is described below

commit 27ce7f278d10cf92a2b20b46737b37822ebbbcd9
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Sat Aug 31 22:05:57 2024 +0400

    IGNITE-22808 Migrate running node missed during CMG reparation to new 
cluster (#4315)
---
 .../management/ClusterManagementGroupManager.java  |  18 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   4 +-
 .../system/message/ResetClusterMessage.java        |   5 -
 .../disaster/system/ItCmgDisasterRecoveryTest.java | 132 ++++++++++++++++++++-
 .../system/SystemDisasterRecoveryManager.java      |   9 ++
 .../system/SystemDisasterRecoveryManagerImpl.java  |  66 +++++++++--
 .../SystemDisasterRecoveryManagerImplTest.java     |  81 +++++++++++--
 7 files changed, 277 insertions(+), 38 deletions(-)

diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 9d8affae0b..e9781a74d9 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -33,6 +33,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executors;
@@ -392,7 +393,13 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         cmgMessageHandler.onRecoveryComplete();
 
         return serviceFuture
-                .thenCompose(service -> doInit(service, 
cmgInitMessageFromResetClusterMessage(resetClusterMessage)))
+                .thenCompose(
+                        service -> doInit(
+                                service,
+                                
cmgInitMessageFromResetClusterMessage(resetClusterMessage),
+                                resetClusterMessage.formerClusterIds()
+                        )
+                )
                 .thenApply(unused -> null);
     }
 
@@ -482,7 +489,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             // handle this case by applying only the first attempt and 
returning the actual cluster state for all other
             // attempts.
             raftService = serviceFuture
-                    .thenCompose(service -> doInit(service, msg)
+                    .thenCompose(service -> doInit(service, msg, null)
                             .handle((v, e) -> {
                                 NetworkMessage response;
 
@@ -513,8 +520,8 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         }
     }
 
-    private CompletableFuture<CmgRaftService> doInit(CmgRaftService service, 
CmgInitMessage msg) {
-        return service.initClusterState(createClusterState(msg))
+    private CompletableFuture<CmgRaftService> doInit(CmgRaftService service, 
CmgInitMessage msg, @Nullable List<UUID> formerClusterIds) {
+        return service.initClusterState(createClusterState(msg, 
formerClusterIds))
                 .thenCompose(state -> {
                     var localState = new LocalState(state.cmgNodes(), 
state.clusterTag());
 
@@ -526,13 +533,14 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                 });
     }
 
-    private ClusterState createClusterState(CmgInitMessage msg) {
+    private ClusterState createClusterState(CmgInitMessage msg, @Nullable 
List<UUID> formerClusterIds) {
         return msgFactory.clusterState()
                 .cmgNodes(Set.copyOf(msg.cmgNodes()))
                 .metaStorageNodes(Set.copyOf(msg.metaStorageNodes()))
                 .version(IgniteProductVersion.CURRENT_VERSION.toString())
                 .clusterTag(clusterTag(msgFactory, msg.clusterName(), 
msg.clusterId()))
                 .initialClusterConfiguration(msg.initialClusterConfiguration())
+                .formerClusterIds(formerClusterIds)
                 .build();
     }
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a095b99611..d610ce3697 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1444,8 +1444,8 @@ public class IgniteImpl implements Ignite {
     }
 
     @TestOnly
-    public ClusterStateStorage clusterStateStorage() {
-        return clusterStateStorage;
+    public ClusterManagementGroupManager clusterManagementGroupManager() {
+        return cmgMgr;
     }
 
     @TestOnly
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
index 7acc6bff0f..2cce9a9663 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/ResetClusterMessage.java
@@ -53,9 +53,4 @@ public interface ResetClusterMessage extends NetworkMessage, 
Serializable {
      * IDs that the cluster had before (including the current incarnation by 
which this message is sent).
      */
     List<UUID> formerClusterIds();
-
-    /**
-     * Consistent ID of the node that conducts the cluster reset.
-     */
-    String conductor();
 }
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index 3cdf5bd9ee..7f3b0a2cd6 100644
--- 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -18,22 +18,30 @@
 package org.apache.ignite.internal.disaster.system;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
@@ -52,6 +60,8 @@ class ItCmgDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
         });
         waitTillClusterStateIsSavedToVaultOnConductor(1);
 
+        UUID originalClusterId = 
clusterState(igniteImpl(0)).clusterTag().clusterId();
+
         // This makes the CMG majority go away.
         cluster.stopNode(0);
 
@@ -64,6 +74,10 @@ class ItCmgDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
         IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
         waitTillCmgHasMajority(restartedIgniteImpl1);
 
+        ClusterState newClusterState = clusterState(restartedIgniteImpl1);
+        assertThat(newClusterState.clusterTag().clusterId(), 
is(not(originalClusterId)));
+        assertThat(newClusterState.formerClusterIds(), 
contains(originalClusterId));
+
         assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl1);
         
assertResetClusterMessageIsNotPresentAt(waitTillNodeRestartsInternally(2));
     }
@@ -100,7 +114,10 @@ class ItCmgDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
     private IgniteImpl waitTillNodeRestartsInternally(int nodeIndex) throws 
InterruptedException {
         // restartOrShutdownFuture() becomes non-null when restart or shutdown 
is initiated; we know it's restart.
 
-        assertTrue(waitForCondition(() -> restartOrShutdownFuture(nodeIndex) 
!= null, SECONDS.toMillis(20)));
+        assertTrue(
+                waitForCondition(() -> restartOrShutdownFuture(nodeIndex) != 
null, SECONDS.toMillis(20)),
+                "Node did not attempt to be restarted (or shut down) in time"
+        );
         assertThat(restartOrShutdownFuture(nodeIndex), 
willCompleteSuccessfully());
 
         return unwrapIgniteImpl(cluster.server(nodeIndex).api());
@@ -111,6 +128,11 @@ class ItCmgDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
         return ((IgniteServerImpl) 
cluster.server(nodeIndex)).restartOrShutdownFuture();
     }
 
+    private static ClusterState clusterState(IgniteImpl restartedIgniteImpl1)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return 
restartedIgniteImpl1.clusterManagementGroupManager().clusterState().get(10, 
SECONDS);
+    }
+
     private static void assertResetClusterMessageIsNotPresentAt(IgniteImpl 
ignite) {
         assertThat(new 
SystemDisasterRecoveryStorage(ignite.vault()).readResetClusterMessage(), 
is(nullValue()));
     }
@@ -200,7 +222,7 @@ class ItCmgDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    void nodesThatSawNoReparationHaveSeparatePhysicalTopology() throws 
Exception {
+    void nodesThatSawNoReparationHaveSeparatePhysicalTopologies() throws 
Exception {
         cluster.startAndInit(2, paramsBuilder -> {
             paramsBuilder.cmgNodeNames(nodeNames(0));
             paramsBuilder.metaStorageNodeNames(nodeNames(1));
@@ -223,4 +245,110 @@ class ItCmgDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
                 "Nodes from different clusters were able to establish a 
connection"
         );
     }
+
+    @Test
+    void migratesNodesThatSawNoReparationToNewCluster() throws Exception {
+        cluster.startAndInit(2, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0));
+            paramsBuilder.metaStorageNodeNames(nodeNames(1));
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+        // This makes the CMG majority go away.
+        cluster.stopNode(0);
+
+        // Repair CMG with just node 1.
+        initiateCmgRepairVia(igniteImpl(1), 1);
+        IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+        waitTillCmgHasMajority(restartedIgniteImpl1);
+
+        migrate(0, 1);
+
+        LogicalTopologySnapshot topologySnapshot = 
igniteImpl(1).logicalTopologyService().logicalTopologyOnLeader().get(10, 
SECONDS);
+        assertTopologyContainsNode(0, topologySnapshot);
+    }
+
+    private void assertTopologyContainsNode(int nodeIndex, 
LogicalTopologySnapshot topologySnapshot) {
+        assertTrue(topologySnapshot.nodes().stream().anyMatch(node -> 
node.name().equals(cluster.nodeName(nodeIndex))));
+    }
+
+    private void migrate(int oldClusterNodeIndex, int newClusterNodeIndex) 
throws Exception {
+        // Starting the node that did not see the repair.
+        IgniteImpl nodeMissingRepair = ((IgniteServerImpl) 
cluster.startEmbeddedNode(oldClusterNodeIndex).server()).igniteImpl();
+
+        initiateMigrationToNewCluster(nodeMissingRepair, 
igniteImpl(newClusterNodeIndex));
+
+        waitTillNodeRestartsInternally(oldClusterNodeIndex);
+    }
+
+    private static void initiateMigrationToNewCluster(IgniteImpl 
nodeMissingRepair, IgniteImpl repairedNode) throws Exception {
+        // TODO: IGNITE-22879 - initiate migration via CLI.
+
+        ClusterState newClusterState = clusterState(repairedNode);
+
+        CompletableFuture<Void> migrationFuture = 
nodeMissingRepair.systemDisasterRecoveryManager().migrate(newClusterState);
+        assertThat(migrationFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void migratesManyNodesThatSawNoReparationToNewCluster() throws Exception {
+        cluster.startAndInit(5, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0, 1, 2));
+            paramsBuilder.metaStorageNodeNames(nodeNames(2, 3, 4));
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(2);
+
+        // Stop the majority of CMG.
+        IntStream.of(0, 1).parallel().forEach(cluster::stopNode);
+
+        // Repair CMG with nodes 2, 3, 4.
+        initiateCmgRepairVia(igniteImpl(2), 2, 3, 4);
+        IgniteImpl restartedIgniteImpl2 = waitTillNodeRestartsInternally(2);
+        waitTillCmgHasMajority(restartedIgniteImpl2);
+
+        // Starting the nodes that did not see the repair (in parallel, to 
save time).
+        List<IgniteImpl> partialNodes = IntStream.of(0, 1).parallel()
+                .mapToObj(index -> ((IgniteServerImpl) 
cluster.startEmbeddedNode(index).server()).igniteImpl())
+                .collect(toList());
+
+        initiateMigrationToNewCluster(partialNodes.get(0), igniteImpl(2));
+
+        waitTillNodeRestartsInternally(0);
+        waitTillNodeRestartsInternally(1);
+
+        LogicalTopologySnapshot topologySnapshot = 
igniteImpl(2).logicalTopologyService().logicalTopologyOnLeader().get(10, 
SECONDS);
+        assertTopologyContainsNode(0, topologySnapshot);
+        assertTopologyContainsNode(1, topologySnapshot);
+
+        // TODO: IGNITE-23096 - remove after the hang is fixed.
+        waitTillNodesRestartInProcess(3, 4);
+    }
+
+    @Test
+    void repeatedRepairWorks() throws Exception {
+        cluster.startAndInit(2, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(0));
+            paramsBuilder.metaStorageNodeNames(nodeNames(1));
+        });
+        waitTillClusterStateIsSavedToVaultOnConductor(1);
+
+        // This makes the CMG majority go away.
+        cluster.stopNode(0);
+
+        // Repair CMG with just node 1.
+        initiateCmgRepairVia(igniteImpl(1), 1);
+        IgniteImpl restartedIgniteImpl1 = waitTillNodeRestartsInternally(1);
+        waitTillCmgHasMajority(restartedIgniteImpl1);
+
+        // Starting the node that did not see the repair.
+        migrate(0, 1);
+
+        // Second repair.
+        initiateCmgRepairVia(igniteImpl(1), 1);
+        IgniteImpl igniteImpl1RestartedSecondTime = 
waitTillNodeRestartsInternally(1);
+        waitTillCmgHasMajority(igniteImpl1RestartedSecondTime);
+
+        // TODO: IGNITE-23096 - remove after the hang is fixed.
+        waitTillNodesRestartInProcess(0, 1);
+    }
 }
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
index ea2779a244..6521760ccd 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManager.java
@@ -46,4 +46,13 @@ public interface SystemDisasterRecoveryManager {
      * @return Future completing with the result of the operation ({@link 
ResetClusterMessage} in case of error related to reset logic).
      */
     CompletableFuture<Void> resetCluster(List<String> 
proposedCmgConsistentIds);
+
+    /**
+     * Migrates nodes missed during CMG repair to the new cluster (which is 
the result of the repair). To do so, sends the
+     * corresponding {@link ResetClusterMessage} to all nodes that are in the 
physical topology (including itself).
+     *
+     * @param targetClusterState State of the new cluster.
+     * @return Future completing with the result of the operation.
+     */
+    CompletableFuture<Void> migrate(ClusterState targetClusterState);
 }
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index 90299e3272..fd65b03c3f 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -103,7 +103,7 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
 
             messagingService.respond(sender, successResponseMessage(), 
correlationId)
                     .thenRunAsync(() -> {
-                        if (!thisNodeName.equals(message.conductor())) {
+                        if (!thisNodeName.equals(sender.name())) {
                             restarter.initiateRestart();
                         }
                     }, restartExecutor);
@@ -148,19 +148,14 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
         ensureInitConfigApplied();
         ClusterState clusterState = ensureClusterStateIsPresent();
 
-        ResetClusterMessage message = buildResetClusterMessage(
-                proposedCmgConsistentIds,
-                clusterState
-        );
+        ResetClusterMessage message = 
buildResetClusterMessageForReset(proposedCmgConsistentIds, clusterState);
 
-        Map<String, CompletableFuture<NetworkMessage>> responseFutures = 
sendResetClusterMessageTo(
-                nodesInTopology,
-                message
-        );
+        Map<String, CompletableFuture<NetworkMessage>> responseFutures = 
sendResetClusterMessageTo(nodesInTopology, message);
 
         return allOf(responseFutures.values())
                 .handleAsync((res, ex) -> {
                     // We ignore upstream exceptions on purpose.
+                    rethrowIfError(ex);
 
                     if (isMajorityOfCmgAreSuccesses(proposedCmgConsistentIds, 
responseFutures)) {
                         restarter.initiateRestart();
@@ -172,8 +167,10 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
                 }, restartExecutor);
     }
 
-    private Map<String, CompletableFuture<NetworkMessage>> 
sendResetClusterMessageTo(Collection<ClusterNode> nodesInTopology,
-            ResetClusterMessage message) {
+    private Map<String, CompletableFuture<NetworkMessage>> 
sendResetClusterMessageTo(
+            Collection<ClusterNode> nodesInTopology,
+            ResetClusterMessage message
+    ) {
         Map<String, CompletableFuture<NetworkMessage>> responseFutures = new 
HashMap<>();
         for (ClusterNode node : nodesInTopology) {
             responseFutures.put(node.name(), messagingService.invoke(node, 
message, 10_000));
@@ -221,12 +218,11 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
         return clusterState;
     }
 
-    private ResetClusterMessage buildResetClusterMessage(List<String> 
proposedCmgConsistentIds, ClusterState clusterState) {
+    private ResetClusterMessage 
buildResetClusterMessageForReset(Collection<String> proposedCmgConsistentIds, 
ClusterState clusterState) {
         List<UUID> formerClusterIds = new 
ArrayList<>(requireNonNullElse(clusterState.formerClusterIds(), new 
ArrayList<>()));
         formerClusterIds.add(clusterState.clusterTag().clusterId());
 
         return messagesFactory.resetClusterMessage()
-                .conductor(thisNodeName)
                 .cmgNodes(new HashSet<>(proposedCmgConsistentIds))
                 .metaStorageNodes(clusterState.metaStorageNodes())
                 .clusterName(clusterState.clusterTag().clusterName())
@@ -235,6 +231,12 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
                 .build();
     }
 
+    private static void rethrowIfError(Throwable ex) {
+        if (ex instanceof Error) {
+            throw (Error) ex;
+        }
+    }
+
     private static boolean isMajorityOfCmgAreSuccesses(
             List<String> proposedCmgConsistentIds,
             Map<String, CompletableFuture<NetworkMessage>> responseFutures
@@ -255,6 +257,44 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
         return successes >= (futuresFromNewCmg.size() + 1) / 2;
     }
 
+    @Override
+    public CompletableFuture<Void> migrate(ClusterState targetClusterState) {
+        if (targetClusterState.formerClusterIds() == null) {
+            return failedFuture(
+                    new ClusterResetException("Migration can only happen using 
cluster state from a node that saw a cluster reset")
+            );
+        }
+
+        Collection<ClusterNode> nodesInTopology = topologyService.allMembers();
+
+        ResetClusterMessage message = 
buildResetClusterMessageForMigrate(targetClusterState);
+
+        Map<String, CompletableFuture<NetworkMessage>> responseFutures = 
sendResetClusterMessageTo(nodesInTopology, message);
+
+        return allOf(responseFutures.values())
+                .handleAsync((res, ex) -> {
+                    // We ignore upstream exceptions on purpose; only Errors are rethrown.
+                    rethrowIfError(ex);
+
+                    restarter.initiateRestart();
+                    return null;
+                }, restartExecutor);
+    }
+
+    private ResetClusterMessage 
buildResetClusterMessageForMigrate(ClusterState clusterState) {
+        List<UUID> formerClusterIds = clusterState.formerClusterIds();
+        assert formerClusterIds != null : "formerClusterIds is null, but it 
must never be here as it's from a node that saw a CMG reset; "
+                + "current node is " + thisNodeName;
+
+        return messagesFactory.resetClusterMessage()
+                .cmgNodes(clusterState.cmgNodes())
+                .metaStorageNodes(clusterState.metaStorageNodes())
+                .clusterName(clusterState.clusterTag().clusterName())
+                .clusterId(clusterState.clusterTag().clusterId())
+                .formerClusterIds(formerClusterIds)
+                .build();
+    }
+
     /**
      * An executor that spawns a thread per task. This is very inefficient for 
scenarios with many tasks,
      * but we use it here for very rare tasks; it has an advantage that it 
doesn't need to be shut down
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index 84cbd7c7d6..ad5a7d1999 100644
--- 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -284,17 +284,16 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private void assertThatResetClusterMessageIsAsExpected(ResetClusterMessage 
message) {
-        assertThatResetClusterMessageContentIsAsExpected(message, 
thisNodeName);
+        assertThatResetClusterMessageContentIsAsExpected(message);
     }
 
-    private void assertThatResetClusterMessageContentIsAsExpected(@Nullable 
ResetClusterMessage message, String expectedConductor) {
+    private void assertThatResetClusterMessageContentIsAsExpected(@Nullable 
ResetClusterMessage message) {
         assertThat(message, is(notNullValue()));
         assertThat(message.cmgNodes(), containsInAnyOrder(thisNodeName, 
node2.name()));
         assertThat(message.metaStorageNodes(), 
is(usualClusterState.metaStorageNodes()));
         assertThat(message.clusterName(), is(CLUSTER_NAME));
         assertThat(message.clusterId(), 
is(not(usualClusterState.clusterTag().clusterId())));
         assertThat(message.formerClusterIds(), 
contains(usualClusterState.clusterTag().clusterId()));
-        assertThat(message.conductor(), is(expectedConductor));
     }
 
     @Test
@@ -369,7 +368,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         NetworkMessageHandler handler = extractMessageHandler();
 
         ClusterNode conductor = fromSelf ? thisNode : node2;
-        handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), 
conductor, 0L);
+        handler.onReceived(resetClusterMessageOn2Nodes(), conductor, 0L);
 
         waitTillResetClusterMessageGetsSavedToVault();
         VaultEntry entry = vaultManager.get(RESET_CLUSTER_MESSAGE_VAULT_KEY);
@@ -377,7 +376,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
 
         ResetClusterMessage savedMessage = fromBytes(entry.value());
 
-        assertThatResetClusterMessageContentIsAsExpected(savedMessage, 
conductor.name());
+        assertThatResetClusterMessageContentIsAsExpected(savedMessage);
     }
 
     private void waitTillResetClusterMessageGetsSavedToVault() throws 
InterruptedException {
@@ -402,14 +401,13 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         return handler;
     }
 
-    private ResetClusterMessage resetClusterMessageOn2Nodes(String 
conductorName) {
+    private ResetClusterMessage resetClusterMessageOn2Nodes() {
         return messagesFactory.resetClusterMessage()
                 .cmgNodes(Set.of(thisNodeName, node2.name()))
                 .metaStorageNodes(usualClusterState.metaStorageNodes())
                 .clusterName(CLUSTER_NAME)
                 .clusterId(randomUUID())
                 
.formerClusterIds(List.of(usualClusterState.clusterTag().clusterId()))
-                .conductor(conductorName)
                 .build();
     }
 
@@ -420,7 +418,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         NetworkMessageHandler handler = extractMessageHandler();
         ClusterNode conductor = thisNode;
 
-        handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), 
conductor, 123L);
+        handler.onReceived(resetClusterMessageOn2Nodes(), conductor, 123L);
 
         InOrder inOrder = inOrder(messagingService, vaultManager);
 
@@ -437,7 +435,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         NetworkMessageHandler handler = extractMessageHandler();
         ClusterNode conductor = node2;
 
-        handler.onReceived(resetClusterMessageOn2Nodes(conductor.name()), 
conductor, 123L);
+        handler.onReceived(resetClusterMessageOn2Nodes(), conductor, 123L);
 
         InOrder inOrder = inOrder(messagingService, vaultManager, restarter);
 
@@ -452,7 +450,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     void initiatesRestartWhenGetsMessageFromOtherNode() throws Exception {
         NetworkMessageHandler handler = extractMessageHandler();
 
-        handler.onReceived(resetClusterMessageOn2Nodes(node2.name()), node2, 
0L);
+        handler.onReceived(resetClusterMessageOn2Nodes(), node2, 0L);
 
         verify(restarter, timeout(SECONDS.toMillis(10))).initiateRestart();
 
@@ -464,11 +462,72 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     void doesNotInitiateRestartWhenGetsMessageFromSelf() throws Exception {
         NetworkMessageHandler handler = extractMessageHandler();
 
-        handler.onReceived(resetClusterMessageOn2Nodes(thisNodeName), 
thisNode, 0L);
+        handler.onReceived(resetClusterMessageOn2Nodes(), thisNode, 0L);
 
         verify(restarter, never()).initiateRestart();
 
         // Wait till it gets saved to Vault to avoid an attempt to write to it 
after the after-each method stops the Vault.
         waitTillResetClusterMessageGetsSavedToVault();
     }
+
+    @Test
+    void migrateSendsMessagesToAllNodes() {
+        ClusterState newState = newClusterState();
+
+        ArgumentCaptor<ResetClusterMessage> messageCaptor = 
ArgumentCaptor.forClass(ResetClusterMessage.class);
+
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, 
node3));
+        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+                .thenReturn(completedFuture(successResponseMessage));
+
+        assertThat(manager.migrate(newState), willCompleteSuccessfully());
+
+        verify(messagingService).invoke(eq(thisNode), messageCaptor.capture(), 
anyLong());
+        ResetClusterMessage messageToSelf = messageCaptor.getValue();
+        
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(messageToSelf, 
newState);
+
+        verify(messagingService).invoke(eq(node2), messageCaptor.capture(), 
anyLong());
+        ResetClusterMessage messageToOtherNewCmgNode = 
messageCaptor.getValue();
+        
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(messageToOtherNewCmgNode,
 newState);
+
+        verify(messagingService).invoke(eq(node3), messageCaptor.capture(), 
anyLong());
+        ResetClusterMessage messageToOtherNonCmgNode = 
messageCaptor.getValue();
+        
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(messageToOtherNonCmgNode,
 newState);
+    }
+
+    private ClusterState newClusterState() {
+        return cmgMessagesFactory.clusterState()
+                .cmgNodes(Set.of("node5"))
+                .metaStorageNodes(Set.of("node6"))
+                .version(IgniteProductVersion.CURRENT_VERSION.toString())
+                .clusterTag(randomClusterTag(cmgMessagesFactory, CLUSTER_NAME))
+                .formerClusterIds(List.of(randomUUID(), randomUUID()))
+                .build();
+    }
+
+    private static void 
assertThatResetClusterMessageContentIsAsExpectedAfterMigrate(
+            ResetClusterMessage message,
+            ClusterState clusterState
+    ) {
+        assertThat(message, is(notNullValue()));
+
+        assertThat(message.cmgNodes(), is(clusterState.cmgNodes()));
+        assertThat(message.metaStorageNodes(), 
is(clusterState.metaStorageNodes()));
+        assertThat(message.clusterName(), 
is(clusterState.clusterTag().clusterName()));
+        assertThat(message.clusterId(), 
is(clusterState.clusterTag().clusterId()));
+        assertThat(message.formerClusterIds(), 
is(clusterState.formerClusterIds()));
+    }
+
+    @Test
+    void migrateInitiatesRestart() {
+        ClusterState newState = newClusterState();
+
+        when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, 
node3));
+        respondSuccessfullyFrom(thisNode, node2);
+        respondWithExceptionFrom(node3);
+
+        assertThat(manager.migrate(newState), willCompleteSuccessfully());
+
+        verify(restarter).initiateRestart();
+    }
 }

Reply via email to