This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a1512e298bc IGNITE-25849 Fix testRaftLeaderChangedDuringAssignmentsQueueProcessing (#6318)
a1512e298bc is described below

commit a1512e298bc47400c1a320f63647885bca6b85b3
Author: Alexander Lapin <[email protected]>
AuthorDate: Wed Jul 30 13:12:12 2025 +0300

    IGNITE-25849 Fix testRaftLeaderChangedDuringAssignmentsQueueProcessing (#6318)
---
 .../ItRebalanceByPendingAssignmentsQueueTest.java  | 58 +++++++++++-----------
 1 file changed, 30 insertions(+), 28 deletions(-)

diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java
index 5f5e49567f4..595f230c7bc 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceByPendingAssignmentsQueueTest.java
@@ -81,8 +81,6 @@ import 
org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
-import 
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -97,7 +95,6 @@ import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -289,7 +286,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
         String restartNode = stableAssignments.stream()
                 .map(Assignment::consistentId).filter(name -> 
!name.equals(leaseholder)).findFirst().orElseThrow();
 
-        putPendingAssignments(raftLeader(TABLE_NAME), TABLE_NAME, 
expectedPendingAssignmentsQueue);
+        putPendingAssignments(raftLeader(TABLE_NAME).leaderHost, TABLE_NAME, 
expectedPendingAssignmentsQueue);
 
         cluster.restartNode(cluster.nodeIndex(restartNode));
 
@@ -307,7 +304,6 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-25849")
     void testRaftLeaderChangedDuringAssignmentsQueueProcessing() {
         createZoneAndTable(4, 2);
 
@@ -317,14 +313,14 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
 
         var leaderBefore = raftLeader(TABLE_NAME);
 
-        putPendingAssignments(raftLeader(TABLE_NAME), TABLE_NAME, 
expectedPendingAssignmentsQueue);
+        // TODO https://issues.apache.org/jira/browse/IGNITE-26023 leader 
re-election and rebalance are not linearized.
+        putPendingAssignments(leaderBefore.leaderHost, TABLE_NAME, 
expectedPendingAssignmentsQueue);
         // most reliable way to change the leader in raft group is to stop and 
restart the current one
-        cancelLease(leaderBefore, TABLE_NAME);
-        cluster.restartNode(cluster.nodeIndex(leaderBefore.name()));
+        cluster.restartNode(cluster.nodeIndex(leaderBefore.leaderHost.name()));
 
         await().atMost(60, SECONDS).untilAsserted(() -> {
             var leaderAfter = raftLeader(TABLE_NAME);
-            assertThat(leaderAfter.name(), not(leaderBefore.name()));
+            assertTrue(leaderAfter.term > leaderBefore.term, "The leader has 
not changed.");
 
             var expected = 
expectedPendingAssignmentsQueue.peekLast().nodes().stream()
                     .map(Assignment::consistentId).collect(toSet());
@@ -332,8 +328,8 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
                     .map(Assignment::consistentId).collect(toSet());
 
             assertThat(stableNames, equalTo(expected));
-            assertThat(stableNames, hasItem(leaderBefore.name()));
-            assertThat(stableNames, hasItem(leaderAfter.name()));
+            assertThat(stableNames, hasItem(leaderBefore.leaderHost.name()));
+            assertThat(stableNames, hasItem(leaderAfter.leaderHost.name()));
         });
     }
 
@@ -374,7 +370,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
     }
 
     private Set<Assignment> stablePartitionAssignments(String tableName) {
-        IgniteImpl ignite = raftLeader(tableName);
+        IgniteImpl ignite = raftLeader(tableName).leaderHost;
         int zoneId = latestCatalog(ignite).zone(ZONE_NAME).id();
         int tableId = latestCatalog(ignite).table(DEFAULT_SCHEMA_NAME, 
TABLE_NAME).id();
         // TODO https://issues.apache.org/jira/browse/IGNITE-22522 
tableOrZoneId -> zoneId, remove.
@@ -389,7 +385,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
     }
 
     private @Nullable Assignments stablePartitionAssignmentsValue(String 
tableName) {
-        IgniteImpl ignite = raftLeader(tableName);
+        IgniteImpl ignite = raftLeader(tableName).leaderHost;
         CompletableFuture<Entry> fut = ignite.metaStorageManager()
                 .get(stablePartitionAssignmentsKey(partitionGroupId(ignite, 
tableName, 0)));
         assertThat(fut, willCompleteSuccessfully());
@@ -398,7 +394,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
     }
 
     private @Nullable Assignments pendingAssignmentsValue(String tableName) {
-        IgniteImpl ignite = raftLeader(tableName);
+        IgniteImpl ignite = raftLeader(tableName).leaderHost;
         CompletableFuture<Entry> fut = ignite.metaStorageManager()
                 .get(pendingPartitionAssignmentsKey(partitionGroupId(ignite, 
tableName, 0)));
         assertThat(fut, willCompleteSuccessfully());
@@ -418,7 +414,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
     }
 
     private Set<String> dataNodes(String tableName) {
-        IgniteImpl ignite = raftLeader(tableName);
+        IgniteImpl ignite = raftLeader(tableName).leaderHost;
         return dataNodes(ignite);
     }
 
@@ -445,7 +441,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
     }
 
     private ReplicaMeta primaryReplicaMeta(String tableName) {
-        return primaryReplicaMeta(raftLeader(tableName), tableName);
+        return primaryReplicaMeta(raftLeader(tableName).leaderHost, tableName);
     }
 
     private ReplicaMeta primaryReplicaMeta(IgniteImpl ignite, String 
tableName) {
@@ -457,7 +453,7 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
         return fut.join();
     }
 
-    private IgniteImpl raftLeader(String tableName) {
+    private LeaderWithTerm raftLeader(String tableName) {
         IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
         var raftNodeId = new RaftNodeId(partitionGroupId(ignite, tableName, 
0), new Peer(ignite.name()));
         var jraftServer = (JraftServerImpl) ignite.raftManager().server();
@@ -467,22 +463,17 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
                 .map(Node::getLeaderId)
                 .map(PeerId::getConsistentId)
                 .orElse(null);
-        assertNotNull(consistentId);
-        return unwrapIgniteImpl(cluster.node(cluster.nodeIndex(consistentId)));
-    }
 
-    private void cancelLease(IgniteImpl leaseholder, String tableName) {
-        StopLeaseProlongationMessage msg = new PlacementDriverMessagesFactory()
-                .stopLeaseProlongationMessage()
-                .groupId(partitionGroupId(cluster.aliveNode(), tableName, 0))
-                .build();
+        assertNotNull(consistentId);
 
-        // Just sent it to all nodes to not determine the exact placement 
driver active actor.
-        runningNodes().forEach(node -> 
leaseholder.sendFakeMessage(node.name(), msg));
+        return new LeaderWithTerm(
+                
unwrapIgniteImpl(cluster.node(cluster.nodeIndex(consistentId))),
+                raftGroupService.getRaftNode().getCurrentTerm()
+        );
     }
 
     private Set<String> realAssignments(String tableName) {
-        IgniteImpl ignite = raftLeader(tableName);
+        IgniteImpl ignite = raftLeader(tableName).leaderHost;
         Catalog catalog = latestCatalog(ignite);
         int zoneId = catalog.zone(ZONE_NAME).id();
         int tableId = catalog.table(DEFAULT_SCHEMA_NAME, TABLE_NAME).id();
@@ -623,4 +614,15 @@ class ItRebalanceByPendingAssignmentsQueueTest extends 
ClusterPerTestIntegration
             ignite.metaStorageManager().unregisterWatch(this);
         }
     }
+
+    private static final class LeaderWithTerm {
+        private final IgniteImpl leaderHost;
+
+        private final long term;
+
+        private LeaderWithTerm(IgniteImpl leaderHost, long term) {
+            this.leaderHost = leaderHost;
+            this.term = term;
+        }
+    }
 }

Reply via email to