This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bc4dccdfe0 IGNITE-22818 Fix cmg reordering (#6408)
6bc4dccdfe0 is described below

commit 6bc4dccdfe0ff99815fddd135b91b62173d8b1f3
Author: Cyrill <[email protected]>
AuthorDate: Wed Aug 20 18:54:07 2025 +0300

    IGNITE-22818 Fix cmg reordering (#6408)
---
 .../cluster/management/ItClusterManagerTest.java   | 198 ++++++++++++++++++++-
 .../management/raft/ItCmgRaftServiceTest.java      |   3 +-
 .../management/ClusterManagementGroupManager.java  | 122 +++++++++++--
 .../management/raft/CmgRaftGroupListener.java      |  12 +-
 .../cluster/management/raft/CmgRaftService.java    |  15 ++
 .../management/raft/CmgRaftGroupListenerTest.java  |   3 +-
 .../management/BaseItClusterManagementTest.java    |  20 ++-
 .../internal/cluster/management/MockNode.java      |   8 +-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  14 +-
 .../ignite/raft/jraft/core/NotLeaderException.java |  11 +-
 .../ignite/raft/jraft/rpc/RpcRequestProcessor.java |  24 ++-
 .../java/org/apache/ignite/internal/Cluster.java   |   3 +-
 .../ItDisasterRecoveryReconfigurationTest.java     |   4 +-
 13 files changed, 384 insertions(+), 53 deletions(-)

diff --git 
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
 
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index 4c27b1f22e5..5eb22d21de5 100644
--- 
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ 
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.cluster.management;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -34,15 +36,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.cluster.management.raft.JoinDeniedException;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.network.DefaultMessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -74,20 +88,28 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
         }
     }
 
