This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ee637b4cd72 IGNITE-27419 Fix exception handling on retry of reset (#7395)
ee637b4cd72 is described below

commit ee637b4cd72df7b5c278667d9bde27f9541dbf97
Author: Cyrill <[email protected]>
AuthorDate: Thu Jan 15 18:19:30 2026 +0300

    IGNITE-27419 Fix exception handling on retry of reset (#7395)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 .../internal/raft/rebalance/ExceptionUtils.java    | 10 +--
 ...ls.java => RaftPeerConfigurationException.java} | 26 ++----
 .../ignite/internal/replicator/ReplicaManager.java | 19 ++++-
 .../ItDisasterRecoveryReconfigurationTest.java     | 99 ++++++++++------------
 4 files changed, 74 insertions(+), 80 deletions(-)

diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
index 5b9e836cec6..c6170de68c0 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
@@ -34,10 +34,10 @@ public class ExceptionUtils {
      * @return {@code True} if this is a recoverable exception.
      */
     public static boolean recoverable(Throwable t) {
-        if (hasCause(t, NodeStoppingException.class, ComponentStoppingException.class, RaftStaleUpdateException.class)) {
-            return false;
-        }
-
-        return true;
+        return !hasCause(t,
+                NodeStoppingException.class,
+                ComponentStoppingException.class,
+                RaftStaleUpdateException.class,
+                RaftPeerConfigurationException.class);
     }
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftPeerConfigurationException.java
similarity index 55%
copy from 
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
copy to 
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftPeerConfigurationException.java
index 5b9e836cec6..cdd62f14e9a 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftPeerConfigurationException.java
@@ -17,27 +17,17 @@
 
 package org.apache.ignite.internal.raft.rebalance;
 
-import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
-
-import org.apache.ignite.internal.lang.ComponentStoppingException;
-import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 
 /**
- * Helper class for exception handling.
+ * Exception thrown when a RAFT configuration change fails due to non-recoverable errors
+ * such as the node being in an invalid state (EPERM) or invalid arguments (EINVAL).
+ *
+ * <p>This exception is non-recoverable and should not be retried.
  */
-public class ExceptionUtils {
-
-    /**
-     * Checks if an error is recoverable, so we can retry a rebalance intent.
-     *
-     * @param t The throwable.
-     * @return {@code True} if this is a recoverable exception.
-     */
-    public static boolean recoverable(Throwable t) {
-        if (hasCause(t, NodeStoppingException.class, 
ComponentStoppingException.class, RaftStaleUpdateException.class)) {
-            return false;
-        }
+public class RaftPeerConfigurationException extends IgniteInternalCheckedException {
 
-        return true;
+    public RaftPeerConfigurationException(String message) {
+        super(message);
     }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 9d11528e820..64b8c7cd83b 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -99,6 +99,7 @@ import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
 import org.apache.ignite.internal.raft.rebalance.RaftCommandWithRetry;
+import 
org.apache.ignite.internal.raft.rebalance.RaftPeerConfigurationException;
 import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -802,9 +803,21 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         Loza loza = (Loza) raftManager;
         Status status = loza.resetPeers(raftNodeId, peersAndLearners, sequenceToken);
 
-        // Stale configuration change will not be retried.
-        if (!status.isOk() && status.getRaftError() == RaftError.ESTALE) {
-            throw new IgniteException(INTERNAL_ERR, new RaftStaleUpdateException(status.getErrorMsg()));
+        if (!status.isOk()) {
+            RaftError error = status.getRaftError();
+
+            // Stale configuration change will not be retried.
+            if (error == RaftError.ESTALE) {
+                throw new IgniteException(INTERNAL_ERR, new RaftStaleUpdateException(status.getErrorMsg()));
+            }
+
+            // EBUSY means there's an ongoing configuration change - retriable, will eventually complete.
+            if (error == RaftError.EBUSY) {
+                throw new IgniteException(INTERNAL_ERR, "Configuration change in progress, will retry: " + status);
+            }
+
+            // EPERM (node not active) and EINVAL (invalid args) are not retriable.
+            throw new IgniteException(INTERNAL_ERR, new RaftPeerConfigurationException("Failed to reset peers: " + status));
         }
     }
 
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 8c1f08e45c1..a45ae85d5b2 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -23,6 +23,7 @@ import static java.util.Map.of;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
@@ -75,7 +76,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterConfiguration.Builder;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
@@ -1069,7 +1069,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
                 .map(Assignment::forPeer)
-                .collect(Collectors.toSet());
+                .collect(toSet());
 
         Assignments assignmentsPlanned = Assignments.of(peers, timestamp, 
true);
 
@@ -1215,7 +1215,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
                 .map(Assignment::forPeer)
-                .collect(Collectors.toSet());
+                .collect(toSet());
 
         Assignments assignmentsPlanned = Assignments.of(peers, timestamp, 
true);
 
@@ -1451,11 +1451,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
 
         CatalogZoneDescriptor zone = 
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
-        Collection<String> dataNodeNames = new HashSet<>();
-        for (int i = 0; i < 7; i++) {
-            dataNodeNames.add(node(i).name());
-        }
-
+        Collection<String> dataNodeNames = nodeNames(0, 1, 2, 3, 4, 5, 6);
         logger().info("Zone {}", zone);
 
         Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
@@ -1472,11 +1468,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         assertInsertedValuesOnSpecificNodes(table.name(), dataNodeNames, 
partId, 0);
 
-        Assignments link2Assignments = Assignments.of(Set.of(
-                Assignment.forPeer(node(0).name()),
-                Assignment.forPeer(node(1).name()),
-                Assignment.forPeer(node(2).name())
-        ), timestamp);
+        Assignments link2Assignments = peersFrom(timestamp, 0, 1, 2);
 
         AtomicBoolean blockedLink2 = new AtomicBoolean(true);
 
@@ -1490,13 +1482,12 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         stopNodesInParallel(3, 4, 5, 6);
 
-        int firstPhaseReset = getNodeForFirstPhaseReset(partId, 0, 1, 2);
-
-        Assignments link2FirstPhaseReset = Assignments.of(Set.of(
-                Assignment.forPeer(node(firstPhaseReset).name())
-        ), timestamp);
+        awaitStableContainsSingleNode(node0, partId);
 
-        assertStableAssignments(node0, partId, link2FirstPhaseReset, 60_000);
+        // Read the actual stable assignments - this is what the system 
selected.
+        Assignments link2FirstPhaseReset = getStableAssignments(node0, partId);
+        String selectedNode = 
link2FirstPhaseReset.nodes().iterator().next().consistentId();
+        logger().info("Reset selected node [name={}].", selectedNode);
 
         // Assignments chain consists of stable and the first phase of reset.
         assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments, link2FirstPhaseReset));
@@ -1512,9 +1503,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1, 
2}));
         stopNodesInParallel(1, 2);
 
