This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a1512e298bc IGNITE-25849 Fix testRaftLeaderChangedDuringAssignmentsQueueProcessing (#6318)
a1512e298bc is described below
commit a1512e298bc47400c1a320f63647885bca6b85b3
Author: Alexander Lapin <[email protected]>
AuthorDate: Wed Jul 30 13:12:12 2025 +0300
IGNITE-25849 Fix testRaftLeaderChangedDuringAssignmentsQueueProcessing (#6318)
---
.../ItRebalanceByPendingAssignmentsQueueTest.java | 58 +++++++++++-----------
1 file changed, 30 insertions(+), 28 deletions(-)
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java
index 5f5e49567f4..595f230c7bc 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java
@@ -81,8 +81,6 @@ import
org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
-import
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -97,7 +95,6 @@ import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -289,7 +286,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
String restartNode = stableAssignments.stream()
.map(Assignment::consistentId).filter(name ->
!name.equals(leaseholder)).findFirst().orElseThrow();
- putPendingAssignments(raftLeader(TABLE_NAME), TABLE_NAME,
expectedPendingAssignmentsQueue);
+ putPendingAssignments(raftLeader(TABLE_NAME).leaderHost, TABLE_NAME,
expectedPendingAssignmentsQueue);
cluster.restartNode(cluster.nodeIndex(restartNode));
@@ -307,7 +304,6 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-25849")
void testRaftLeaderChangedDuringAssignmentsQueueProcessing() {
createZoneAndTable(4, 2);
@@ -317,14 +313,14 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
var leaderBefore = raftLeader(TABLE_NAME);
- putPendingAssignments(raftLeader(TABLE_NAME), TABLE_NAME,
expectedPendingAssignmentsQueue);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-26023 leader
re-election and rebalance are not linearized.
+ putPendingAssignments(leaderBefore.leaderHost, TABLE_NAME,
expectedPendingAssignmentsQueue);
// most reliable way to change the leader in raft group is to stop and
restart the current one
- cancelLease(leaderBefore, TABLE_NAME);
- cluster.restartNode(cluster.nodeIndex(leaderBefore.name()));
+ cluster.restartNode(cluster.nodeIndex(leaderBefore.leaderHost.name()));
await().atMost(60, SECONDS).untilAsserted(() -> {
var leaderAfter = raftLeader(TABLE_NAME);
- assertThat(leaderAfter.name(), not(leaderBefore.name()));
+ assertTrue(leaderAfter.term > leaderBefore.term, "The leader has
not changed.");
var expected =
expectedPendingAssignmentsQueue.peekLast().nodes().stream()
.map(Assignment::consistentId).collect(toSet());
@@ -332,8 +328,8 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
.map(Assignment::consistentId).collect(toSet());
assertThat(stableNames, equalTo(expected));
- assertThat(stableNames, hasItem(leaderBefore.name()));
- assertThat(stableNames, hasItem(leaderAfter.name()));
+ assertThat(stableNames, hasItem(leaderBefore.leaderHost.name()));
+ assertThat(stableNames, hasItem(leaderAfter.leaderHost.name()));
});
}
@@ -374,7 +370,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
}
private Set<Assignment> stablePartitionAssignments(String tableName) {
- IgniteImpl ignite = raftLeader(tableName);
+ IgniteImpl ignite = raftLeader(tableName).leaderHost;
int zoneId = latestCatalog(ignite).zone(ZONE_NAME).id();
int tableId = latestCatalog(ignite).table(DEFAULT_SCHEMA_NAME,
TABLE_NAME).id();
// TODO https://issues.apache.org/jira/browse/IGNITE-22522
tableOrZoneId -> zoneId, remove.
@@ -389,7 +385,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
}
private @Nullable Assignments stablePartitionAssignmentsValue(String
tableName) {
- IgniteImpl ignite = raftLeader(tableName);
+ IgniteImpl ignite = raftLeader(tableName).leaderHost;
CompletableFuture<Entry> fut = ignite.metaStorageManager()
.get(stablePartitionAssignmentsKey(partitionGroupId(ignite,
tableName, 0)));
assertThat(fut, willCompleteSuccessfully());
@@ -398,7 +394,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
}
private @Nullable Assignments pendingAssignmentsValue(String tableName) {
- IgniteImpl ignite = raftLeader(tableName);
+ IgniteImpl ignite = raftLeader(tableName).leaderHost;
CompletableFuture<Entry> fut = ignite.metaStorageManager()
.get(pendingPartitionAssignmentsKey(partitionGroupId(ignite,
tableName, 0)));
assertThat(fut, willCompleteSuccessfully());
@@ -418,7 +414,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
}
private Set<String> dataNodes(String tableName) {
- IgniteImpl ignite = raftLeader(tableName);
+ IgniteImpl ignite = raftLeader(tableName).leaderHost;
return dataNodes(ignite);
}
@@ -445,7 +441,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
}
private ReplicaMeta primaryReplicaMeta(String tableName) {
- return primaryReplicaMeta(raftLeader(tableName), tableName);
+ return primaryReplicaMeta(raftLeader(tableName).leaderHost, tableName);
}
private ReplicaMeta primaryReplicaMeta(IgniteImpl ignite, String
tableName) {
@@ -457,7 +453,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
return fut.join();
}
- private IgniteImpl raftLeader(String tableName) {
+ private LeaderWithTerm raftLeader(String tableName) {
IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
var raftNodeId = new RaftNodeId(partitionGroupId(ignite, tableName,
0), new Peer(ignite.name()));
var jraftServer = (JraftServerImpl) ignite.raftManager().server();
@@ -467,22 +463,17 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
.map(Node::getLeaderId)
.map(PeerId::getConsistentId)
.orElse(null);
- assertNotNull(consistentId);
- return unwrapIgniteImpl(cluster.node(cluster.nodeIndex(consistentId)));
- }
- private void cancelLease(IgniteImpl leaseholder, String tableName) {
- StopLeaseProlongationMessage msg = new PlacementDriverMessagesFactory()
- .stopLeaseProlongationMessage()
- .groupId(partitionGroupId(cluster.aliveNode(), tableName, 0))
- .build();
+ assertNotNull(consistentId);
- // Just sent it to all nodes to not determine the exact placement
driver active actor.
- runningNodes().forEach(node ->
leaseholder.sendFakeMessage(node.name(), msg));
+ return new LeaderWithTerm(
+
unwrapIgniteImpl(cluster.node(cluster.nodeIndex(consistentId))),
+ raftGroupService.getRaftNode().getCurrentTerm()
+ );
}
private Set<String> realAssignments(String tableName) {
- IgniteImpl ignite = raftLeader(tableName);
+ IgniteImpl ignite = raftLeader(tableName).leaderHost;
Catalog catalog = latestCatalog(ignite);
int zoneId = catalog.zone(ZONE_NAME).id();
int tableId = catalog.table(DEFAULT_SCHEMA_NAME, TABLE_NAME).id();
@@ -623,4 +614,15 @@ class ItRebalanceByPendingAssignmentsQueueTest extends
ClusterPerTestIntegration
ignite.metaStorageManager().unregisterWatch(this);
}
}
+
+ private static final class LeaderWithTerm {
+ private final IgniteImpl leaderHost;
+
+ private final long term;
+
+ private LeaderWithTerm(IgniteImpl leaderHost, long term) {
+ this.leaderHost = leaderHost;
+ this.term = term;
+ }
+ }
}