-    private void startCluster(int numNodes) {
-        cluster.addAll(createNodes(numNodes));
+    private void startCluster(int numNodes, BiConsumer<Integer, 
RaftGroupConfiguration> onConfigurationCommittedListener) {
+        cluster.addAll(createNodes(numNodes, 
onConfigurationCommittedListener));
 
         cluster.parallelStream().forEach(MockNode::startAndJoin);
     }
 
-    private void startNode(int idx, int clusterSize) {
-        MockNode node = createNode(idx, clusterSize);
+    private void startCluster(int numNodes) {
+        startCluster(numNodes, (i, config) -> {});
+    }
+
+    private void startNode(int idx, int clusterSize, 
Consumer<RaftGroupConfiguration> onConfigurationCommittedListener) {
+        MockNode node = createNode(idx, clusterSize, 
onConfigurationCommittedListener);
 
         cluster.add(node);
 
         node.startAndJoin();
     }
 
+    private void startNode(int idx, int clusterSize) {
+        startNode(idx, clusterSize, config -> {});
+    }
+
     private void stopCluster() throws Exception {
         stopNodes(cluster);
 
@@ -290,6 +312,170 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        // Here we will be storing the raft configuration history for all 
nodes.
+        Map<Integer, List<RaftGroupConfiguration>> configs = new 
ConcurrentHashMap<>();
+
+        startCluster(5, (i, config) ->
+                configs.computeIfAbsent(i, k -> new 
CopyOnWriteArrayList<>()).add(config)
+        );
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> votingNodes = 
cluster.stream().map(MockNode::name).limit(3).collect(toList());
+
+        assertThat(
+                clusterManager.initClusterAsync(votingNodes, List.of(), 
"cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        // Same as above, but check that all 5 nodes see the same number of 
learners (which is 2 actually).
+        assertTrue(waitForCondition(() ->
+                        configs.size() == 5 && configs.values().stream()
+                                .map(list -> list.get(list.size() - 1))
+                                .mapToInt(raftGroupConfiguration -> 
raftGroupConfiguration.learners().size())
+                                .allMatch(size -> size == 2),
+                30_000
+        ));
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop the node [4].");
+        MockNode node4 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(node4));
+
+        logger().info("Stop the node [3].");
+        MockNode node3 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(node3));
+
+        // There should still be two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // The configs will be properly updated when new 3 and 4 are started, 
so remove the history for the stopped nodes.
+        configs.remove(3);
+        configs.remove(4);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message (as we get ResetLearnersRequest(3)
+        // both when we move from [3, 4] to [3] and when we move from [] to 
[3]).
+        startNode(4, 5, config ->
+                configs.computeIfAbsent(4, k -> new 
CopyOnWriteArrayList<>()).add(config)
+        );
+        startNode(3, 5, config ->
+                configs.computeIfAbsent(3, k -> new 
CopyOnWriteArrayList<>()).add(config)
+        );
+
+        // Wait for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        logger().info("Nodes started.");
+        assertLearnerSize(2);
+
+        // Unblock the first reconfiguration.
+        logger().info("Unblock message.");
+        blockMessage.set(false);
+
+        // Now we need to wait for the reconfiguration to complete.
+        // To check it we will look through the raft configuration history and 
verify that all nodes have the same transition history
+        // with regard to the learner nodes: [] -> [3] -> [3, 4] -> [4] -> 
[3, 4].
+        // It should be enough to check only the number of learners.
+        int[] counts = {0, 1, 2, 1, 2};
+        assertTrue(waitForCondition(() ->
+                        configs.values().stream()
+                                .allMatch(transition -> 
checkLearnerTransitionsCorrect(transition, counts)),
+                30_000
+        ));
+    }
+
+    /**
+     * Checks proper configuration history.
+     *
+     * @param configs Node configuration transitions history.
+     * @param counts Expected number of learner nodes for each transition.
+     * @return {@code true} if the history is correct, {@code false} otherwise.
+     */
+    private static boolean 
checkLearnerTransitionsCorrect(List<RaftGroupConfiguration> configs, int[] 
counts) {
+        if (configs.size() != counts.length) {
+            return false;
+        }
+
+        for (int i = 0; i < configs.size(); i++) {
+            if (configs.get(i).learners().size() != counts[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private void assertLearnerSize(int size) throws InterruptedException {
+        assertTrue(waitForCondition(() ->
+                        cluster.stream()
+                                .filter(node -> {
+                                    try {
+                                        return 
node.clusterManager().isCmgLeader().get(10, SECONDS);
+                                    } catch (InterruptedException | 
ExecutionException | TimeoutException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                                .mapToInt(node -> {
+                                    try {
+                                        return 
node.clusterManager().learnerNodes().get(10, SECONDS).size();
+                                    } catch (InterruptedException | 
ExecutionException | TimeoutException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                                .min().orElseThrow() == size,
+                30_000
+        ));
+    }
+
+    private void blockMessage(BiPredicate<String, NetworkMessage> predicate) {
+        cluster.stream().map(node -> 
node.clusterService().messagingService()).forEach(messagingService -> {
+            DefaultMessagingService dms = (DefaultMessagingService) 
messagingService;
+
+            BiPredicate<String, NetworkMessage> oldPredicate = 
dms.dropMessagesPredicate();
+
+            if (oldPredicate == null) {
+                dms.dropMessages(predicate);
+            } else {
+                dms.dropMessages(oldPredicate.or(predicate));
+            }
+        });
+    }
+
     /**
      * Tests a scenario, when every node in a cluster gets restarted.
      */
@@ -516,7 +702,7 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
     private List<ClusterNode> currentPhysicalTopology() {
         return cluster.stream()
                 .map(MockNode::localMember)
-                .collect(Collectors.toList());
+                .collect(toList());
     }
 
     private static LogicalNode[] toLogicalNodes(List<ClusterNode> 
clusterNodes) {
diff --git 
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
 
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 79032dbf754..e14411dc747 100644
--- 
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ 
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -159,7 +159,8 @@ public class ItCmgRaftServiceTest extends 
BaseIgniteAbstractTest {
                                     new 
ValidationManager(clusterStateStorageMgr, logicalTopology),
                                     term -> {},
                                     new ClusterIdHolder(),
-                                    new NoOpFailureManager()
+                                    new NoOpFailureManager(),
+                                    config -> {}
                             ),
                             RaftGroupEventsListener.noopLsnr,
                             null,
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 6e6f3a917b3..59b95e6162c 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import 
org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState;
@@ -93,6 +94,7 @@ import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeId;
@@ -132,6 +134,12 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
     /** Lock for the {@code raftService} field. */
     private final Object raftServiceLock = new Object();
 
+    /** Current updateLearners operation to ensure linearization of updates. */
+    private CompletableFuture<Void> currentUpdateLearners = 
nullCompletedFuture();
+
+    /** Lock for linearizing updateLearners operations. */
+    private final Object updateLearnersLock = new Object();
+
     /**
      * Future that resolves after the node has been validated on the CMG 
leader.
      */
@@ -184,6 +192,8 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
 
     private final NodeProperties nodeProperties;
 
+    private final Consumer<RaftGroupConfiguration> 
onConfigurationCommittedListener;
+
     /** Constructor. */
     public ClusterManagementGroupManager(
             VaultManager vault,
@@ -200,6 +210,43 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
             MetricManager metricManager,
             NodeProperties nodeProperties
+    ) {
+        this(
+                vault,
+                clusterResetStorage,
+                clusterService,
+                clusterInitializer,
+                raftManager,
+                clusterStateStorageMgr,
+                logicalTopology,
+                validationManager,
+                nodeAttributes,
+                failureProcessor,
+                clusterIdStore,
+                raftGroupOptionsConfigurer,
+                metricManager,
+                nodeProperties,
+                config -> {}
+        );
+    }
+
+    /** Constructor. */
+    public ClusterManagementGroupManager(
+            VaultManager vault,
+            ClusterResetStorage clusterResetStorage,
+            ClusterService clusterService,
+            ClusterInitializer clusterInitializer,
+            RaftManager raftManager,
+            ClusterStateStorageManager clusterStateStorageMgr,
+            LogicalTopology logicalTopology,
+            ValidationManager validationManager,
+            NodeAttributes nodeAttributes,
+            FailureProcessor failureProcessor,
+            ClusterIdStore clusterIdStore,
+            RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
+            MetricManager metricManager,
+            NodeProperties nodeProperties,
+            Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
     ) {
         this.clusterResetStorage = clusterResetStorage;
         this.clusterService = clusterService;
@@ -235,6 +282,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         
clusterService.messagingService().addMessageHandler(CmgMessageGroup.class, 
message -> scheduledExecutor, cmgMessageHandler);
 
         this.nodeProperties = nodeProperties;
+        this.onConfigurationCommittedListener = 
onConfigurationCommittedListener;
     }
 
     private CmgMessageHandler createMessageHandler() {
@@ -323,7 +371,8 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
             ClusterIdStore clusterIdStore,
             RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
             MetricManager metricManager,
-            NodeProperties nodeProperties
+            NodeProperties nodeProperties,
+            Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
     ) {
         this(
                 vault,
@@ -339,7 +388,8 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                 clusterIdStore,
                 raftGroupOptionsConfigurer,
                 metricManager,
-                nodeProperties
+                nodeProperties,
+                onConfigurationCommittedListener
         );
     }
 
@@ -682,7 +732,7 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                         .thenAccept(state -> 
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
 
                 updateLogicalTopology(service)
-                        .thenCompose(v -> inBusyLock(() -> 
service.updateLearners(term)))
+                        .thenCompose(v -> inBusyLock(() -> 
updateLearnersSerially(service, term, false)))
                         .thenAccept(v -> inBusyLock(() -> {
                             // Register a listener to send ClusterState 
messages to new nodes.
                             TopologyService topologyService = 
clusterService.topologyService();
@@ -931,7 +981,8 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
                             validationManager,
                             this::onLogicalTopologyChanged,
                             clusterIdStore,
-                            failureProcessor
+                            failureProcessor,
+                            onConfigurationCommittedListener
                     ),
                     this::onElectedAsLeader,
                     null,
@@ -950,23 +1001,49 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         return new RaftNodeId(CmgGroupId.INSTANCE, serverPeer);
     }
 
+    /**
+     * Serializes calls to updateLearners to prevent race conditions between 
multiple topology changes
+     * and leader election callbacks.
+     *
+     * @param service CMG Raft service.
+     * @param term RAFT term.
+     * @param checkLeadership Whether to check leadership before updating 
learners.
+     */
+    private CompletableFuture<Void> updateLearnersSerially(CmgRaftService 
service, long term, boolean checkLeadership) {
+        synchronized (updateLearnersLock) {
+            currentUpdateLearners = currentUpdateLearners
+                    .thenCompose(v -> {
+                        if (checkLeadership) {
+                            return 
service.isCurrentNodeLeader().thenCompose(isLeader -> {
+                                if (!isLeader) {
+                                    return nullCompletedFuture();
+                                }
+                                return service.updateLearners(term);
+                            });
+                        } else {
+                            return service.updateLearners(term);
+                        }
+                    })
+                    .exceptionally(e -> {
+                        LOG.warn("Failed to update learners for term {}", e, 
term);
+
+                        return null;
+                    });
+            return currentUpdateLearners;
+        }
+    }
+
     private void onLogicalTopologyChanged(long term) {
         // We don't do it under lock to avoid deadlocks during node restart.
-
         CompletableFuture<CmgRaftService> serviceFuture = raftService;
 
         // If the future is not here yet, this means we are still starting, so 
learners will be updated after start
         // (if we happen to become a leader).
-
-        if (serviceFuture != null) {
-            serviceFuture.thenCompose(service -> 
service.isCurrentNodeLeader().thenCompose(isLeader -> {
-                if (!isLeader) {
-                    return nullCompletedFuture();
-                }
-
-                return service.updateLearners(term);
-            }));
+        if (serviceFuture == null) {
+            return;
         }
+
+        serviceFuture.thenCompose(service -> updateLearnersSerially(service, 
term, true));
     }
 
     /**
@@ -1191,6 +1268,23 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         }
     }
 
+    /**
+     * Returns a future that, when complete, resolves into a set of learner 
node names in the CMG.
+     */
+    @TestOnly
+    public CompletableFuture<Set<String>> learnerNodes() {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return raftServiceAfterJoin()
+                    .thenCompose(CmgRaftService::learners);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     /**
      * Returns a future that, when complete, resolves into a logical topology 
snapshot.
      *
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 02211d137f4..04de4a5f20f 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -87,6 +88,8 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
 
     private final CmgMessagesFactory cmgMessagesFactory = new 
CmgMessagesFactory();
 
+    private final Consumer<RaftGroupConfiguration> 
onConfigurationCommittedListener;
+
     /**
      * Creates a new instance.
      *
@@ -103,7 +106,8 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
             ValidationManager validationManager,
             LongConsumer onLogicalTopologyChanged,
             ClusterIdStore clusterIdStore,
-            FailureProcessor failureProcessor
+            FailureProcessor failureProcessor,
+            Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
     ) {
         this.storageManager = storageManager;
         this.logicalTopology = logicalTopology;
@@ -111,6 +115,12 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
         this.onLogicalTopologyChanged = onLogicalTopologyChanged;
         this.clusterIdStore = clusterIdStore;
         this.failureProcessor = failureProcessor;
+        this.onConfigurationCommittedListener = 
onConfigurationCommittedListener;
+    }
+
+    @Override
+    public void onConfigurationCommitted(RaftGroupConfiguration config, long 
lastAppliedIndex, long lastAppliedTerm) {
+        onConfigurationCommittedListener.accept(config);
     }
 
     @Override
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index eb44f7c83da..8f17d80d6e1 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -281,6 +281,21 @@ public class CmgRaftService implements ManuallyCloseable {
                 .build();
     }
 
+    /**
+     * Returns a known set of consistent IDs of the learner nodes of the CMG.
+     */
+    public CompletableFuture<Set<String>> learners() {
+        List<Peer> currentLearners = raftService.learners();
+
+        if (currentLearners == null) {
+            return raftService.refreshMembers(true).thenCompose(v -> 
learners());
+        }
+
+        return completedFuture(currentLearners.stream()
+                .map(Peer::consistentId)
+                .collect(toSet()));
+    }
+
     /**
      * Issues {@code changePeersAndLearnersAsync} request with same peers; 
learners are recalculated based on the current peers (which is
      * same as CMG nodes) and known logical topology. Any node in the logical 
topology that is not a CMG node constitutes a learner.
diff --git 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index 868ba1b42a0..f075367efc1 100644
--- 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++ 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -113,7 +113,8 @@ public class CmgRaftGroupListenerTest extends 
BaseIgniteAbstractTest {
                 validationManager,
                 onLogicalTopologyChanged,
                 clusterIdHolder,
-                new NoOpFailureManager()
+                new NoOpFailureManager(),
+                config -> {}
         );
     }
 
diff --git 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java
 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java
index 852caccf113..f2b3dbd73f5 100644
--- 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java
+++ 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java
@@ -23,12 +23,15 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.stream.IntStream;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.network.NodeFinder;
 import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -44,7 +47,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 public abstract class BaseItClusterManagementTest extends IgniteAbstractTest {
     private static final int PORT_BASE = 10000;
 
-    @InjectConfiguration
+    @InjectConfiguration("mock.retryTimeoutMillis = 60000")
     private static RaftConfiguration raftConfiguration;
 
     @InjectConfiguration
@@ -61,6 +64,10 @@ public abstract class BaseItClusterManagementTest extends 
IgniteAbstractTest {
     }
 
     protected List<MockNode> createNodes(int numNodes) {
+        return createNodes(numNodes, (i, config) -> {});
+    }
+
+    protected List<MockNode> createNodes(int numNodes, BiConsumer<Integer, 
RaftGroupConfiguration> onConfigurationCommittedListener) {
         List<NetworkAddress> seedAddresses = createSeedAddresses(numNodes);
         NodeFinder nodeFinder = new StaticNodeFinder(seedAddresses);
 
@@ -72,8 +79,8 @@ public abstract class BaseItClusterManagementTest extends 
IgniteAbstractTest {
                         workDir,
                         raftConfiguration,
                         userNodeAttributes,
-                        storageConfiguration
-
+                        storageConfiguration,
+                        config -> onConfigurationCommittedListener.accept(i, 
config)
                 ))
                 .collect(toList());
     }
@@ -87,6 +94,10 @@ public abstract class BaseItClusterManagementTest extends 
IgniteAbstractTest {
     }
 
     protected MockNode createNode(int idx, int clusterSize) {
+        return createNode(idx, clusterSize, config -> {});
+    }
+
+    protected MockNode createNode(int idx, int clusterSize, 
Consumer<RaftGroupConfiguration> onConfigurationCommittedListener) {
         return new MockNode(
                 testInfo,
                 new NetworkAddress("localhost", PORT_BASE + idx),
@@ -94,7 +105,8 @@ public abstract class BaseItClusterManagementTest extends IgniteAbstractTest {
                 workDir,
                 raftConfiguration,
                 userNodeAttributes,
-                storageConfiguration
+                storageConfiguration,
+                onConfigurationCommittedListener
         );
     }
 
diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 8e675acec16..d22014107cf 100644
--- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.NodeFinder;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
 import org.apache.ignite.internal.raft.TestLozaFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -86,7 +88,8 @@ public class MockNode {
             Path workDir,
             RaftConfiguration raftConfiguration,
             NodeAttributesConfiguration nodeAttributes,
-            StorageConfiguration storageProfilesConfiguration
+            StorageConfiguration storageProfilesConfiguration,
+            Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
     ) {
         String nodeName = testNodeName(testInfo, addr.port());
 
@@ -146,7 +149,8 @@ public class MockNode {
                 clusterIdHolder,
                 cmgRaftConfigurer,
                 new NoOpMetricManager(),
-                () -> colocationEnabled
+                () -> colocationEnabled,
+                onConfigurationCommittedListener
         );
 
         components = List.of(
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 7ad0f403ea6..c23b6c1ba16 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -3007,6 +3007,12 @@ public class NodeImpl implements Node, RaftServerService {
         }
     }
 
+    private @Nullable String getLeaderConsistentId(){
+        PeerId leaderId = getLeaderId();
+
+        return leaderId == null ? null : leaderId.getConsistentId();
+    }
+
     @Override
     public String getGroupId() {
         return this.groupId;
@@ -3505,7 +3511,7 @@ public class NodeImpl implements Node, RaftServerService {
         this.readLock.lock();
         try {
             if (this.state != State.STATE_LEADER) {
-                throw new NotLeaderException();
+                throw new NotLeaderException(getLeaderConsistentId());
             }
             return this.conf.getConf().listPeers();
         }
@@ -3519,7 +3525,7 @@ public class NodeImpl implements Node, RaftServerService {
         this.readLock.lock();
         try {
             if (this.state != State.STATE_LEADER) {
-                throw new NotLeaderException();
+                throw new NotLeaderException(getLeaderConsistentId());
             }
             return getAliveNodes(this.conf.getConf().getPeers(), Utils.monotonicMs());
         }
@@ -3533,7 +3539,7 @@ public class NodeImpl implements Node, RaftServerService {
         this.readLock.lock();
         try {
             if (this.state != State.STATE_LEADER) {
-                throw new NotLeaderException();
+                throw new NotLeaderException(getLeaderConsistentId());
             }
             return this.conf.getConf().listLearners();
         }
@@ -3547,7 +3553,7 @@ public class NodeImpl implements Node, RaftServerService {
         this.readLock.lock();
         try {
             if (this.state != State.STATE_LEADER) {
-                throw new NotLeaderException();
+                throw new NotLeaderException(getLeaderConsistentId());
             }
             return getAliveNodes(this.conf.getConf().getLearners(), Utils.monotonicMs());
         }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java
index 86db9818d31..5254540e2d9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java
@@ -16,13 +16,22 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Thrown when a Raft node is asked to perform an action that is only allowed for a leader, when the node is not a leader.
  */
 public class NotLeaderException extends IllegalStateException {
     private static final long serialVersionUID = 0L;
 
-    public NotLeaderException() {
+    private final String leaderId;
+
+    NotLeaderException(@Nullable String leaderId) {
         super("Not leader");
+        this.leaderId = leaderId;
+    }
+
+    public @Nullable String leaderId() {
+        return leaderId;
     }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
index 0b91b4d8f9b..4353fcc4ea3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.core.NotLeaderException;
+import org.apache.ignite.raft.jraft.error.RaftError;
 
 /**
  * Abstract AsyncUserProcessor for RPC processors.
@@ -52,21 +53,16 @@ public abstract class RpcRequestProcessor<T extends Message> implements RpcProce
             if (msg != null) {
                 rpcCtx.sendResponse(msg);
             }
+        } catch (NotLeaderException t) {
+            // It is ok if we lost leadership while a request to us was in flight, there is no need to clutter up the log.
+            LOG.debug("handleRequest {} failed", t, request);
+            rpcCtx.sendResponse(RaftRpcFactory.DEFAULT
+                .newResponse(t.leaderId(), msgFactory, RaftError.EPERM, t.getMessage()));
+        } catch (Throwable t) {
+            LOG.error("handleRequest {} failed", t, request);
+            rpcCtx.sendResponse(RaftRpcFactory.DEFAULT
+                .newResponse(msgFactory, RaftError.UNKNOWN.getNumber(), "handleRequest internal error"));
         }
-        catch (final Throwable t) {
-            if (isIgnorable(t)) {
-                LOG.debug("handleRequest {} failed", t, request);
-            } else {
-                LOG.error("handleRequest {} failed", t, request);
-            }
-            rpcCtx.sendResponse(RaftRpcFactory.DEFAULT //
-                .newResponse(msgFactory, -1, "handleRequest internal error"));
-        }
-    }
-
-    private static boolean isIgnorable(Throwable t) {
-        // It is ok if we lost leadership while a request to us was in flight, there is no need to clutter up the log.
-        return t instanceof NotLeaderException;
     }
 
     @Override
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 77d5ff30242..de49d2aaba5 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -28,7 +28,6 @@ import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.util.CollectionUtils.setListAtIndex;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -448,7 +447,7 @@ public class Cluster {
      */
     public Ignite startNode(int index, String nodeBootstrapConfigTemplate) {
         ServerRegistration registration = startEmbeddedNode(index, nodeBootstrapConfigTemplate);
-        assertThat("nodeIndex=" + index, registration.registrationFuture(), willSucceedIn(20, SECONDS));
+        assertThat("nodeIndex=" + index, registration.registrationFuture(), willCompleteSuccessfully());
         Ignite newIgniteNode = registration.server().api();
 
         assertEquals(newIgniteNode, nodes.get(index));
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 2e27670b739..23a29daf917 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -187,9 +187,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
 
         ZoneParams zoneParams = testMethod.getAnnotation(ZoneParams.class);
 
-        IntStream.range(INITIAL_NODES, zoneParams.nodes()).forEach(i -> cluster.startNode(i));
-        // TODO: IGNITE-22818 Fails with "Race operations took too long"
-        // startNodesInParallel(IntStream.range(INITIAL_NODES, zoneParams.nodes()).toArray());
+        startNodesInParallel(IntStream.range(INITIAL_NODES, zoneParams.nodes()).toArray());
 
         executeSql(format("CREATE ZONE %s (replicas %d, partitions %d, "
                         + "auto scale down %d, auto scale up %d, consistency mode '%s') storage profiles ['%s']",


Reply via email to