-        Assignments link3Assignments = Assignments.of(Set.of(
-                Assignment.forPeer(node(0).name())
-        ), timestamp);
+        Assignments link3Assignments = peersFrom(timestamp, 0);
 
         assertStableAssignments(node0, partId, link3Assignments, 30_000);
 
@@ -1540,10 +1529,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
 
         CatalogZoneDescriptor zone = 
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
-        Collection<String> dataNodes = new HashSet<>();
-        for (int i = 0; i < 7; i++) {
-            dataNodes.add(node(i).name());
-        }
+        Collection<String> dataNodes = nodeNames(0, 1, 2, 3, 4, 5, 6);
 
         logger().info("Zone {}", zone);
 
@@ -1562,32 +1548,25 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         assertInsertedValuesOnSpecificNodes(table.name(), dataNodes, partId, 
0);
 
-        Assignments blockedRebalance = Assignments.of(timestamp,
-                Assignment.forPeer(node(0).name()),
-                Assignment.forPeer(node(1).name()),
-                Assignment.forPeer(node(2).name())
-        );
+        Assignments blockedRebalance = peersFrom(timestamp, 0, 1, 2);
 
         blockRebalanceStableSwitch(partId, blockedRebalance);
 
         logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3, 
4, 5, 6}));
         stopNodesInParallel(3, 4, 5, 6);
 
