This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 22e2e77d7c7 IGNITE-24467 Fixed reset and filter rebalance race (#7445)
22e2e77d7c7 is described below

commit 22e2e77d7c72bcbf9617be96f9625c3098ef1890
Author: Egor <[email protected]>
AuthorDate: Wed Feb 4 12:24:15 2026 +0400

    IGNITE-24467 Fixed reset and filter rebalance race (#7445)
    
    Co-authored-by: Egor Kuts <[email protected]>
---
 .../partitiondistribution/Assignments.java         |  2 +-
 .../partitiondistribution/AssignmentsQueue.java    |  9 ++++++
 ...ilablePartitionsRecoveryByFilterUpdateTest.java | 24 ++++++++++++++--
 .../disaster/GroupUpdateRequestHandler.java        | 32 +++++++++++++++++++++-
 .../ItDisasterRecoveryReconfigurationTest.java     | 18 +++++++++---
 5 files changed, 76 insertions(+), 9 deletions(-)

diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
index 1c2c606c9ed..06198b3be41 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
@@ -63,7 +63,7 @@ public class Assignments {
     /**
      * Constructor.
      */
-    private Assignments(Collection<Assignment> nodes, boolean force, long 
timestamp, boolean fromReset) {
+    public Assignments(Collection<Assignment> nodes, boolean force, long 
timestamp, boolean fromReset) {
         // A set of nodes must be a HashSet in order for serialization to 
produce stable results,
         // that could be compared as byte arrays.
         this.nodes = nodes instanceof HashSet ? ((HashSet<Assignment>) nodes) 
: new HashSet<>(nodes);
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
index c5d667c1fd7..1968ee975c7 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
@@ -41,6 +41,15 @@ public class AssignmentsQueue implements 
Iterable<Assignments> {
     @IgniteToStringInclude
     private final Deque<Assignments> queue;
 
+    /** Constructor. */
+    public AssignmentsQueue(AssignmentsQueue... assignmentsQueues) {
+        LinkedList<Assignments> assignments = new LinkedList<>();
+        for (AssignmentsQueue assignmentsQueue : assignmentsQueues) {
+            assignments.addAll(assignmentsQueue.queue);
+        }
+        this.queue = assignments;
+    }
+
     /** Constructor. */
     public AssignmentsQueue(Assignments... assignments) {
         this(Arrays.asList(assignments));
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index b5d82432680..8470b920920 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.disaster;
 
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopologyInMetastorage;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
 import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
 import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.table.Table;
 import org.intellij.lang.annotations.Language;
@@ -328,7 +330,7 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
     }
 
     private void alterZoneSql(String filter, String zoneName) {
-        executeSql(String.format("ALTER ZONE \"%s\" SET (\"DATA_NODES_FILTER\" 
'%s')", zoneName, filter));
+        executeSql(String.format("ALTER ZONE \"%s\"  SET (NODES FILTER '%s')", 
zoneName, filter));
     }
 
     /**
@@ -344,7 +346,6 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
      *   <li>No data should be lost</li>
      * </ol>
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24467")
     @Test
     void testResetAfterChangeFilters() throws InterruptedException {
         startNode(1, EU_ONLY_NODES_CONFIG);
@@ -376,6 +377,15 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
 
         stopNodes(1, 2);
 
+        Set<LogicalNode> expectedNodes = Set.of(
+                getLogicalNode(igniteImpl(0)),
+                getLogicalNode(igniteImpl(3)),
+                getLogicalNode(igniteImpl(4)),
+                getLogicalNode(igniteImpl(5))
+        );
+
+        assertLogicalTopologyInMetastorage(expectedNodes, 
node.metaStorageManager());
+
         String globalFilter = "$[?(@.region == \"US\")]";
 
         alterZoneSql(globalFilter, HA_ZONE_NAME);
@@ -402,7 +412,7 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
      *   <li>No data should be lost</li>
      * </ol>
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24467")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27643")
     @Test
     void testResetAfterChangeStorageProfiles() throws InterruptedException {
         startNode(1, AIPERSIST_NODES_CONFIG);
@@ -519,4 +529,12 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
                 + "  rest.port: {}\n"
                 + "}";
     }
+
+    private static LogicalNode getLogicalNode(IgniteImpl ignite) {
+
+        return 
ignite.logicalTopologyService().localLogicalTopology().nodes().stream()
+                .filter(n -> n.name().equals(ignite.name()))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("Node not found 
in logical topology: " + ignite.name()));
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
index 3bb6bc190ee..90f54e63bcd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
@@ -333,7 +333,17 @@ class GroupUpdateRequestHandler {
                     .stable(Assignments.of(currentAssignments, 
assignmentsTimestamp))
                     .target(Assignments.forced(Set.of(nextAssignment), 
assignmentsTimestamp))
                     .toQueue();
-
+            if (!manualUpdate) {
+                ByteArray pendingKey = 
ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(partId);
+                var entry = metaStorageMgr.getLocally(pendingKey);
+                if (entry != null) {
+                    AssignmentsQueue pendingQueue = 
AssignmentsQueue.fromBytes(entry.value());
+                    if (pendingQueue != null && !pendingQueue.isEmpty()) {
+                        AssignmentsQueue filteredPendingQueue = 
filterAliveNodesOnly(pendingQueue, aliveNodesConsistentIds);
+                        assignmentsQueue = new 
AssignmentsQueue(assignmentsQueue, filteredPendingQueue);
+                    }
+                }
+            }
             return invoke(
                     partId,
                     revision,
@@ -347,6 +357,26 @@ class GroupUpdateRequestHandler {
         });
     }
 
+    private static AssignmentsQueue filterAliveNodesOnly(AssignmentsQueue 
queue, Set<String> aliveNodesConsistentIds) {
+        List<Assignments> filteredAssignments = new ArrayList<>();
+
+        for (Assignments assignments : queue) {
+            Set<Assignment> aliveAssignments = assignments.nodes().stream()
+                    .filter(assignment -> 
aliveNodesConsistentIds.contains(assignment.consistentId()))
+                    .collect(toSet());
+
+            if (!aliveAssignments.isEmpty()) {
+                filteredAssignments.add(new Assignments(
+                        aliveAssignments,
+                        assignments.force(),
+                        assignments.timestamp(),
+                        assignments.fromReset())
+                );
+            }
+        }
+        return new 
AssignmentsQueue(filteredAssignments.toArray(Assignments[]::new));
+    }
+
     /**
      * Returns an assignment with the most up to date log index, if there are 
more than one node with the same index, returns the first one
      * in the lexicographic order.
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 5c75fe98eaf..9d2d295e3d8 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -806,9 +806,9 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         assertRealAssignments(node0, partId, 1, 3, 4);
 
-        stopNodesInParallel(3, 4);
+        stopNodesInParallel(2, 3, 4);
 
-        waitForScale(node0, 3);
+        waitForScale(node0, 2);
 
         DisasterRecoveryManager disasterRecoveryManager = 
node0.disasterRecoveryManager();
         CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
@@ -817,18 +817,20 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         awaitPrimaryReplica(node0, partId);
 
-        assertRealAssignments(node0, partId, 1);
+        assertRealAssignments(node0, partId, 0, 1);
 
         List<Throwable> errors = insertValues(table, partId, 0);
         assertThat(errors, is(empty()));
 
+        awaitStableContainsNodes(node0, partId, 0, 1);
         // Check that there is no ongoing or planned rebalance.
         assertNull(getPendingAssignments(node0, partId));
 
-        assertRealAssignments(node0, partId, 1);
+        assertRealAssignments(node0, partId, 0, 1);
 
         // No fromReset flag is set on stable.
         Assignments assignmentsStable = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
                 Assignment.forPeer(node(1).name())
         ), timestamp);
 
@@ -2056,6 +2058,14 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
                 .until(() -> getStableAssignments(node0, 
partId).nodes().size() == 1);
     }
 
+    private void awaitStableContainsNodes(IgniteImpl node0, int partId, 
Integer ... expected) {
+        await().atMost(60, SECONDS)
+                .until(() -> requireNonNull(getStableAssignments(node0, 
partId)).nodes()
+                        
.stream().map(Assignment::consistentId).collect(Collectors.toList()).equals(
+                                Arrays.stream(expected).map(idx -> 
cluster.nodeName(idx)).collect(Collectors.toList())
+                        ));
+    }
+
     /**
      * Return assignments based on states of partitions in the cluster. It is 
possible that returned value contains nodes
      * from stable and pending, for example, when rebalance is in progress.

Reply via email to