This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ee637b4cd72 IGNITE-27419 Fix exception handling on retry of reset (#7395)
ee637b4cd72 is described below
commit ee637b4cd72df7b5c278667d9bde27f9541dbf97
Author: Cyrill <[email protected]>
AuthorDate: Thu Jan 15 18:19:30 2026 +0300
IGNITE-27419 Fix exception handling on retry of reset (#7395)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../internal/raft/rebalance/ExceptionUtils.java | 10 +--
...ls.java => RaftPeerConfigurationException.java} | 26 ++----
.../ignite/internal/replicator/ReplicaManager.java | 19 ++++-
.../ItDisasterRecoveryReconfigurationTest.java | 99 ++++++++++------------
4 files changed, 74 insertions(+), 80 deletions(-)
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
index 5b9e836cec6..c6170de68c0 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
@@ -34,10 +34,10 @@ public class ExceptionUtils {
* @return {@code True} if this is a recoverable exception.
*/
public static boolean recoverable(Throwable t) {
- if (hasCause(t, NodeStoppingException.class,
ComponentStoppingException.class, RaftStaleUpdateException.class)) {
- return false;
- }
-
- return true;
+ return !hasCause(t,
+ NodeStoppingException.class,
+ ComponentStoppingException.class,
+ RaftStaleUpdateException.class,
+ RaftPeerConfigurationException.class);
}
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftPeerConfigurationException.java
similarity index 55%
copy from
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
copy to
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftPeerConfigurationException.java
index 5b9e836cec6..cdd62f14e9a 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftPeerConfigurationException.java
@@ -17,27 +17,17 @@
package org.apache.ignite.internal.raft.rebalance;
-import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
-
-import org.apache.ignite.internal.lang.ComponentStoppingException;
-import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
/**
- * Helper class for exception handling.
+ * Exception thrown when a RAFT configuration change fails due to
non-recoverable errors
+ * such as the node being in an invalid state (EPERM) or invalid arguments
(EINVAL).
+ *
+ * <p>This exception is non-recoverable and should not be retried.
*/
-public class ExceptionUtils {
-
- /**
- * Checks if an error is recoverable, so we can retry a rebalance intent.
- *
- * @param t The throwable.
- * @return {@code True} if this is a recoverable exception.
- */
- public static boolean recoverable(Throwable t) {
- if (hasCause(t, NodeStoppingException.class,
ComponentStoppingException.class, RaftStaleUpdateException.class)) {
- return false;
- }
+public class RaftPeerConfigurationException extends
IgniteInternalCheckedException {
- return true;
+ public RaftPeerConfigurationException(String message) {
+ super(message);
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 9d11528e820..64b8c7cd83b 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -99,6 +99,7 @@ import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
import org.apache.ignite.internal.raft.rebalance.RaftCommandWithRetry;
+import
org.apache.ignite.internal.raft.rebalance.RaftPeerConfigurationException;
import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -802,9 +803,21 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
Loza loza = (Loza) raftManager;
Status status = loza.resetPeers(raftNodeId, peersAndLearners,
sequenceToken);
- // Stale configuration change will not be retried.
- if (!status.isOk() && status.getRaftError() == RaftError.ESTALE) {
- throw new IgniteException(INTERNAL_ERR, new
RaftStaleUpdateException(status.getErrorMsg()));
+ if (!status.isOk()) {
+ RaftError error = status.getRaftError();
+
+ // Stale configuration change will not be retried.
+ if (error == RaftError.ESTALE) {
+ throw new IgniteException(INTERNAL_ERR, new
RaftStaleUpdateException(status.getErrorMsg()));
+ }
+
+ // EBUSY means there's an ongoing configuration change -
retriable, will eventually complete.
+ if (error == RaftError.EBUSY) {
+ throw new IgniteException(INTERNAL_ERR, "Configuration change
in progress, will retry: " + status);
+ }
+
+ // EPERM (node not active) and EINVAL (invalid args) are not
retriable.
+ throw new IgniteException(INTERNAL_ERR, new
RaftPeerConfigurationException("Failed to reset peers: " + status));
}
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 8c1f08e45c1..a45ae85d5b2 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -23,6 +23,7 @@ import static java.util.Map.of;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
@@ -75,7 +76,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterConfiguration.Builder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
@@ -1069,7 +1069,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
.map(Assignment::forPeer)
- .collect(Collectors.toSet());
+ .collect(toSet());
Assignments assignmentsPlanned = Assignments.of(peers, timestamp,
true);
@@ -1215,7 +1215,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
.map(Assignment::forPeer)
- .collect(Collectors.toSet());
+ .collect(toSet());
Assignments assignmentsPlanned = Assignments.of(peers, timestamp,
true);
@@ -1451,11 +1451,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
- Collection<String> dataNodeNames = new HashSet<>();
- for (int i = 0; i < 7; i++) {
- dataNodeNames.add(node(i).name());
- }
-
+ Collection<String> dataNodeNames = nodeNames(0, 1, 2, 3, 4, 5, 6);
logger().info("Zone {}", zone);
Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
@@ -1472,11 +1468,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertInsertedValuesOnSpecificNodes(table.name(), dataNodeNames,
partId, 0);
- Assignments link2Assignments = Assignments.of(Set.of(
- Assignment.forPeer(node(0).name()),
- Assignment.forPeer(node(1).name()),
- Assignment.forPeer(node(2).name())
- ), timestamp);
+ Assignments link2Assignments = peersFrom(timestamp, 0, 1, 2);
AtomicBoolean blockedLink2 = new AtomicBoolean(true);
@@ -1490,13 +1482,12 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
stopNodesInParallel(3, 4, 5, 6);
- int firstPhaseReset = getNodeForFirstPhaseReset(partId, 0, 1, 2);
-
- Assignments link2FirstPhaseReset = Assignments.of(Set.of(
- Assignment.forPeer(node(firstPhaseReset).name())
- ), timestamp);
+ awaitStableContainsSingleNode(node0, partId);
- assertStableAssignments(node0, partId, link2FirstPhaseReset, 60_000);
+ // Read the actual stable assignments - this is what the system
selected.
+ Assignments link2FirstPhaseReset = getStableAssignments(node0, partId);
+ String selectedNode =
link2FirstPhaseReset.nodes().iterator().next().consistentId();
+ logger().info("Reset selected node [name={}].", selectedNode);
// Assignments chain consists of stable and the first phase of reset.
assertAssignmentsChain(node0, partId,
AssignmentsChain.of(allAssignments, link2FirstPhaseReset));
@@ -1512,9 +1503,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1,
2}));
stopNodesInParallel(1, 2);
- Assignments link3Assignments = Assignments.of(Set.of(
- Assignment.forPeer(node(0).name())
- ), timestamp);
+ Assignments link3Assignments = peersFrom(timestamp, 0);
assertStableAssignments(node0, partId, link3Assignments, 30_000);
@@ -1540,10 +1529,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
- Collection<String> dataNodes = new HashSet<>();
- for (int i = 0; i < 7; i++) {
- dataNodes.add(node(i).name());
- }
+ Collection<String> dataNodes = nodeNames(0, 1, 2, 3, 4, 5, 6);
logger().info("Zone {}", zone);
@@ -1562,32 +1548,25 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertInsertedValuesOnSpecificNodes(table.name(), dataNodes, partId,
0);
- Assignments blockedRebalance = Assignments.of(timestamp,
- Assignment.forPeer(node(0).name()),
- Assignment.forPeer(node(1).name()),
- Assignment.forPeer(node(2).name())
- );
+ Assignments blockedRebalance = peersFrom(timestamp, 0, 1, 2);
blockRebalanceStableSwitch(partId, blockedRebalance);
logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3,
4, 5, 6}));
stopNodesInParallel(3, 4, 5, 6);
- int firstPhaseReset = getNodeForFirstPhaseReset(partId, 0, 1, 2);
-
- logger().info("Max node replicated assignments [ids={}].",
firstPhaseReset);
-
DisasterRecoveryManager disasterRecoveryManager =
node0.disasterRecoveryManager();
CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), true, -1);
assertThat(updateFuture, willCompleteSuccessfully());
// First phase of reset. The second phase stable switch is blocked.
- Assignments link2Assignments = Assignments.of(Set.of(
- Assignment.forPeer(node(firstPhaseReset).name())
- ), timestamp);
+ awaitStableContainsSingleNode(node0, partId);
- assertStableAssignments(node0, partId, link2Assignments, 30_000);
+ // Read the actual stable assignments - this is what the system
selected.
+ Assignments link2Assignments = getStableAssignments(node0, partId);
+ String selectedNode =
link2Assignments.nodes().iterator().next().consistentId();
+ logger().info("Reset selected node [name={}].", selectedNode);
assertAssignmentsChain(node0, partId,
AssignmentsChain.of(allAssignments, link2Assignments));
@@ -1616,20 +1595,6 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertAssignmentsChain(node0, partId,
AssignmentsChain.of(allAssignments, link2Assignments, link3Assignments));
}
- private int getNodeForFirstPhaseReset(int partId, Integer ...nodes) {
- return Stream.of(nodes)
- .max((index1, index2) -> {
- int cmp = Long.compare(getRaftLogIndex(index1,
partId).getIndex(),
- getRaftLogIndex(index2, partId).getIndex());
- if (cmp != 0) {
- return cmp;
- }
- // Among equal raft log indexes, prefer smaller node index
- return Integer.compare(index2, index1);
- })
- .get();
- }
-
@Test
@ZoneParams(nodes = 6, replicas = 3, partitions = 1, consistencyMode =
ConsistencyMode.HIGH_AVAILABILITY)
void testGracefulRewritesChainAfterForceReset() throws Exception {
@@ -2042,6 +2007,32 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
}, 250, SECONDS.toMillis(60)));
}
+ private Set<String> nodeNames(int... indices) {
+ return Arrays.stream(indices)
+ .mapToObj(idx -> node(idx).name())
+ .collect(toSet());
+ }
+
+ private Assignments peersFrom(long timestamp, int... indices) {
+ return peersFrom(timestamp, nodeNames(indices));
+ }
+
+ private Assignments peersFrom(long timestamp, Set<String> nodeNames) {
+ Set<Assignment> peerSet = nodeNames
+ .stream()
+ .map(Assignment::forPeer)
+ .collect(toSet());
+
+ return Assignments.of(peerSet, timestamp);
+ }
+
+ private void awaitStableContainsSingleNode(IgniteImpl node0, int partId) {
+ // Wait for first phase of reset to complete.
+ // The reset selects the node with the highest raft log index (or
lexicographically first on tie).
+ await().atMost(60, SECONDS)
+ .until(() -> getStableAssignments(node0,
partId).nodes().size() == 1);
+ }
+
/**
* Return assignments based on states of partitions in the cluster. It is
possible that returned value contains nodes
* from stable and pending, for example, when rebalance is in progress.
@@ -2126,7 +2117,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
Set<IgniteImpl> nodes = cluster.runningNodes()
.map(TestWrappers::unwrapIgniteImpl)
.filter(node -> nodesNames.contains(node.name()))
- .collect(Collectors.toSet());
+ .collect(toSet());
for (IgniteImpl node : nodes) {
Table table = node.tables().table(tableName);