-        int firstPhaseReset = getNodeForFirstPhaseReset(partId, 0, 1, 2);
-
-        logger().info("Max node replicated assignments [ids={}].", 
firstPhaseReset);
-
         DisasterRecoveryManager disasterRecoveryManager = 
node0.disasterRecoveryManager();
         CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), true, -1);
 
         assertThat(updateFuture, willCompleteSuccessfully());
 
         // First phase of reset. The second phase stable switch is blocked.
-        Assignments link2Assignments = Assignments.of(Set.of(
-                Assignment.forPeer(node(firstPhaseReset).name())
-        ), timestamp);
+        awaitStableContainsSingleNode(node0, partId);
 
-        assertStableAssignments(node0, partId, link2Assignments, 30_000);
+        // Read the actual stable assignments - this is what the system 
selected.
+        Assignments link2Assignments = getStableAssignments(node0, partId);
+        String selectedNode = 
link2Assignments.nodes().iterator().next().consistentId();
+        logger().info("Reset selected node [name={}].", selectedNode);
 
         assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments, link2Assignments));
 
@@ -1616,20 +1595,6 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments, link2Assignments, link3Assignments));
     }
 
-    private int getNodeForFirstPhaseReset(int partId, Integer ...nodes) {
-        return Stream.of(nodes)
-                .max((index1, index2) -> {
-                    int cmp = Long.compare(getRaftLogIndex(index1, 
partId).getIndex(),
-                            getRaftLogIndex(index2, partId).getIndex());
-                    if (cmp != 0) {
-                        return cmp;
-                    }
-                    // Among equal raft log indexes, prefer smaller node index
-                    return Integer.compare(index2, index1);
-                })
-                .get();
-    }
-
     @Test
     @ZoneParams(nodes = 6, replicas = 3, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
     void testGracefulRewritesChainAfterForceReset() throws Exception {
@@ -2042,6 +2007,32 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         }, 250, SECONDS.toMillis(60)));
     }
 
+    private Set<String> nodeNames(int... indices) {
+        return Arrays.stream(indices)
+                .mapToObj(idx -> node(idx).name())
+                .collect(toSet());
+    }
+
+    private Assignments peersFrom(long timestamp, int... indices) {
+        return peersFrom(timestamp, nodeNames(indices));
+    }
+
+    private Assignments peersFrom(long timestamp, Set<String> nodeNames) {
+        Set<Assignment> peerSet = nodeNames
+                .stream()
+                .map(Assignment::forPeer)
+                .collect(toSet());
+
+        return Assignments.of(peerSet, timestamp);
+    }
+
+    private void awaitStableContainsSingleNode(IgniteImpl node0, int partId) {
+        // Wait for first phase of reset to complete.
+        // The reset selects the node with the highest raft log index (or lexicographically first on tie).
+        await().atMost(60, SECONDS)
+                .until(() -> getStableAssignments(node0, 
partId).nodes().size() == 1);
+    }
+
     /**
      * Return assignments based on states of partitions in the cluster. It is 
possible that returned value contains nodes
      * from stable and pending, for example, when rebalance is in progress.
@@ -2126,7 +2117,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         Set<IgniteImpl> nodes = cluster.runningNodes()
                 .map(TestWrappers::unwrapIgniteImpl)
                 .filter(node -> nodesNames.contains(node.name()))
-                .collect(Collectors.toSet());
+                .collect(toSet());
 
         for (IgniteImpl node : nodes) {
             Table table = node.tables().table(tableName);

Reply via email to