This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6bc4dccdfe0 IGNITE-22818 Fix cmg reordering (#6408)
6bc4dccdfe0 is described below
commit 6bc4dccdfe0ff99815fddd135b91b62173d8b1f3
Author: Cyrill <[email protected]>
AuthorDate: Wed Aug 20 18:54:07 2025 +0300
IGNITE-22818 Fix cmg reordering (#6408)
---
.../cluster/management/ItClusterManagerTest.java | 198 ++++++++++++++++++++-
.../management/raft/ItCmgRaftServiceTest.java | 3 +-
.../management/ClusterManagementGroupManager.java | 122 +++++++++++--
.../management/raft/CmgRaftGroupListener.java | 12 +-
.../cluster/management/raft/CmgRaftService.java | 15 ++
.../management/raft/CmgRaftGroupListenerTest.java | 3 +-
.../management/BaseItClusterManagementTest.java | 20 ++-
.../internal/cluster/management/MockNode.java | 8 +-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 14 +-
.../ignite/raft/jraft/core/NotLeaderException.java | 11 +-
.../ignite/raft/jraft/rpc/RpcRequestProcessor.java | 24 ++-
.../java/org/apache/ignite/internal/Cluster.java | 3 +-
.../ItDisasterRecoveryReconfigurationTest.java | 4 +-
13 files changed, 384 insertions(+), 53 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index 4c27b1f22e5..5eb22d21de5 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.cluster.management;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -34,15 +36,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
import org.apache.ignite.internal.cluster.management.raft.JoinDeniedException;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.network.DefaultMessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -74,20 +88,28 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest {
}
}
- private void startCluster(int numNodes) {
- cluster.addAll(createNodes(numNodes));
+ private void startCluster(int numNodes, BiConsumer<Integer, RaftGroupConfiguration> onConfigurationCommittedListener) {
+ cluster.addAll(createNodes(numNodes, onConfigurationCommittedListener));
cluster.parallelStream().forEach(MockNode::startAndJoin);
}
- private void startNode(int idx, int clusterSize) {
- MockNode node = createNode(idx, clusterSize);
+ private void startCluster(int numNodes) {
+ startCluster(numNodes, (i, config) -> {});
+ }
+
+ private void startNode(int idx, int clusterSize, Consumer<RaftGroupConfiguration> onConfigurationCommittedListener) {
+ MockNode node = createNode(idx, clusterSize, onConfigurationCommittedListener);
cluster.add(node);
node.startAndJoin();
}
+ private void startNode(int idx, int clusterSize) {
+ startNode(idx, clusterSize, config -> {});
+ }
+
private void stopCluster() throws Exception {
stopNodes(cluster);
@@ -290,6 +312,170 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest {
);
}
+ @Test
+ void testNoConfigurationReordering() throws Exception {
+ // Here we store the Raft configuration history for all nodes.
+ Map<Integer, List<RaftGroupConfiguration>> configs = new ConcurrentHashMap<>();
+
+ startCluster(5, (i, config) ->
+ configs.computeIfAbsent(i, k -> new CopyOnWriteArrayList<>()).add(config)
+ );
+
+ ClusterManagementGroupManager clusterManager = cluster.get(0).clusterManager();
+
+ List<String> votingNodes = cluster.stream().map(MockNode::name).limit(3).collect(toList());
+
+ assertThat(
+ clusterManager.initClusterAsync(votingNodes, List.of(), "cluster"),
+ willCompleteSuccessfully()
+ );
+
+ for (MockNode node : cluster) {
+ assertThat(node.clusterManager().joinFuture(), willCompleteSuccessfully());
+ }
+
+ // Wait for the initial cluster reconfiguration to complete.
+ assertLearnerSize(2);
+
+ // Same as above, but check that all 5 nodes see the same number of learners (which is actually 2).
+ assertTrue(waitForCondition(() ->
+ configs.size() == 5 && configs.values().stream()
+ .map(list -> list.get(list.size() - 1))
+ .mapToInt(raftGroupConfiguration -> raftGroupConfiguration.learners().size())
+ .allMatch(size -> size == 2),
+ 30_000
+ ));
+
+ String node3Name = cluster.get(3).name();
+
+ AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+ // Block the first reconfiguration to simulate network issues.
+ // We stop node 4, which should produce a ResetLearnersRequest with only one learner: node 3.
+ blockMessage((recipientName, networkMessage) -> {
+ if (!blockMessage.get()) {
+ return false;
+ }
+
+ if (networkMessage instanceof ResetLearnersRequest) {
+ ResetLearnersRequest rlr = (ResetLearnersRequest) networkMessage;
+
+ if (rlr.learnersList().contains(node3Name) && rlr.learnersList().size() == 1) {
+ logger().info("Block message {} to {}", networkMessage, recipientName);
+ return true;
+ }
+ }
+
+ return false;
+ });
+
+ logger().info("Stop the node [4].");
+ MockNode node4 = cluster.remove(cluster.size() - 1);
+ stopNodes(List.of(node4));
+
+ logger().info("Stop the node [3].");
+ MockNode node3 = cluster.remove(cluster.size() - 1);
+ stopNodes(List.of(node3));
+
+ // There should still be two learner nodes since the previous reconfiguration was blocked.
+ assertLearnerSize(2);
+
+ // The configs will be updated again once nodes 3 and 4 are restarted, so remove the history for the stopped nodes.
+ configs.remove(3);
+ configs.remove(4);
+
+ // Restart nodes 3 and 4, so that the topology is back to normal and no node availability issues are expected.
+ logger().info("Start nodes [3] and [4].");
+ // Start node 4 first to avoid clashing with the earlier blocked message (as we get ResetLearnersRequest(3)
+ // both when we move from [3, 4] to [3] and when we move from [] to [3]).
+ startNode(4, 5, config ->
+ configs.computeIfAbsent(4, k -> new CopyOnWriteArrayList<>()).add(config)
+ );
+ startNode(3, 5, config ->
+ configs.computeIfAbsent(3, k -> new CopyOnWriteArrayList<>()).add(config)
+ );
+
+ // Wait for nodes 3 and 4 to start.
+ for (MockNode node : cluster) {
+ assertThat(node.clusterManager().joinFuture(), willCompleteSuccessfully());
+ }
+
+ logger().info("Nodes started.");
+ assertLearnerSize(2);
+
+ // Unblock the first reconfiguration.
+ logger().info("Unblock message.");
+ blockMessage.set(false);
+
+ // Now we need to wait for the reconfiguration to complete.
+ // To check it we look through the Raft configuration history and verify that all nodes have the same transition history
+ // with regard to the learner nodes: [] -> [3] -> [3, 4] -> [4] -> [3, 4].
+ // Checking only the number of learners per transition is enough.
+ int[] counts = {0, 1, 2, 1, 2};
+ assertTrue(waitForCondition(() ->
+ configs.values().stream()
+ .allMatch(transition -> checkLearnerTransitionsCorrect(transition, counts)),
+ 30_000
+ ));
+ }
+
+ /**
+ * Checks proper configuration history.
+ *
+ * @param configs Node configuration transitions history.
+ * @param counts Expected number of learner nodes for each transition.
+ * @return {@code true} if the history is correct, {@code false} otherwise.
+ */
+ private static boolean checkLearnerTransitionsCorrect(List<RaftGroupConfiguration> configs, int[] counts) {
+ if (configs.size() != counts.length) {
+ return false;
+ }
+
+ for (int i = 0; i < configs.size(); i++) {
+ if (configs.get(i).learners().size() != counts[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private void assertLearnerSize(int size) throws InterruptedException {
+ assertTrue(waitForCondition(() ->
+ cluster.stream()
+ .filter(node -> {
+ try {
+ return node.clusterManager().isCmgLeader().get(10, SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .mapToInt(node -> {
+ try {
+ return node.clusterManager().learnerNodes().get(10, SECONDS).size();
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .min().orElseThrow() == size,
+ 30_000
+ ));
+ }
+
+ private void blockMessage(BiPredicate<String, NetworkMessage> predicate) {
+ cluster.stream().map(node -> node.clusterService().messagingService()).forEach(messagingService -> {
+ DefaultMessagingService dms = (DefaultMessagingService) messagingService;
+
+ BiPredicate<String, NetworkMessage> oldPredicate = dms.dropMessagesPredicate();
+
+ if (oldPredicate == null) {
+ dms.dropMessages(predicate);
+ } else {
+ dms.dropMessages(oldPredicate.or(predicate));
+ }
+ });
+ }
+
/**
* Tests a scenario, when every node in a cluster gets restarted.
*/
@@ -516,7 +702,7 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest {
private List<ClusterNode> currentPhysicalTopology() {
return cluster.stream()
.map(MockNode::localMember)
- .collect(Collectors.toList());
+ .collect(toList());
}
private static LogicalNode[] toLogicalNodes(List<ClusterNode> clusterNodes) {
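
For readers following the test above: the failure mode it guards against is plain asynchronous reordering of reconfiguration requests. A minimal, self-contained sketch of the effect (illustrative only, not Ignite code; class and variable names are made up):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.TimeUnit;

    public class ReorderingSketch {
        public static void main(String[] args) {
            List<String> applied = new CopyOnWriteArrayList<>();

            // "Slow" reconfiguration: its ResetLearnersRequest is stuck in the network.
            CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
                sleepMillis(200);
                applied.add("learners=[node3]");
            });

            // "Fast" reconfiguration issued later, reflecting the newer topology.
            CompletableFuture<Void> second = CompletableFuture.runAsync(() -> applied.add("learners=[node3, node4]"));

            CompletableFuture.allOf(first, second).join();

            // Without serializing the calls, the stale configuration can be applied last.
            System.out.println(applied);
        }

        private static void sleepMillis(long millis) {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

The test reproduces exactly this by blocking one ResetLearnersRequest and releasing it after newer reconfigurations have gone through.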
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 79032dbf754..e14411dc747 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -159,7 +159,8 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest {
new ValidationManager(clusterStateStorageMgr, logicalTopology),
term -> {},
new ClusterIdHolder(),
- new NoOpFailureManager()
+ new NoOpFailureManager(),
+ config -> {}
),
RaftGroupEventsListener.noopLsnr,
null,
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 6e6f3a917b3..59b95e6162c 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState;
@@ -93,6 +94,7 @@ import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
@@ -132,6 +134,12 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
/** Lock for the {@code raftService} field. */
private final Object raftServiceLock = new Object();
+ /** Current updateLearners operation to ensure linearization of updates. */
+ private CompletableFuture<Void> currentUpdateLearners = nullCompletedFuture();
+
+ /** Lock for linearizing updateLearners operations. */
+ private final Object updateLearnersLock = new Object();
+
/**
* Future that resolves after the node has been validated on the CMG leader.
*/
@@ -184,6 +192,8 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
private final NodeProperties nodeProperties;
+ private final Consumer<RaftGroupConfiguration> onConfigurationCommittedListener;
+
/** Constructor. */
public ClusterManagementGroupManager(
VaultManager vault,
@@ -200,6 +210,43 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
MetricManager metricManager,
NodeProperties nodeProperties
+ ) {
+ this(
+ vault,
+ clusterResetStorage,
+ clusterService,
+ clusterInitializer,
+ raftManager,
+ clusterStateStorageMgr,
+ logicalTopology,
+ validationManager,
+ nodeAttributes,
+ failureProcessor,
+ clusterIdStore,
+ raftGroupOptionsConfigurer,
+ metricManager,
+ nodeProperties,
+ config -> {}
+ );
+ }
+
+ /** Constructor. */
+ public ClusterManagementGroupManager(
+ VaultManager vault,
+ ClusterResetStorage clusterResetStorage,
+ ClusterService clusterService,
+ ClusterInitializer clusterInitializer,
+ RaftManager raftManager,
+ ClusterStateStorageManager clusterStateStorageMgr,
+ LogicalTopology logicalTopology,
+ ValidationManager validationManager,
+ NodeAttributes nodeAttributes,
+ FailureProcessor failureProcessor,
+ ClusterIdStore clusterIdStore,
+ RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
+ MetricManager metricManager,
+ NodeProperties nodeProperties,
+ Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
) {
this.clusterResetStorage = clusterResetStorage;
this.clusterService = clusterService;
@@ -235,6 +282,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
clusterService.messagingService().addMessageHandler(CmgMessageGroup.class, message -> scheduledExecutor, cmgMessageHandler);
this.nodeProperties = nodeProperties;
+ this.onConfigurationCommittedListener = onConfigurationCommittedListener;
}
private CmgMessageHandler createMessageHandler() {
@@ -323,7 +371,8 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
ClusterIdStore clusterIdStore,
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer,
MetricManager metricManager,
- NodeProperties nodeProperties
+ NodeProperties nodeProperties,
+ Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
) {
this(
vault,
@@ -339,7 +388,8 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
clusterIdStore,
raftGroupOptionsConfigurer,
metricManager,
- nodeProperties
+ nodeProperties,
+ onConfigurationCommittedListener
);
}
@@ -682,7 +732,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
.thenAccept(state -> initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
updateLogicalTopology(service)
- .thenCompose(v -> inBusyLock(() -> service.updateLearners(term)))
+ .thenCompose(v -> inBusyLock(() -> updateLearnersSerially(service, term, false)))
.thenAccept(v -> inBusyLock(() -> {
// Register a listener to send ClusterState messages to new nodes.
TopologyService topologyService = clusterService.topologyService();
@@ -931,7 +981,8 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
validationManager,
this::onLogicalTopologyChanged,
clusterIdStore,
- failureProcessor
+ failureProcessor,
+ onConfigurationCommittedListener
),
this::onElectedAsLeader,
null,
@@ -950,23 +1001,49 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
return new RaftNodeId(CmgGroupId.INSTANCE, serverPeer);
}
+ /**
+ * Serializes calls to updateLearners to prevent race conditions between multiple topology changes
+ * and leader election callbacks.
+ *
+ * @param service CMG Raft service.
+ * @param term RAFT term.
+ * @param checkLeadership Whether to check leadership before updating learners.
+ */
+ private CompletableFuture<Void> updateLearnersSerially(CmgRaftService service, long term, boolean checkLeadership) {
+ synchronized (updateLearnersLock) {
+ currentUpdateLearners = currentUpdateLearners
+ .thenCompose(v -> {
+ if (checkLeadership) {
+ return service.isCurrentNodeLeader().thenCompose(isLeader -> {
+ if (!isLeader) {
+ return nullCompletedFuture();
+ }
+ return service.updateLearners(term);
+ });
+ } else {
+ return service.updateLearners(term);
+ }
+ })
+ .exceptionally(e -> {
+ LOG.warn("Failed to update learners for term {}", e, term);
+
+ return null;
+ });
+ return currentUpdateLearners;
+ }
+ }
+
private void onLogicalTopologyChanged(long term) {
// We don't do it under lock to avoid deadlocks during node restart.
-
CompletableFuture<CmgRaftService> serviceFuture = raftService;
// If the future is not here yet, this means we are still starting, so learners will be updated after start
// (if we happen to become a leader).
-
- if (serviceFuture != null) {
- serviceFuture.thenCompose(service -> service.isCurrentNodeLeader().thenCompose(isLeader -> {
- if (!isLeader) {
- return nullCompletedFuture();
- }
-
- return service.updateLearners(term);
- }));
+ if (serviceFuture == null) {
+ return;
}
+
+ serviceFuture.thenCompose(service -> updateLearnersSerially(service, term, true));
}
/**
@@ -1191,6 +1268,23 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
}
}
+ /**
+ * Returns a future that, when complete, resolves into the set of learner node names in the CMG.
+ */
+ @TestOnly
+ public CompletableFuture<Set<String>> learnerNodes() {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return raftServiceAfterJoin()
+ .thenCompose(CmgRaftService::learners);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/**
* Returns a future that, when complete, resolves into a logical topology snapshot.
*
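
The key idiom in ClusterManagementGroupManager above is chaining every new updateLearners call onto the previous future under a lock, so updates are applied in the order they were requested even though each completes asynchronously. A minimal standalone sketch of that pattern (illustrative names, not the actual Ignite implementation):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;

    class SerializedUpdates {
        private final Object lock = new Object();

        private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

        // Chains the given asynchronous operation after all previously submitted ones.
        CompletableFuture<Void> submit(Supplier<CompletableFuture<Void>> operation) {
            synchronized (lock) {
                tail = tail
                        .thenCompose(ignored -> operation.get())
                        // Swallow the failure so one failed update does not block all later ones.
                        .exceptionally(e -> null);

                return tail;
            }
        }
    }

The lock only protects the re-assignment of the tail future; the operations themselves still run asynchronously, one after another.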
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 02211d137f4..04de4a5f20f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -87,6 +88,8 @@ public class CmgRaftGroupListener implements RaftGroupListener {
private final CmgMessagesFactory cmgMessagesFactory = new CmgMessagesFactory();
+ private final Consumer<RaftGroupConfiguration> onConfigurationCommittedListener;
+
/**
* Creates a new instance.
*
@@ -103,7 +106,8 @@ public class CmgRaftGroupListener implements RaftGroupListener {
ValidationManager validationManager,
LongConsumer onLogicalTopologyChanged,
ClusterIdStore clusterIdStore,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
) {
this.storageManager = storageManager;
this.logicalTopology = logicalTopology;
@@ -111,6 +115,12 @@ public class CmgRaftGroupListener implements RaftGroupListener {
this.onLogicalTopologyChanged = onLogicalTopologyChanged;
this.clusterIdStore = clusterIdStore;
this.failureProcessor = failureProcessor;
+ this.onConfigurationCommittedListener = onConfigurationCommittedListener;
+ }
+
+ @Override
+ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastAppliedIndex, long lastAppliedTerm) {
+ onConfigurationCommittedListener.accept(config);
}
@Override
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index eb44f7c83da..8f17d80d6e1 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -281,6 +281,21 @@ public class CmgRaftService implements ManuallyCloseable {
.build();
}
+ /**
+ * Returns a known set of consistent IDs of the learner nodes of the CMG.
+ */
+ public CompletableFuture<Set<String>> learners() {
+ List<Peer> currentLearners = raftService.learners();
+
+ if (currentLearners == null) {
+ return raftService.refreshMembers(true).thenCompose(v -> learners());
+ }
+
+ return completedFuture(currentLearners.stream()
+ .map(Peer::consistentId)
+ .collect(toSet()));
+ }
+
/**
* Issues {@code changePeersAndLearnersAsync} request with same peers; learners are recalculated based on the current peers (which is
* same as CMG nodes) and known logical topology. Any node in the logical topology that is not a CMG node constitutes a learner.
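
The learners() method added above follows a refresh-then-retry idiom: if the locally cached membership view is absent, refresh it from the Raft group and read again. A generic sketch of the same idiom (hypothetical helper, not an Ignite API):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;

    final class CachedView {
        // Reads the cached value if present; otherwise refreshes asynchronously and re-reads.
        static <T> CompletableFuture<T> readOrRefresh(Supplier<T> cachedRead, Supplier<CompletableFuture<Void>> refresh) {
            T cached = cachedRead.get();

            if (cached != null) {
                return CompletableFuture.completedFuture(cached);
            }

            return refresh.get().thenCompose(unused -> readOrRefresh(cachedRead, refresh));
        }
    }

Like the method above, this sketch keeps retrying while the view stays empty, relying on the refresh eventually populating it.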
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index 868ba1b42a0..f075367efc1 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -113,7 +113,8 @@ public class CmgRaftGroupListenerTest extends BaseIgniteAbstractTest {
validationManager,
onLogicalTopologyChanged,
clusterIdHolder,
- new NoOpFailureManager()
+ new NoOpFailureManager(),
+ config -> {}
);
}
diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java
index 852caccf113..f2b3dbd73f5 100644
--- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java
+++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/BaseItClusterManagementTest.java
@@ -23,12 +23,15 @@ import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.util.Collection;
import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -44,7 +47,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
public abstract class BaseItClusterManagementTest extends IgniteAbstractTest {
private static final int PORT_BASE = 10000;
- @InjectConfiguration
+ @InjectConfiguration("mock.retryTimeoutMillis = 60000")
private static RaftConfiguration raftConfiguration;
@InjectConfiguration
@@ -61,6 +64,10 @@ public abstract class BaseItClusterManagementTest extends IgniteAbstractTest {
}
protected List<MockNode> createNodes(int numNodes) {
+ return createNodes(numNodes, (i, config) -> {});
+ }
+
+ protected List<MockNode> createNodes(int numNodes, BiConsumer<Integer, RaftGroupConfiguration> onConfigurationCommittedListener) {
List<NetworkAddress> seedAddresses = createSeedAddresses(numNodes);
NodeFinder nodeFinder = new StaticNodeFinder(seedAddresses);
@@ -72,8 +79,8 @@ public abstract class BaseItClusterManagementTest extends IgniteAbstractTest {
workDir,
raftConfiguration,
userNodeAttributes,
- storageConfiguration
-
+ storageConfiguration,
+ config -> onConfigurationCommittedListener.accept(i, config)
))
.collect(toList());
}
@@ -87,6 +94,10 @@ public abstract class BaseItClusterManagementTest extends IgniteAbstractTest {
}
protected MockNode createNode(int idx, int clusterSize) {
+ return createNode(idx, clusterSize, config -> {});
+ }
+
+ protected MockNode createNode(int idx, int clusterSize, Consumer<RaftGroupConfiguration> onConfigurationCommittedListener) {
return new MockNode(
testInfo,
new NetworkAddress("localhost", PORT_BASE + idx),
@@ -94,7 +105,8 @@ public abstract class BaseItClusterManagementTest extends IgniteAbstractTest {
workDir,
raftConfiguration,
userNodeAttributes,
- storageConfiguration
+ storageConfiguration,
+ onConfigurationCommittedListener
);
}
diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 8e675acec16..d22014107cf 100644
--- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.TestLozaFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -86,7 +88,8 @@ public class MockNode {
Path workDir,
RaftConfiguration raftConfiguration,
NodeAttributesConfiguration nodeAttributes,
- StorageConfiguration storageProfilesConfiguration
+ StorageConfiguration storageProfilesConfiguration,
+ Consumer<RaftGroupConfiguration> onConfigurationCommittedListener
) {
String nodeName = testNodeName(testInfo, addr.port());
@@ -146,7 +149,8 @@ public class MockNode {
clusterIdHolder,
cmgRaftConfigurer,
new NoOpMetricManager(),
- () -> colocationEnabled
+ () -> colocationEnabled,
+ onConfigurationCommittedListener
);
components = List.of(
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 7ad0f403ea6..c23b6c1ba16 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -3007,6 +3007,12 @@ public class NodeImpl implements Node, RaftServerService
{
}
}
+ private @Nullable String getLeaderConsistentId() {
+ PeerId leaderId = getLeaderId();
+
+ return leaderId == null ? null : leaderId.getConsistentId();
+ }
+
@Override
public String getGroupId() {
return this.groupId;
@@ -3505,7 +3511,7 @@ public class NodeImpl implements Node, RaftServerService {
this.readLock.lock();
try {
if (this.state != State.STATE_LEADER) {
- throw new NotLeaderException();
+ throw new NotLeaderException(getLeaderConsistentId());
}
return this.conf.getConf().listPeers();
}
@@ -3519,7 +3525,7 @@ public class NodeImpl implements Node, RaftServerService {
this.readLock.lock();
try {
if (this.state != State.STATE_LEADER) {
- throw new NotLeaderException();
+ throw new NotLeaderException(getLeaderConsistentId());
}
return getAliveNodes(this.conf.getConf().getPeers(), Utils.monotonicMs());
}
@@ -3533,7 +3539,7 @@ public class NodeImpl implements Node, RaftServerService {
this.readLock.lock();
try {
if (this.state != State.STATE_LEADER) {
- throw new NotLeaderException();
+ throw new NotLeaderException(getLeaderConsistentId());
}
return this.conf.getConf().listLearners();
}
@@ -3547,7 +3553,7 @@ public class NodeImpl implements Node, RaftServerService {
this.readLock.lock();
try {
if (this.state != State.STATE_LEADER) {
- throw new NotLeaderException();
+ throw new NotLeaderException(getLeaderConsistentId());
}
return getAliveNodes(this.conf.getConf().getLearners(), Utils.monotonicMs());
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java
index 86db9818d31..5254540e2d9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NotLeaderException.java
@@ -16,13 +16,22 @@
*/
package org.apache.ignite.raft.jraft.core;
+import org.jetbrains.annotations.Nullable;
+
/**
* Thrown when a Raft node is asked to perform an action that is only allowed for a leader, when the node is not a leader.
*/
public class NotLeaderException extends IllegalStateException {
private static final long serialVersionUID = 0L;
- public NotLeaderException() {
+ private final String leaderId;
+
+ NotLeaderException(@Nullable String leaderId) {
super("Not leader");
+ this.leaderId = leaderId;
+ }
+
+ public @Nullable String leaderId() {
+ return leaderId;
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
index 0b91b4d8f9b..4353fcc4ea3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.core.NotLeaderException;
+import org.apache.ignite.raft.jraft.error.RaftError;
/**
* Abstract AsyncUserProcessor for RPC processors.
@@ -52,21 +53,16 @@ public abstract class RpcRequestProcessor<T extends Message> implements RpcProce
if (msg != null) {
rpcCtx.sendResponse(msg);
}
+ } catch (NotLeaderException t) {
+ // It is ok if we lost leadership while a request to us was in flight, there is no need to clutter up the log.
+ LOG.debug("handleRequest {} failed", t, request);
+ rpcCtx.sendResponse(RaftRpcFactory.DEFAULT
+ .newResponse(t.leaderId(), msgFactory, RaftError.EPERM, t.getMessage()));
+ } catch (Throwable t) {
+ LOG.error("handleRequest {} failed", t, request);
+ rpcCtx.sendResponse(RaftRpcFactory.DEFAULT
+ .newResponse(msgFactory, RaftError.UNKNOWN.getNumber(), "handleRequest internal error"));
}
- catch (final Throwable t) {
- if (isIgnorable(t)) {
- LOG.debug("handleRequest {} failed", t, request);
- } else {
- LOG.error("handleRequest {} failed", t, request);
- }
- rpcCtx.sendResponse(RaftRpcFactory.DEFAULT //
- .newResponse(msgFactory, -1, "handleRequest internal error"));
- }
- }
-
- private static boolean isIgnorable(Throwable t) {
- // It is ok if we lost leadership while a request to us was in flight, there is no need to clutter up the log.
- return t instanceof NotLeaderException;
}
@Override
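
With the RpcRequestProcessor change above, a request that reaches a node that is no longer the leader now gets a RaftError.EPERM response carrying the current leader's consistent id (when known), instead of a generic error code. A hypothetical caller-side sketch of how such a hint could be used (not an Ignite API; names are illustrative):

    final class RetryTargetChooser {
        // Picks the node to retry against after a "not a leader" rejection.
        // reportedLeaderId may be null when the rejecting node does not know the leader.
        static String nextTarget(boolean rejectedAsNotLeader, String reportedLeaderId, String currentTarget) {
            if (rejectedAsNotLeader && reportedLeaderId != null) {
                // The old leader told us who the current leader is: go straight there.
                return reportedLeaderId;
            }

            // Leader unknown: retry the same node or fall back to leader discovery.
            return currentTarget;
        }
    }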
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 77d5ff30242..de49d2aaba5 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -28,7 +28,6 @@ import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.util.CollectionUtils.setListAtIndex;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -448,7 +447,7 @@ public class Cluster {
*/
public Ignite startNode(int index, String nodeBootstrapConfigTemplate) {
ServerRegistration registration = startEmbeddedNode(index, nodeBootstrapConfigTemplate);
- assertThat("nodeIndex=" + index, registration.registrationFuture(), willSucceedIn(20, SECONDS));
+ assertThat("nodeIndex=" + index, registration.registrationFuture(), willCompleteSuccessfully());
Ignite newIgniteNode = registration.server().api();
assertEquals(newIgniteNode, nodes.get(index));
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 2e27670b739..23a29daf917 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -187,9 +187,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
ZoneParams zoneParams = testMethod.getAnnotation(ZoneParams.class);
- IntStream.range(INITIAL_NODES, zoneParams.nodes()).forEach(i -> cluster.startNode(i));
- // TODO: IGNITE-22818 Fails with "Race operations took too long"
- // startNodesInParallel(IntStream.range(INITIAL_NODES, zoneParams.nodes()).toArray());
+ startNodesInParallel(IntStream.range(INITIAL_NODES, zoneParams.nodes()).toArray());
executeSql(format("CREATE ZONE %s (replicas %d, partitions %d, "
+ "auto scale down %d, auto scale up %d, consistency mode '%s') storage profiles ['%s']",