This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3c4439be787 IGNITE-27156 Change timeout semantics (#7242)
3c4439be787 is described below
commit 3c4439be78766067593aa7b65f1a3118d458b738
Author: Cyrill <[email protected]>
AuthorDate: Thu Jan 29 22:02:33 2026 +0300
IGNITE-27156 Change timeout semantics (#7242)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../raft/client/LeaderAvailabilityState.java | 237 +++++
.../PhysicalTopologyAwareRaftGroupService.java | 48 +-
.../internal/raft/client/RaftCommandExecutor.java | 1044 ++++++++++++++++++++
.../internal/raft/client/RaftErrorUtils.java | 45 +
.../ignite/internal/raft/client/RetryContext.java | 48 +-
.../raft/client/LeaderAvailabilityStateTest.java | 466 +++++++++
...ysicalTopologyAwareRaftGroupServiceRunTest.java | 390 +++++++-
.../internal/raft/client/RaftGroupServiceTest.java | 46 +
8 files changed, 2297 insertions(+), 27 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityState.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityState.java
new file mode 100644
index 00000000000..ed427840e06
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityState.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * State machine for tracking leader availability.
+ *
+ * <p>Manages transitions between two states:
+ * <ul>
+ * <li>{@link State#WAITING_FOR_LEADER} - No leader is currently known, operations must wait</li>
+ * <li>{@link State#LEADER_AVAILABLE} - A leader is known, operations can proceed</li>
+ * </ul>
+ *
+ * <p>State transitions:
+ * <pre>
+ * WAITING_FOR_LEADER --[onLeaderElected]--> LEADER_AVAILABLE
+ * LEADER_AVAILABLE --[onGroupUnavailable]--> WAITING_FOR_LEADER
+ * Any state --[stop]--> stopped (terminal state)
+ * </pre>
+ *
+ * <p>Thread-safe: all public methods are synchronized or use proper synchronization.
+ */
+@VisibleForTesting
+class LeaderAvailabilityState {
+ /** The logger. */
+ private static final IgniteLogger LOG = Loggers.forClass(LeaderAvailabilityState.class);
+
+ /** Synchronization mutex. */
+ private final Object mutex = new Object();
+
+ /** Possible states of leader availability. */
+ enum State {
+ /** No leader is known, waiting for leader election. */
+ WAITING_FOR_LEADER,
+ /** A leader is available and operations can proceed. */
+ LEADER_AVAILABLE
+ }
+
+ /** Current state. Guarded by {@code mutex}. */
+ private State currentState = State.WAITING_FOR_LEADER;
+
+ /** Current leader term. Initialized to -1 to accept term 0 as the first valid term. Guarded by {@code mutex}. */
+ private long currentTerm = -1;
+
+ /** Future that waiters block on. Guarded by {@code mutex}. */
+ private CompletableFuture<Long> waiters = new CompletableFuture<>();
+
+ /** Whether the state machine has been stopped. Guarded by {@code mutex}. */
+ private boolean stopped = false;
+
+ /** Exception used to fail futures after stop. Guarded by {@code mutex}. */
+ private Throwable stopException;
+
+ /**
+ * Returns a future that completes when a leader becomes available.
+ *
+ * <p>If a leader is already available, returns an already-completed future with the current term.
+ * Otherwise, returns a future that will complete when {@link #onLeaderElected} is called.
+ * If the state machine has been stopped, returns a failed future.
+ *
+ * @return Future that completes with the leader term when a leader is available.
+ */
+ CompletableFuture<Long> awaitLeader() {
+ synchronized (mutex) {
+ if (stopped) {
+ return CompletableFuture.failedFuture(stopException);
+ }
+ if (currentState == State.LEADER_AVAILABLE) {
+ return CompletableFuture.completedFuture(currentTerm);
+ }
+ return waiters;
+ }
+ }
+
+ /**
+ * Handles leader election notification.
+ *
+ * <p>Transitions from {@link State#WAITING_FOR_LEADER} to {@link State#LEADER_AVAILABLE}
+ * if the new term is greater than the current term. Stale notifications (with term <= current)
+ * are ignored. Has no effect if the state machine has been stopped.
+ *
+ * @param leader The newly elected leader node.
+ * @param term The term of the new leader.
+ */
+ void onLeaderElected(InternalClusterNode leader, long term) {
+ if (term < 0) {
+ throw new IllegalArgumentException("Term must be non-negative: " + term);
+ }
+
+ CompletableFuture<Long> futureToComplete = null;
+
+ synchronized (mutex) {
+ if (stopped) {
+ LOG.debug("Ignoring leader election after stop [leader={}, term={}]", leader, term);
+ return;
+ }
+
+ // Ignore stale term notifications.
+ if (term <= currentTerm) {
+ LOG.debug("Ignoring stale leader [newTerm={}, currentTerm={}]", term, currentTerm);
+ return;
+ }
+
+ long previousTerm = currentTerm;
+ State previousState = currentState;
+
+ currentTerm = term;
+
+ if (currentState == State.WAITING_FOR_LEADER) {
+ currentState = State.LEADER_AVAILABLE;
+ futureToComplete = waiters;
+ }
+
+ LOG.debug("Leader elected [leader={}, term={}, previousTerm={}, stateChange={}->{}]",
+ leader, term, previousTerm, previousState, currentState);
+ }
+
+ // Complete outside the lock to avoid potential deadlocks with future callbacks.
+ if (futureToComplete != null) {
+ futureToComplete.complete(term);
+ }
+ }
+
+ /**
+ * Handles group unavailability notification.
+ *
+ * <p>Transitions from {@link State#LEADER_AVAILABLE} to {@link State#WAITING_FOR_LEADER}
+ * only if the term hasn't changed since the unavailability was detected. This prevents
+ * resetting the state if a new leader was already elected.
+ * Has no effect if the state machine has been stopped.
+ *
+ * @param termWhenDetected The term at which unavailability was detected.
+ */
+ void onGroupUnavailable(long termWhenDetected) {
+ synchronized (mutex) {
+ if (stopped) {
+ return;
+ }
+
+ // Only transition if we're still at the same term (no new leader elected in the meantime).
+ if (currentTerm == termWhenDetected && currentState == State.LEADER_AVAILABLE) {
+ State previousState = currentState;
+
+ currentState = State.WAITING_FOR_LEADER;
+ waiters = new CompletableFuture<>();
+
+ LOG.debug("Group unavailable [term={}, stateChange={}->{}]",
+ termWhenDetected, previousState, currentState);
+ }
+ }
+ }
+
+ /**
+ * Returns the current leader term.
+ *
+ * @return Current leader term (-1 if no leader has been elected yet).
+ */
+ long currentTerm() {
+ synchronized (mutex) {
+ return currentTerm;
+ }
+ }
+
+ /**
+ * Returns the current state.
+ *
+ * @return Current state.
+ */
+ State currentState() {
+ synchronized (mutex) {
+ return currentState;
+ }
+ }
+
+ /**
+ * Returns whether the state machine has been stopped.
+ *
+ * @return True if stopped.
+ */
+ boolean stopped() {
+ synchronized (mutex) {
+ return stopped;
+ }
+ }
+
+ /**
+ * Stops the state machine, cancelling all pending waiters with the given exception.
+ *
+ * <p>After stop, the state machine is no longer active:
+ * <ul>
+ * <li>{@link #awaitLeader()} returns a failed future</li>
+ * <li>{@link #onLeaderElected} is ignored</li>
+ * <li>{@link #onGroupUnavailable} is ignored</li>
+ * </ul>
+ *
+ * <p>Called during shutdown to ensure waiters don't hang indefinitely.
+ *
+ * @param exception The exception to complete waiters with.
+ */
+ void stop(Throwable exception) {
+ CompletableFuture<Long> futureToCancel;
+
+ synchronized (mutex) {
+ if (stopped) {
+ return;
+ }
+
+ stopped = true;
+ stopException = exception;
+ futureToCancel = waiters;
+ }
+
+ // Complete outside the lock to avoid potential deadlocks.
+ futureToCancel.completeExceptionally(exception);
+ }
+}
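For reference, a minimal usage sketch of the state machine added above (illustrative only, not part of the patch; the InternalClusterNode argument and the stop exception are placeholders):

    // Sketch: typical lifecycle of LeaderAvailabilityState as used by the command executor.
    void leaderAvailabilityLifecycle(InternalClusterNode leaderNode) {
        LeaderAvailabilityState state = new LeaderAvailabilityState();

        // No leader yet: callers park on this future.
        CompletableFuture<Long> waiter = state.awaitLeader();

        // A leader election notification completes the waiter with the new term.
        state.onLeaderElected(leaderNode, 1L);
        long term = waiter.join(); // 1

        // If the group later looks unavailable at that same term, the state flips back
        // to WAITING_FOR_LEADER and a fresh future is handed out to new waiters.
        state.onGroupUnavailable(term);

        // Shutdown fails any remaining waiters instead of leaving them hanging.
        state.stop(new RuntimeException("node is stopping"));
    }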
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
index 50bc085a877..6e6d00b311a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
@@ -20,14 +20,12 @@ package org.apache.ignite.internal.raft.client;
import static java.util.concurrent.CompletableFuture.runAsync;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -44,7 +42,6 @@ import org.apache.ignite.internal.raft.LeaderElectionListener;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.PeerUnavailableException;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.ThrottlingContextHolder;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -60,6 +57,8 @@ import org.jetbrains.annotations.Nullable;
/**
* The RAFT group service is based on the cluster physical topology. This service has the ability to subscribe to RAFT group leader updates.
+ *
+ * <p>Command execution with leader-aware retry semantics is delegated to {@link RaftCommandExecutor}.
*/
public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroupService {
/** The logger. */
@@ -81,11 +80,17 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
private final RaftGroupService raftClient;
/** Executor to invoke RPC requests. */
- private final Executor executor;
+ private final ScheduledExecutorService executor;
/** RAFT configuration. */
private final RaftConfiguration raftConfiguration;
+ /** Factory for creating stopping exceptions. */
+ private final ExceptionFactory stoppingExceptionFactory;
+
+ /** Command executor with retry semantics. */
+ private final RaftCommandExecutor commandExecutor;
+
/**
* Constructor.
*
@@ -111,6 +116,7 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
this.clusterService = clusterService;
this.executor = executor;
this.raftConfiguration = raftConfiguration;
+ this.stoppingExceptionFactory = stoppingExceptionFactory;
this.raftClient = RaftGroupServiceImpl.start(
groupId,
clusterService,
@@ -123,10 +129,23 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
throttlingContextHolder
);
+ this.commandExecutor = new RaftCommandExecutor(
+ groupId,
+ raftClient::peers,
+ clusterService,
+ executor,
+ raftConfiguration,
+ cmdMarshaller,
+ stoppingExceptionFactory
+ );
+
this.generalLeaderElectionListener = new ServerEventHandler(executor);
eventsClientListener.addLeaderElectionListener(raftClient.groupId(),
generalLeaderElectionListener);
+ // Subscribe the command executor's leader availability state to leader election notifications.
+ subscribeLeader(commandExecutor.leaderElectionListener());
+
TopologyService topologyService = clusterService.topologyService();
topologyService.addEventHandler(new TopologyEventHandler() {
@@ -170,6 +189,11 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
return;
}
+ if (leaderWithTerm == null || leaderWithTerm.leader() == null) {
+ LOG.debug("No leader information available [grp={}].", groupId());
+ return;
+ }
+
InternalClusterNode leaderHost = topologyService
.getByConsistentId(leaderWithTerm.leader().consistentId());
@@ -296,7 +320,7 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
msgSendFut.complete(false);
}
- } else if (recoverable(invokeCause)) {
+ } else if (RaftErrorUtils.recoverable(invokeCause)) {
sendWithRetry(node, msg, msgSendFut);
} else if (invokeCause instanceof RecipientLeftException) {
LOG.info(
@@ -315,18 +339,9 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
}, executor);
}
- private static boolean recoverable(Throwable t) {
- t = unwrapCause(t);
-
- return t instanceof TimeoutException
- || t instanceof IOException
- || t instanceof PeerUnavailableException
- || t instanceof RecipientLeftException;
- }
-
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ return commandExecutor.run(cmd, timeoutMillis);
}
@Override
@@ -411,6 +426,9 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
@Override
public void shutdown() {
+ // Stop the command executor first - blocks new run() calls, cancels leader waiters.
+ commandExecutor.shutdown(stoppingExceptionFactory.create("Raft client is stopping [groupId=" + groupId() + "]."));
+
finishSubscriptions();
raftClient.shutdown();
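A rough illustration of the new timeout semantics at the call site (a sketch under assumptions; the service and cmd variables are placeholders, not code from this commit):

    // Sketch: the three modes supported by run(cmd, timeoutMillis) after this change.
    <R> CompletableFuture<R> runExamples(PhysicalTopologyAwareRaftGroupService service, Command cmd) {
        // timeout == 0: single attempt per peer; fails fast with
        // ReplicationGroupUnavailableException when no leader is reachable.
        CompletableFuture<R> singleAttempt = service.run(cmd, 0);

        // timeout < 0 (or Long.MAX_VALUE): wait for a leader indefinitely,
        // retrying recoverable errors within the configured retry timeout.
        CompletableFuture<R> unbounded = service.run(cmd, -1);

        // 0 < timeout < Long.MAX_VALUE: wait for a leader up to the bound,
        // then fail with ReplicationGroupUnavailableException.
        return service.run(cmd, 5_000);
    }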
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java
new file mode 100644
index 00000000000..fb293931f2f
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ExceptionFactory;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeerUnavailableException;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.ReplicationGroupUnavailableException;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ActionResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMFullThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Executes RAFT commands with leader-aware retry semantics.
+ *
+ * <p>Supports three timeout modes:
+ * <ul>
+ * <li>{@code timeout == 0}: Single attempt - tries each peer once, throws {@link ReplicationGroupUnavailableException}
+ * if no leader is available.</li>
+ * <li>{@code timeout < 0} or {@code == Long.MAX_VALUE}: Infinite wait mode - waits indefinitely for leader,
+ * retries recoverable errors within {@code retryTimeoutMillis}.</li>
+ * <li>{@code 0 < timeout < Long.MAX_VALUE}: Bounded wait mode - waits up to {@code timeout} for leader,
+ * throws {@link ReplicationGroupUnavailableException} on timeout.</li>
+ * </ul>
+ */
+class RaftCommandExecutor {
+ /** The logger. */
+ private static final IgniteLogger LOG = Loggers.forClass(RaftCommandExecutor.class);
+
+ /** Raft message factory. */
+ private static final RaftMessagesFactory MESSAGES_FACTORY = Loza.FACTORY;
+
+ /** Replication group ID. */
+ private final ReplicationGroupId groupId;
+
+ /** Supplier for the current peer list. Returns an immutable snapshot backed by a volatile field. */
+ private final Supplier<List<Peer>> peersSupplier;
+
+ /** Cluster service. */
+ private final ClusterService clusterService;
+
+ /** Executor to invoke RPC requests. */
+ private final ScheduledExecutorService executor;
+
+ /** RAFT configuration. */
+ private final RaftConfiguration raftConfiguration;
+
+ /** Command marshaller. */
+ private final Marshaller commandsMarshaller;
+
+ /** Factory for creating stopping exceptions. */
+ private final ExceptionFactory stoppingExceptionFactory;
+
+ /** State machine for tracking leader availability. */
+ private final LeaderAvailabilityState leaderAvailabilityState;
+
+ /** Current leader. */
+ private volatile Peer leader;
+
+ /** Busy lock for shutdown. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** This flag is used only for logging. */
+ private final AtomicBoolean peersAreUnavailable = new AtomicBoolean();
+
+ /**
+ * Constructor.
+ *
+ * @param groupId Replication group ID.
+ * @param peersSupplier Supplier for the current peer list. Must return an immutable snapshot.
+ * @param clusterService Cluster service.
+ * @param executor Executor to invoke RPC requests.
+ * @param raftConfiguration RAFT configuration.
+ * @param commandsMarshaller Command marshaller.
+ * @param stoppingExceptionFactory Factory for creating stopping exceptions.
+ */
+ RaftCommandExecutor(
+ ReplicationGroupId groupId,
+ Supplier<List<Peer>> peersSupplier,
+ ClusterService clusterService,
+ ScheduledExecutorService executor,
+ RaftConfiguration raftConfiguration,
+ Marshaller commandsMarshaller,
+ ExceptionFactory stoppingExceptionFactory
+ ) {
+ this.groupId = groupId;
+ this.peersSupplier = peersSupplier;
+ this.clusterService = clusterService;
+ this.executor = executor;
+ this.raftConfiguration = raftConfiguration;
+ this.commandsMarshaller = commandsMarshaller;
+ this.stoppingExceptionFactory = stoppingExceptionFactory;
+ this.leaderAvailabilityState = new LeaderAvailabilityState();
+ }
+
+ /**
+ * Returns a {@link LeaderElectionListener} that feeds the internal leader availability state machine.
+ *
+ * @return Leader election listener callback.
+ */
+ LeaderElectionListener leaderElectionListener() {
+ return leaderAvailabilityState::onLeaderElected;
+ }
+
+ /**
+ * Executes a command with leader-aware retry semantics.
+ *
+ * @param cmd Command to execute.
+ * @param timeoutMillis Timeout in milliseconds (0 = single attempt, negative/MAX_VALUE = infinite, positive = bounded).
+ * @return Future that completes with the command result.
+ */
+ <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Shuts down the executor, blocking new run() calls and cancelling leader waiters.
+ *
+ * <p>Shutdown ordering is critical:
+ * <ol>
+ * <li>Block busyLock first - ensures no new run() calls enter the busy section.
+ * In-flight calls waiting on awaitLeader() have already exited the busy section.</li>
+ * <li>Stop leaderAvailabilityState second - completes awaitLeader() futures exceptionally,
+ * triggering callbacks that will be rejected by the blocked busyLock.</li>
+ * </ol>
+ *
+ * @param stopException Exception to complete waiters with.
+ */
+ void shutdown(Throwable stopException) {
+ busyLock.block();
+
+ leaderAvailabilityState.stop(stopException);
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ private @Nullable Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId.toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ raftConfiguration.responseTimeoutMillis().value()
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ @SuppressWarnings("unchecked")
+ private <R> CompletableFuture<R> executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+ resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [groupId=" + groupId + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE for no deadline.
+ * @return The future with timeout applied, or the original future if no deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId,
+ "Timeout waiting for leader [groupId=" + groupId + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture = leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture = applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+ resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [groupId=" + groupId + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId.toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ raftConfiguration.responseTimeoutMillis().value()
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId.toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId.toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [groupId=" + groupId + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node -> clusterService.messagingService().invoke(node, retryContext.request(), responseTimeout))
+ .whenCompleteAsync((resp, err) ->
+ handleResponse(
+ fut,
+ resp,
+ err,
+ retryContext,
+ new SingleAttemptRetryStrategy(fut)
+ ),
+ executor
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Attempts to select a peer for retrying after a recoverable error.
+ *
+ * <p>This method combines error validation with peer selection for retry handling.
+ *
+ * @param err The throwable to check (will be unwrapped).
+ * @param retryContext Retry context for getting next peer.
+ * @return Result containing the next peer, or the exception to complete the future with.
+ */
+ private RetryPeerResult selectPeerForRetry(Throwable err, RetryContext retryContext) {
+ Throwable unwrappedErr = unwrapCause(err);
+
+ if (!RaftErrorUtils.recoverable(unwrappedErr)) {
+ return RetryPeerResult.fail(unwrappedErr);
+ }
+
+ Peer nextPeer = randomNode(retryContext, false);
+
+ if (nextPeer == null) {
+ return RetryPeerResult.fail(new ReplicationGroupUnavailableException(groupId));
+ }
+
+ return RetryPeerResult.success(nextPeer);
+ }
+
+ /**
+ * Result of attempting to select a peer for retry.
+ */
+ private static final class RetryPeerResult {
+ private final @Nullable Peer peer;
+ private final @Nullable Throwable error;
+
+ private RetryPeerResult(@Nullable Peer peer, @Nullable Throwable error) {
+ this.peer = peer;
+ this.error = error;
+ }
+
+ static RetryPeerResult success(Peer peer) {
+ return new RetryPeerResult(peer, null);
+ }
+
+ static RetryPeerResult fail(Throwable error) {
+ return new RetryPeerResult(null, error);
+ }
+
+ boolean isSuccess() {
+ return peer != null;
+ }
+
+ Peer peer() {
+ assert peer != null : "Check isSuccess() before calling peer()";
+ return peer;
+ }
+
+ Throwable error() {
+ assert error != null : "Check isSuccess() before calling error()";
+ return error;
+ }
+ }
+
+ private static void logRecoverableError(RetryContext retryContext, Peer nextPeer) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Recoverable error during the request occurred (will be retried) [request={}, peer={}, newPeer={}, traceId={}].",
+ includeSensitive() ? retryContext.request() : retryContext.request().toStringForLightLogging(),
+ retryContext.targetPeer(),
+ nextPeer,
+ retryContext.errorTraceId()
+ );
+ }
+ }
+
+ /**
+ * Handles the response from a raft peer request.
+ *
+ * <p>Dispatches the response to the appropriate handler based on its type:
+ * throwable, error response, state machine error, or successful response.
+ *
+ * @param fut Future to complete with the result.
+ * @param resp Response message, or {@code null} if an error occurred.
+ * @param err Throwable if the request failed, or {@code null} on success.
+ * @param retryContext Retry context.
+ * @param strategy Strategy for executing retries.
+ */
+ @SuppressWarnings("unchecked")
+ private <R extends NetworkMessage> void handleResponse(
+ CompletableFuture<R> fut,
+ @Nullable NetworkMessage resp,
+ @Nullable Throwable err,
+ RetryContext retryContext,
+ RetryExecutionStrategy strategy
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [groupId=" + groupId + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableWithRetry(fut, err, retryContext, strategy);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseCommon(fut, (ErrorResponse) resp, retryContext, strategy);
+ } else if (resp instanceof SMErrorResponse) {
+ fut.completeExceptionally(extractSmError((SMErrorResponse) resp, retryContext));
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private void handleThrowableWithRetry(
+ CompletableFuture<? extends NetworkMessage> fut,
+ Throwable err,
+ RetryContext retryContext,
+ RetryExecutionStrategy strategy
+ ) {
+ RetryPeerResult result = selectPeerForRetry(err, retryContext);
+
+ if (!result.isSuccess()) {
+ fut.completeExceptionally(result.error());
+ return;
+ }
+
+ Peer nextPeer = result.peer();
+ logRecoverableError(retryContext, nextPeer);
+
+ String shortReasonMessage = "Peer " + retryContext.targetPeer().consistentId()
+ + " threw " + unwrapCause(err).getClass().getSimpleName();
+ strategy.executeRetry(retryContext, nextPeer, PeerTracking.COMMON, shortReasonMessage);
+ }
+
+ /**
+ * Sends a request with retry and waits for leader on leader absence.
+ */
+ private void sendWithRetryWaitingForLeader(
+ CompletableFuture<ActionResponse> fut,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [groupId=" + groupId + "]."));
+ return;
+ }
+
+ try {
+ long requestStartTime = Utils.monotonicMs();
+ long stopTime = retryContext.stopTime();
+
+ if (requestStartTime >= stopTime) {
+ // Retry timeout expired - fail with timeout exception.
+ // For non-leader-related retriable errors, we don't wait for leader, just fail.
+ fut.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ retryContext.onNewAttempt();
+
+ // Bound response timeout by remaining time until retry stop time.
+ long responseTimeout = Math.min(retryContext.responseTimeoutMillis(), stopTime - requestStartTime);
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node -> clusterService.messagingService().invoke(node, retryContext.request(), responseTimeout))
+ // Enforce timeout even if messaging service doesn't (e.g., in tests with mocks).
+ .orTimeout(responseTimeout > 0 ? responseTimeout : Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+ .whenCompleteAsync(
+ (resp, err) -> handleResponse(
+ fut,
+ resp,
+ err,
+ retryContext,
+ new LeaderWaitRetryStrategy(fut, cmd, deadline, termWhenStarted)
+ ),
+ executor
+ );
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Extracts the error from a state machine error response.
+ *
+ * @param resp State machine error response.
+ * @param retryContext Retry context (used for error trace ID).
+ * @return The throwable to complete the future with.
+ */
+ private static Throwable extractSmError(SMErrorResponse resp, RetryContext retryContext) {
+ SMThrowable th = resp.error();
+
+ if (th instanceof SMCompactedThrowable) {
+ SMCompactedThrowable compactedThrowable = (SMCompactedThrowable) th;
+
+ try {
+ return (Throwable) Class.forName(compactedThrowable.throwableClassName())
+ .getConstructor(String.class)
+ .newInstance(compactedThrowable.throwableMessage());
+ } catch (Exception e) {
+ LOG.warn("Cannot restore throwable from user's state machine. "
+ + "Check if throwable " + compactedThrowable.throwableClassName()
+ + " is present in the classpath.");
+
+ return new IgniteInternalException(
+ retryContext.errorTraceId(), INTERNAL_ERR, compactedThrowable.throwableMessage()
+ );
+ }
+ } else if (th instanceof SMFullThrowable) {
+ return ((SMFullThrowable) th).throwable();
+ } else {
+ return new IgniteInternalException(
+ retryContext.errorTraceId(),
+ INTERNAL_ERR,
+ "Unknown SMThrowable type: " + (th == null ? "null" : th.getClass().getName())
+ );
+ }
+ }
+
+ private static boolean retriableError(@Nullable Throwable e, NetworkMessage raftResponse) {
+ int errorCode = raftResponse instanceof ErrorResponse ? ((ErrorResponse) raftResponse).errorCode() : 0;
+ RaftError raftError = RaftError.forNumber(errorCode);
+
+ if (e != null) {
+ e = unwrapCause(e);
+
+ // Retriable error but can be caused by an overload.
+ return e instanceof TimeoutException || e instanceof IOException;
+ }
+
+ return raftError == RaftError.EBUSY || raftError == RaftError.EAGAIN;
+ }
+
+ /**
+ * Resolves a peer to an internal cluster node.
+ */
+ private CompletableFuture<InternalClusterNode> resolvePeer(Peer peer) {
+ InternalClusterNode node = clusterService.topologyService().getByConsistentId(peer.consistentId());
+
+ if (node == null) {
+ return CompletableFuture.failedFuture(new PeerUnavailableException(peer.consistentId()));
+ }
+
+ return CompletableFuture.completedFuture(node);
+ }
+
+ /**
+ * Parses a peer ID string to a Peer object.
+ *
+ * @param peerId Peer ID string in format "consistentId:idx" or just "consistentId".
+ * @return Parsed Peer object, or {@code null} if parsing fails.
+ */
+ private static @Nullable Peer parsePeer(@Nullable String peerId) {
+ PeerId id = PeerId.parsePeer(peerId);
+
+ return id == null ? null : new Peer(id.getConsistentId(), id.getIdx());
+ }
+
+ /**
+ * Returns a random peer excluding unavailable peers and optionally "no leader" peers.
+ *
+ * <p>Behavior depends on parameters:
+ * <ul>
+ * <li>If {@code retryContext} is null, returns any peer from the list.</li>
+ * <li>If {@code excludeNoLeaderPeers} is true, also excludes peers that returned "no leader"
+ * and does NOT reset exclusion sets when exhausted (returns null instead).</li>
+ * <li>If {@code excludeNoLeaderPeers} is false and all peers are unavailable in non-single-shot mode,
+ * resets unavailable peers and tries again.</li>
+ * </ul>
+ *
+ * @param retryContext Retry context, or null for initial peer selection.
+ * @param excludeNoLeaderPeers Whether to exclude peers that returned "no leader" response.
+ * @return A random available peer, or null if none available.
+ */
+ private @Nullable Peer randomNode(@Nullable RetryContext retryContext, boolean excludeNoLeaderPeers) {
+ List<Peer> localPeers = peers();
+
+ if (localPeers == null || localPeers.isEmpty()) {
+ return null;
+ }
+
+ var availablePeers = new ArrayList<Peer>(localPeers.size());
+
+ if (retryContext == null) {
+ availablePeers.addAll(localPeers);
+ } else {
+ for (Peer peer : localPeers) {
+ if (retryContext.targetPeer().equals(peer) || retryContext.unavailablePeers().contains(peer)) {
+ continue;
+ }
+ if (excludeNoLeaderPeers && retryContext.noLeaderPeers().contains(peer)) {
+ continue;
+ }
+ availablePeers.add(peer);
+ }
+
+ // Reset unavailable peers only if NOT tracking no-leader peers separately.
+ // When tracking no-leader peers, we want to exhaust all peers and then wait for leader.
+ if (availablePeers.isEmpty() && !excludeNoLeaderPeers && !retryContext.singleShotRequest()) {
+ if (!peersAreUnavailable.getAndSet(true)) {
+ LOG.warn(
+ "All peers are unavailable, going to keep retrying until timeout [peers = {}, group = {}, trace ID: {}, "
+ + "request {}, origin command {}, instance={}].",
+ localPeers,
+ groupId,
+ retryContext.errorTraceId(),
+ retryContext.request().toStringForLightLogging(),
+ retryContext.originCommandDescription(),
+ this
+ );
+ }
+
+ retryContext.resetUnavailablePeers();
+ retryContext.resetNoLeaderPeers();
+
+ // Read the volatile field again, just in case it changed.
+ availablePeers.addAll(peers());
+ } else {
+ peersAreUnavailable.set(false);
+ }
+ }
+
+ if (availablePeers.isEmpty()) {
+ return null;
+ }
+
+ Collections.shuffle(availablePeers, ThreadLocalRandom.current());
+
+ return availablePeers.stream()
+ .filter(peer -> clusterService.topologyService().getByConsistentId(peer.consistentId()) != null)
+ .findAny()
+ .orElse(availablePeers.get(0));
+ }
+
+ private List<Peer> peers() {
+ return peersSupplier.get();
+ }
+
+ private static String getShortReasonMessage(RetryContext retryContext, RaftError error, ErrorResponse resp) {
+ return format("Peer {} returned code {}: {}", retryContext.targetPeer().consistentId(), error, resp.errorMsg());
+ }
+
+ /**
+ * How to track the current peer when moving to a new one.
+ */
+ private enum PeerTracking {
+ /** Don't mark the current peer (transient errors, leader redirects). */
+ COMMON,
+ /** Mark as unavailable (down, shutting down). */
+ UNAVAILABLE,
+ /** Mark as "no leader" (working but doesn't know leader). */
+ NO_LEADER
+ }
+
+ /**
+ * Strategy for executing retries. Each method receives everything needed to perform the retry.
+ *
+ * <p>This interface abstracts the differences between single-attempt mode and leader-wait mode:
+ * <ul>
+ * <li><b>Single-attempt mode</b>: Each peer is tried at most once. All errors mark the peer
+ * as unavailable. When all peers exhausted, fail immediately.</li>
+ * <li><b>Leader-wait mode</b>: Transient errors retry on the same peer with delay.
+ * "No leader" errors track peers separately. When exhausted, wait for leader notification.</li>
+ * </ul>
+ */
+ private interface RetryExecutionStrategy {
+ /**
+ * Executes retry on the specified peer with the given tracking for the current peer.
+ *
+ * @param context Current retry context.
+ * @param nextPeer Peer to retry on.
+ * @param trackCurrentAs How to track the current peer ({@link PeerTracking#COMMON} for "don't track").
+ * @param reason Human-readable reason for the retry.
+ */
+ void executeRetry(RetryContext context, Peer nextPeer, PeerTracking trackCurrentAs, String reason);
+
+ /**
+ * Called when all peers have been exhausted.
+ *
+ * <p>In leader-wait mode, this triggers waiting for leader notification.
+ * In single-attempt mode, this completes with {@link ReplicationGroupUnavailableException}.
+ */
+ void onAllPeersExhausted();
+
+ /**
+ * Whether to track "no leader" peers separately from unavailable peers.
+ *
+ * <p>In leader-wait mode, peers that return "no leader" are tracked separately so that
+ * when all peers are exhausted, the strategy can wait for a leader notification rather
+ * than failing immediately. In single-attempt mode, all errors are treated uniformly
+ * as unavailable.
+ */
+ boolean trackNoLeaderSeparately();
+ }
+
+ /**
+ * Retry strategy for single-attempt mode.
+ *
+ * <p>In single-attempt mode, each peer is tried at most once. All errors mark the peer
+ * as unavailable. When all peers have been tried, the request fails with
+ * {@link ReplicationGroupUnavailableException}.
+ */
+ private class SingleAttemptRetryStrategy implements RetryExecutionStrategy {
+ private final CompletableFuture<? extends NetworkMessage> fut;
+
+ SingleAttemptRetryStrategy(CompletableFuture<? extends NetworkMessage> fut) {
+ this.fut = fut;
+ }
+
+ @Override
+ public void executeRetry(RetryContext context, Peer nextPeer, PeerTracking trackCurrentAs, String reason) {
+ // In single-attempt mode, each peer is tried at most once.
+ // For transient errors, the caller passes the same peer (meaning "retry on same peer"),
+ // but we must select a different one instead.
+ Peer effectiveNextPeer = nextPeer.equals(context.targetPeer())
+ ? randomNode(context, false)
+ : nextPeer;
+
+ if (effectiveNextPeer == null) {
+ onAllPeersExhausted();
+ return;
+ }
+
+ sendWithRetrySingleAttempt(fut, context.nextAttemptForUnavailablePeer(effectiveNextPeer, reason));
+ }
+
+ @Override
+ public void onAllPeersExhausted() {
+ fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId));
+ }
+
+ @Override
+ public boolean trackNoLeaderSeparately() {
+ return false;
+ }
+ }
+
+ /**
+ * Retry strategy for leader-wait mode.
+ *
+ * <p>In leader-wait mode:
+ * <ul>
+ * <li>Transient errors retry on the same peer after delay</li>
+ * <li>"No leader" errors track peers separately and try each peer once</li>
+ * <li>When all peers are exhausted, waits for leader notification</li>
+ * </ul>
+ */
+ private class LeaderWaitRetryStrategy implements RetryExecutionStrategy {
+ private final CompletableFuture<ActionResponse> fut;
+ private final Command cmd;
+ private final long deadline;
+ private final long termWhenStarted;
+
+ LeaderWaitRetryStrategy(
+ CompletableFuture<ActionResponse> fut,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ this.fut = fut;
+ this.cmd = cmd;
+ this.deadline = deadline;
+ this.termWhenStarted = termWhenStarted;
+ }
+
+ @Override
+ public void executeRetry(RetryContext context, Peer nextPeer, PeerTracking trackCurrentAs, String reason) {
+ RetryContext nextContext;
+ switch (trackCurrentAs) {
+ case COMMON:
+ nextContext = context.nextAttempt(nextPeer, reason);
+ break;
+ case UNAVAILABLE:
+ nextContext = context.nextAttemptForUnavailablePeer(nextPeer, reason);
+ break;
+ case NO_LEADER:
+ nextContext = context.nextAttemptForNoLeaderPeer(nextPeer, reason);
+ break;
+ default:
+ throw new AssertionError("Unexpected tracking: " + trackCurrentAs);
+ }
+
+ executor.schedule(
+ () -> sendWithRetryWaitingForLeader(fut, nextContext, cmd, deadline, termWhenStarted),
+ raftConfiguration.retryDelayMillis().value(),
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ @Override
+ public void onAllPeersExhausted() {
+ LOG.debug("All peers exhausted, waiting for leader [groupId={}, term={}]", groupId, termWhenStarted);
+ leaderAvailabilityState.onGroupUnavailable(termWhenStarted);
+ waitForLeaderAndRetry(fut, cmd, deadline);
+ }
+
+ @Override
+ public boolean trackNoLeaderSeparately() {
+ return true;
+ }
+ }
+
+ /**
+ * Selects next peer and executes retry, or calls exhausted handler.
+ * Centralizes the "select peer + null check + fallback" pattern.
+ *
+ * @param context Retry context.
+ * @param excludeNoLeaderPeers Whether to also exclude peers that returned "no leader".
+ * @param trackCurrentAs How to track the current peer.
+ * @param reason Human-readable reason for the retry.
+ * @param strategy Strategy for executing the retry.
+ */
+ private void selectPeerAndRetry(
+ RetryContext context,
+ boolean excludeNoLeaderPeers,
+ PeerTracking trackCurrentAs,
+ String reason,
+ RetryExecutionStrategy strategy
+ ) {
+ Peer nextPeer = randomNode(context, excludeNoLeaderPeers);
+ if (nextPeer != null) {
+ strategy.executeRetry(context, nextPeer, trackCurrentAs, reason);
+ } else {
+ strategy.onAllPeersExhausted();
+ }
+ }
+
+ /**
+ * Handles error response using the provided strategy.
+ *
+ * <p>This method handles peer selection centrally. The strategy only decides HOW to retry
+ * (with delay, marking peers, waiting for leader, etc.).
+ *
+ * <p>Error categories:
+ * <ul>
+ * <li><b>SUCCESS</b>: Complete successfully</li>
+ * <li><b>Transient</b> (EBUSY/EAGAIN/UNKNOWN/EINTERNAL/ENOENT): Retry on same peer</li>
+ * <li><b>Unavailable</b> (EHOSTDOWN/ESHUTDOWN/ENODESHUTDOWN/ESTOP): Peer is down, try another</li>
+ * <li><b>EPERM with leader</b>: Redirect to actual leader</li>
+ * <li><b>EPERM without leader</b>: No leader known, try another peer or wait</li>
+ * <li><b>ESTALE</b>: Terminal error, stale update</li>
+ * <li><b>Other</b>: Terminal error</li>
+ * </ul>
+ *
+ * @param fut Future to complete on terminal errors.
+ * @param resp Error response from the peer.
+ * @param retryContext Retry context.
+ * @param strategy Strategy for executing retries.
+ */
+ private void handleErrorResponseCommon(
+ CompletableFuture<? extends NetworkMessage> fut,
+ ErrorResponse resp,
+ RetryContext retryContext,
+ RetryExecutionStrategy strategy
+ ) {
+ boolean trackNoLeaderSeparately = strategy.trackNoLeaderSeparately();
+ RaftError error = RaftError.forNumber(resp.errorCode());
+ String reason = getShortReasonMessage(retryContext, error, resp);
+
+ switch (error) {
+ case SUCCESS:
+ leader = retryContext.targetPeer();
+ fut.complete(null);
+ break;
+
+ case EBUSY:
+ case EAGAIN:
+ case UNKNOWN:
+ case EINTERNAL:
+ case ENOENT:
+ // Transient errors - retry on same peer (COMMON = don't mark current peer).
+ strategy.executeRetry(retryContext, retryContext.targetPeer(), PeerTracking.COMMON, reason);
+ break;
+
+ case EHOSTDOWN:
+ case ESHUTDOWN:
+ case ENODESHUTDOWN:
+ case ESTOP:
+ // Peer is down - select new peer, mark current as UNAVAILABLE.
+ selectPeerAndRetry(retryContext, false, PeerTracking.UNAVAILABLE, reason, strategy);
+ break;
+
+ case EPERM:
+ if (resp.leaderId() == null) {
+ // No leader known - select new peer, track based on mode.
+ PeerTracking tracking = trackNoLeaderSeparately ? PeerTracking.NO_LEADER : PeerTracking.UNAVAILABLE;
+ selectPeerAndRetry(retryContext, trackNoLeaderSeparately, tracking, reason, strategy);
+ } else {
+ // Redirect to known leader (COMMON = don't mark current peer as bad).
+ Peer leaderPeer = parsePeer(resp.leaderId());
+
+ if (leaderPeer == null) {
+ throw new IllegalStateException("parsePeer returned null for non-null leaderId: " + resp.leaderId());
+ }
+
+ leader = leaderPeer;
+ strategy.executeRetry(retryContext, leaderPeer, PeerTracking.COMMON, reason);
+ }
+ break;
+
+ case ESTALE:
+ fut.completeExceptionally(new RaftStaleUpdateException(resp.errorMsg()));
+ break;
+
+ default:
+ fut.completeExceptionally(new RaftException(error, resp.errorMsg()));
+ break;
+ }
+ }
+}
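The deadline arithmetic used in run() and startRetryPhase() can be summarized with a small self-contained sketch (monotonicMs() below stands in for Utils.monotonicMs(), and the 500 ms retry timeout is an assumed configuration value):

    // Simplified model of the timeout normalization and retry-phase bounding (bounded-wait case).
    static long monotonicMs() {
        return System.nanoTime() / 1_000_000; // stand-in for Utils.monotonicMs()
    }

    static void deadlineMathExample() {
        long timeoutMillis = 3_000;                        // caller-supplied bounded timeout
        long effectiveTimeout = timeoutMillis < 0 ? Long.MAX_VALUE : timeoutMillis;
        long deadline = monotonicMs() + effectiveTimeout;  // akin to Utils.monotonicMsAfter(effectiveTimeout)

        long configRetryTimeout = 500;                     // assumed retryTimeoutMillis value
        long now = monotonicMs();

        // A single retry phase never outlives the overall deadline; in the infinite mode
        // (deadline == Long.MAX_VALUE) the configured retry timeout is used as-is.
        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE
                ? configRetryTimeout
                : Math.min(configRetryTimeout, deadline - now);

        System.out.println("retry phase bounded to " + sendWithRetryTimeoutMillis + " ms");
    }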
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftErrorUtils.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftErrorUtils.java
new file mode 100644
index 00000000000..6d4278d74f6
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftErrorUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.raft.PeerUnavailableException;
+
+/**
+ * Utility methods for classifying RAFT errors.
+ */
+class RaftErrorUtils {
+ /**
+ * Returns {@code true} if the given throwable represents a recoverable error that can be retried.
+ *
+ * @param t Throwable to check.
+ * @return {@code true} if recoverable.
+ */
+ static boolean recoverable(Throwable t) {
+ t = unwrapCause(t);
+
+ return t instanceof TimeoutException
+ || t instanceof IOException
+ || t instanceof PeerUnavailableException
+ || t instanceof RecipientLeftException;
+ }
+}
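A quick illustration of the classification this helper performs (a standalone mirror of the logic for readability, not a call into the package-private class):

    // Mirrors RaftErrorUtils.recoverable(): these failures are worth retrying on another peer.
    static boolean recoverableSketch(Throwable t) {
        // Rough stand-in for ExceptionUtils.unwrapCause().
        Throwable cause = (t instanceof java.util.concurrent.CompletionException && t.getCause() != null)
                ? t.getCause()
                : t;

        return cause instanceof java.util.concurrent.TimeoutException
                || cause instanceof java.io.IOException;
        // PeerUnavailableException and RecipientLeftException are also recoverable in the real helper.
    }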
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java
index 4b7cc3b6307..ccf60bede8c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java
@@ -44,8 +44,14 @@ import org.jetbrains.annotations.Nullable;
* calls.
*/
class RetryContext {
+ /** Indicates that default response timeout should be used. */
+ static final long USE_DEFAULT_RESPONSE_TIMEOUT = -1;
+
private static final int MAX_RETRY_REASONS = 25;
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSS").withZone(ZoneId.systemDefault());
+
private final String groupId;
private Peer targetPeer;
@@ -75,6 +81,13 @@ class RetryContext {
*/
private final Set<Peer> unavailablePeers = new HashSet<>();
+ /**
+ * Set of peers that returned "no leader" response (EPERM with leaderId==null).
+ * Unlike {@link #unavailablePeers}, this set is NOT reset when all peers are exhausted.
+ * Instead, when all peers are in this set, we wait for leader notification.
+ */
+ private final Set<Peer> noLeaderPeers = new HashSet<>();
+
/**
* List of last {@value MAX_RETRY_REASONS} retry reasons. {@link LinkedList} in order to allow fast head removal upon overflow.
*/
@@ -96,6 +109,11 @@ class RetryContext {
private final long responseTimeoutMillis;
+ /**
+ * If {@code true} then all peers will be retried only once.
+ */
+ private final boolean singleShotRequest;
+
/**
* Creates a context.
*
@@ -127,6 +145,7 @@ class RetryContext {
this.attemptScheduleTime = this.startTime;
this.attemptStartTime = this.startTime;
this.responseTimeoutMillis = responseTimeoutMillis;
+ this.singleShotRequest = sendWithRetryTimeoutMillis == 0;
}
Peer targetPeer() {
@@ -157,10 +176,34 @@ class RetryContext {
return errorTraceId;
}
+ boolean singleShotRequest() {
+ return singleShotRequest;
+ }
+
void resetUnavailablePeers() {
unavailablePeers.clear();
}
+ void resetNoLeaderPeers() {
+ noLeaderPeers.clear();
+ }
+
+ Set<Peer> noLeaderPeers() {
+ return noLeaderPeers;
+ }
+
+ /**
+ * Updates this context by changing the target peer and adding the previous target peer to the "no leader" set.
+ * Used when a peer returns EPERM with no leader information.
+ *
+ * @return {@code this}.
+ */
+ RetryContext nextAttemptForNoLeaderPeer(Peer newTargetPeer, String shortReasonMessage) {
+ noLeaderPeers.add(targetPeer);
+
+ return nextAttempt(newTargetPeer, shortReasonMessage);
+ }
+
/**
* Updates this context by changing the target peer.
*
@@ -195,10 +238,7 @@ class RetryContext {
}
private static String timestampToString(long timestamp) {
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSS")
- .withZone(ZoneId.systemDefault());
- Instant instant = Instant.ofEpochMilli(timestamp);
- return formatter.format(instant);
+ return TIMESTAMP_FORMATTER.format(Instant.ofEpochMilli(timestamp));
}
/**
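To make the two exclusion sets above concrete, a usage sketch as seen from the command executor (illustrative only; context construction is omitted and the peer argument is a placeholder):

    // Sketch: how the executor distinguishes unavailable peers from "no leader" peers.
    void retryContextSketch(RetryContext ctx, Peer nextPeer) {
        // sendWithRetryTimeoutMillis == 0 at construction marks the context as single-shot:
        // each peer is tried at most once and exclusion sets are never reset.
        boolean singleShot = ctx.singleShotRequest();

        // Tracks the failing target as unavailable (down/shutting down) and retargets to nextPeer;
        // unavailablePeers may be reset once all peers are exhausted in leader-wait mode.
        ctx.nextAttemptForUnavailablePeer(nextPeer, "peer is down");

        // Tracks the target as "no leader" (EPERM without leaderId) and retargets; noLeaderPeers
        // is only cleared explicitly, so exhausting it makes the executor wait for a leader.
        ctx.nextAttemptForNoLeaderPeer(nextPeer, "no leader known");
    }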
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityStateTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityStateTest.java
new file mode 100644
index 00000000000..acc1b3a7af8
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityStateTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.deriveUuidFrom;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.raft.client.LeaderAvailabilityState.State;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link LeaderAvailabilityState} state machine.
+ */
+public class LeaderAvailabilityStateTest extends BaseIgniteAbstractTest {
+
+ /**
+ * Initial state should be {@link State#WAITING_FOR_LEADER} with term -1.
+ * {@link LeaderAvailabilityState#onGroupUnavailable(long)} has no effect
when already in {@link State#WAITING_FOR_LEADER}.
+ */
+ @Test
+ void testOnGroupUnavailableWhenAlreadyWaiting() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+
+ assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+ assertEquals(-1, state.currentTerm());
+
+ // Already in WAITING_FOR_LEADER, should have no effect
+ state.onGroupUnavailable(0);
+
+ assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+ assertEquals(-1, state.currentTerm());
+ }
+
+ /** {@link LeaderAvailabilityState#awaitLeader()} returns an incomplete future when no leader has been elected. */
+ @Test
+ void testAwaitLeaderWhenNoLeader() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+
+ CompletableFuture<Long> future = state.awaitLeader();
+
+ assertFalse(future.isDone(), "Future should not be completed when no
leader");
+ }
+
+ /**
+ * {@link LeaderAvailabilityState#onLeaderElected(InternalClusterNode,
long)} transitions to {@link State#LEADER_AVAILABLE} state and
+ * completes waiting futures.
+ * {@link LeaderAvailabilityState#awaitLeader()} returns completed future
when leader is available.
+ */
+ @Test
+ void testAwaitLeader() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader-node");
+
+ long expectedTerm = 1;
+
+ // No leader present. The future will wait.
+ CompletableFuture<Long> waiter = state.awaitLeader();
+ assertFalse(waiter.isDone());
+
+ state.onLeaderElected(leaderNode, expectedTerm);
+
+ // Verify leader is available after onLeaderElected.
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+ assertEquals(expectedTerm, state.currentTerm());
+
+ assertTrue(waiter.isDone());
+ assertEquals(expectedTerm, waiter.join());
+
+ // Leader is present. The future is completed.
+ CompletableFuture<Long> future = state.awaitLeader();
+
+ assertTrue(future.isDone(), "Future should be completed when leader
available");
+ assertEquals(expectedTerm, future.join());
+
+ // Verify the state has not changed and we see exactly the same
results as previously.
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+ assertEquals(expectedTerm, state.currentTerm());
+ }
+
+ /** Stale term notifications are ignored. */
+ @Test
+ void testStaleTermIgnored() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode1 = createNode("leader-1");
+ InternalClusterNode leaderNode2 = createNode("leader-2");
+
+ long expectedTerm = 5;
+
+ state.onLeaderElected(leaderNode1, expectedTerm);
+ assertEquals(expectedTerm, state.currentTerm());
+
+ // Stale notification with lower term should be ignored.
+ state.onLeaderElected(leaderNode2, 3);
+ assertEquals(expectedTerm, state.currentTerm());
+ }
+
+ /** Equal term notifications are ignored. */
+ @Test
+ void testEqualTermIgnored() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader");
+
+ long expectedTerm = 5;
+
+ state.onLeaderElected(leaderNode, expectedTerm);
+ assertEquals(expectedTerm, state.currentTerm());
+
+ // Same term should be ignored.
+ state.onLeaderElected(leaderNode, expectedTerm);
+ assertEquals(expectedTerm, state.currentTerm());
+ }
+
+ /** Higher term updates current term. */
+ @Test
+ void testHigherTermUpdates() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode1 = createNode("leader-1");
+ InternalClusterNode leaderNode2 = createNode("leader-2");
+
+ state.onLeaderElected(leaderNode1, 1);
+ assertEquals(1, state.currentTerm());
+
+ state.onLeaderElected(leaderNode2, 5);
+ assertEquals(5, state.currentTerm());
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+ }
+
+ /**
+ * {@link LeaderAvailabilityState#onGroupUnavailable(long)} transitions
back to WAITING_FOR_LEADER.
+ */
+ @Test
+ void testOnGroupUnavailableTransition() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader-node");
+
+ long expectedTerm = 1;
+
+ state.onLeaderElected(leaderNode, expectedTerm);
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+ assertEquals(expectedTerm, state.currentTerm());
+
+ state.onGroupUnavailable(expectedTerm);
+ assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+ assertEquals(expectedTerm, state.currentTerm());
+ }
+
+ /**
+ * {@link LeaderAvailabilityState#onGroupUnavailable(long)} is ignored if the term has changed.
+ */
+ @Test
+ void testOnGroupUnavailableIgnoredIfTermChanged() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader-node");
+
+ state.onLeaderElected(leaderNode, 1);
+ state.onLeaderElected(leaderNode, 2);
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+ assertEquals(2, state.currentTerm());
+
+ // Should be ignored because term changed from 1 to 2
+ state.onGroupUnavailable(1);
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+ assertEquals(2, state.currentTerm());
+ }
+
+ /**
+ * After {@link LeaderAvailabilityState#onGroupUnavailable(long)}, new waiters get a fresh future.
+ */
+ @Test
+ void testNewWaitersAfterUnavailable() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader-node");
+
+ state.onLeaderElected(leaderNode, 1);
+ CompletableFuture<Long> future1 = state.awaitLeader();
+ assertTrue(future1.isDone());
+
+ state.onGroupUnavailable(1);
+
+ CompletableFuture<Long> future2 = state.awaitLeader();
+ assertFalse(future2.isDone(), "New waiter should get incomplete future
after unavailable");
+ }
+
+ /** Multiple waiters are all completed on leader election. */
+ @Test
+ void testMultipleWaitersCompleted() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader-node");
+
+ CompletableFuture<Long> waiter1 = state.awaitLeader();
+ CompletableFuture<Long> waiter2 = state.awaitLeader();
+ CompletableFuture<Long> waiter3 = state.awaitLeader();
+
+ assertFalse(waiter1.isDone());
+ assertFalse(waiter2.isDone());
+ assertFalse(waiter3.isDone());
+
+ state.onLeaderElected(leaderNode, 10);
+
+ assertTrue(waiter1.isDone());
+ assertTrue(waiter2.isDone());
+ assertTrue(waiter3.isDone());
+
+ assertEquals(10L, waiter1.join());
+ assertEquals(10L, waiter2.join());
+ assertEquals(10L, waiter3.join());
+ }
+
+ /** Term 0 is accepted as first valid term. */
+ @Test
+ void testTermZeroAccepted() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader");
+
+ assertEquals(-1, state.currentTerm());
+
+ state.onLeaderElected(leaderNode, 0);
+
+ assertEquals(0, state.currentTerm());
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+ }
+
+ /**
+ * Negative terms are rejected with {@link IllegalArgumentException}.
+ */
+ @Test
+ void testNegativeTermRejected() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leaderNode = createNode("leader");
+
+ assertEquals(-1, state.currentTerm());
+
+ IllegalArgumentException thrown = assertThrows(
+ IllegalArgumentException.class,
+ () -> state.onLeaderElected(leaderNode, -1)
+ );
+
+ assertEquals("Term must be non-negative: -1", thrown.getMessage());
+ assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+ assertEquals(-1, state.currentTerm());
+ }
+
+ /** Concurrent leader elections with different terms. */
+ @Test
+ void testConcurrentLeaderElections() throws Exception {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ int threadCount = 10;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+ Thread[] threads = new Thread[threadCount];
+ for (int i = 0; i < threadCount; i++) {
+ int term = i;
+ threads[i] = new Thread(() -> {
+ try {
+ startLatch.await();
+ InternalClusterNode leader = createNode("leader-" + term);
+ state.onLeaderElected(leader, term);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ threads[i].start();
+ }
+
+ startLatch.countDown();
+ assertTrue(doneLatch.await(5, TimeUnit.SECONDS));
+
+ // The final term should be the maximum term that was set
+ assertEquals(threadCount - 1, state.currentTerm());
+ }
+
+ /**
+ * {@link LeaderAvailabilityState#awaitLeader()} returns the same future for multiple calls while waiting.
+ */
+ @Test
+ void testAwaitLeaderReturnsSameFuture() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+
+ CompletableFuture<Long> future1 = state.awaitLeader();
+ CompletableFuture<Long> future2 = state.awaitLeader();
+
+ // Should return the same future instance when in WAITING_FOR_LEADER
state
+ assertSame(future1, future2, "Should return same future instance when
waiting");
+ }
+
+ /** Stop completes multiple waiters exceptionally. */
+ @Test
+ void testStopCompletesMultipleWaiters() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ RuntimeException testException = new RuntimeException("Test shutdown");
+
+ // All waiters share the same future
+ CompletableFuture<Long> waiter1 = state.awaitLeader();
+ CompletableFuture<Long> waiter2 = state.awaitLeader();
+ CompletableFuture<Long> waiter3 = state.awaitLeader();
+
+ assertFalse(waiter1.isDone());
+ assertFalse(waiter2.isDone());
+ assertFalse(waiter3.isDone());
+
+ state.stop(testException);
+
+ assertTrue(waiter1.isCompletedExceptionally());
+ assertTrue(waiter2.isCompletedExceptionally());
+ assertTrue(waiter3.isCompletedExceptionally());
+
+ CompletionException thrown1 = assertThrows(CompletionException.class,
waiter1::join);
+ assertSame(testException, thrown1.getCause());
+ CompletionException thrown2 = assertThrows(CompletionException.class,
waiter2::join);
+ assertSame(testException, thrown2.getCause());
+ CompletionException thrown3 = assertThrows(CompletionException.class,
waiter3::join);
+ assertSame(testException, thrown3.getCause());
+
+ // New waiter should immediately fail with the same exception
+ CompletableFuture<Long> waiterAfterComplete = state.awaitLeader();
+ assertTrue(waiterAfterComplete.isCompletedExceptionally());
+
+ CompletionException thrown = assertThrows(CompletionException.class,
waiterAfterComplete::join);
+ assertSame(testException, thrown.getCause());
+ }
+
+ /** Stop when no waiters are pending still marks state as stopped. */
+ @Test
+ void testStopWhenNoWaiters() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ RuntimeException testException = new RuntimeException("Test shutdown");
+
+ assertFalse(state.stopped());
+ // Stop without any waiters - should not throw
+ state.stop(testException);
+
+ assertTrue(state.stopped());
+
+ // Subsequent awaitLeader should fail
+ CompletableFuture<Long> waiter = state.awaitLeader();
+ assertTrue(waiter.isDone());
+ assertTrue(waiter.isCompletedExceptionally());
+
+ CompletionException thrown = assertThrows(CompletionException.class,
waiter::join);
+ assertSame(testException, thrown.getCause());
+ }
+
+ /** Stop when leader is available fails subsequent {@link
LeaderAvailabilityState#awaitLeader()}. */
+ @Test
+ void testStopWhenLeaderAvailable() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leader = createNode("leader");
+ RuntimeException testException = new RuntimeException("Test shutdown");
+
+ state.onLeaderElected(leader, 1);
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+
+ state.stop(testException);
+
+ assertTrue(state.stopped());
+
+ // awaitLeader should fail after stop, even though leader was available
+ CompletableFuture<Long> waiter = state.awaitLeader();
+ assertTrue(waiter.isDone());
+ assertTrue(waiter.isCompletedExceptionally());
+
+ CompletionException thrown = assertThrows(CompletionException.class,
waiter::join);
+ assertSame(testException, thrown.getCause());
+ }
+
+ /** Multiple stop calls are idempotent. */
+ @Test
+ void testMultipleStopCalls() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ IllegalStateException exception1 = new
IllegalStateException("Component stopping");
+ RuntimeException exception2 = new RuntimeException("Shutdown 2");
+
+ CompletableFuture<Long> waiter1 = state.awaitLeader();
+ state.stop(exception1);
+ assertTrue(waiter1.isCompletedExceptionally());
+
+ // Second stop call should be ignored
+ state.stop(exception2);
+
+ // Subsequent awaitLeader should fail with the FIRST exception
+ CompletableFuture<Long> waiter2 = state.awaitLeader();
+ assertTrue(waiter2.isDone());
+
+ CompletionException thrown = assertThrows(CompletionException.class,
waiter2::join);
+ assertSame(exception1, thrown.getCause(), "Should use exception from
first stop call");
+ }
+
+ /**
+ * {@link LeaderAvailabilityState#onLeaderElected(InternalClusterNode,
long)} is ignored after stop.
+ */
+ @Test
+ void testOnLeaderElectedIgnoredAfterStop() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leader = createNode("leader");
+ RuntimeException testException = new RuntimeException("Test shutdown");
+
+ CompletableFuture<Long> waiter1 = state.awaitLeader();
+ state.stop(testException);
+ assertTrue(waiter1.isCompletedExceptionally());
+
+ // Leader election after stop should be ignored
+ state.onLeaderElected(leader, 5);
+
+ // State should still be stopped.
+ assertTrue(state.stopped());
+
+ // New waiter should still fail
+ CompletableFuture<Long> waiter2 = state.awaitLeader();
+ assertTrue(waiter2.isDone());
+ assertTrue(waiter2.isCompletedExceptionally());
+ }
+
+ /**
+ * {@link LeaderAvailabilityState#onGroupUnavailable(long)} is ignored
after stop.
+ */
+ @Test
+ void testOnGroupUnavailableIgnoredAfterStop() {
+ LeaderAvailabilityState state = new LeaderAvailabilityState();
+ InternalClusterNode leader = createNode("leader");
+ RuntimeException testException = new RuntimeException("Test shutdown");
+
+ state.onLeaderElected(leader, 1);
+ assertEquals(State.LEADER_AVAILABLE, state.currentState());
+
+ state.stop(testException);
+
+ // Group unavailable after stop should be ignored
+ state.onGroupUnavailable(1);
+
+ assertTrue(state.stopped());
+ }
+
+ private static InternalClusterNode createNode(String name) {
+ return new ClusterNodeImpl(deriveUuidFrom(name), name, new
NetworkAddress("localhost", 123));
+ }
+}
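
The tests above exercise every transition of the state machine. As a quick orientation, a typical
call sequence looks roughly like the sketch below; the node argument, term values and exception are
placeholders, and the helper class is invented here (it is not part of this commit). The
package-private API used is the one shown in the tests; java.util.concurrent.CompletableFuture and
the surrounding package imports are assumed.

    final class LeaderAvailabilityUsageSketch {
        static void example(LeaderAvailabilityState state, InternalClusterNode leaderNode) {
            // Callers park on this future until a leader is known; it completes with the leader's term.
            CompletableFuture<Long> ready = state.awaitLeader();

            state.onLeaderElected(leaderNode, 7);  // WAITING_FOR_LEADER -> LEADER_AVAILABLE, completes `ready` with 7
            state.onLeaderElected(leaderNode, 3);  // stale term: ignored, the current term stays 7
            state.onGroupUnavailable(7);           // LEADER_AVAILABLE -> WAITING_FOR_LEADER; new waiters get a fresh future

            state.stop(new RuntimeException("node is stopping"));  // terminal: current and future waiters fail
        }
    }
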
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
index 9560e631f9d..54dee9568e0 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
@@ -81,7 +82,6 @@ import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
@@ -103,13 +103,16 @@ import org.mockito.quality.Strictness;
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
-@Disabled("IGNITE-27156")
public class PhysicalTopologyAwareRaftGroupServiceRunTest extends
BaseIgniteAbstractTest {
private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
.map(port -> new Peer("localhost-" + port))
.collect(toUnmodifiableList());
+ private static final List<Peer> FIVE_NODES = Stream.of(20000, 20001,
20002, 20003, 20004)
+ .map(port -> new Peer("localhost-" + port))
+ .collect(toUnmodifiableList());
+
private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
private static final FailureManager NOOP_FAILURE_PROCESSOR = new
FailureManager(new NoOpFailureHandler());
@@ -168,6 +171,10 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest
extends BaseIgniteAbst
lenient().when(responseTimeoutValue.value()).thenReturn(3000L);
lenient().when(raftConfiguration.responseTimeoutMillis()).thenReturn(responseTimeoutValue);
+ ConfigurationValue<Long> retryDelayValue =
mock(ConfigurationValue.class);
+ lenient().when(retryDelayValue.value()).thenReturn(50L);
+
lenient().when(raftConfiguration.retryDelayMillis()).thenReturn(retryDelayValue);
+
executor = new ScheduledThreadPoolExecutor(20,
IgniteThreadFactory.create("common", Loza.CLIENT_POOL_NAME, logger()));
eventsClientListener = new RaftGroupEventsClientListener();
}
@@ -181,10 +188,14 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest
extends BaseIgniteAbst
}
private PhysicalTopologyAwareRaftGroupService startService() {
+ return startService(NODES);
+ }
+
+ private PhysicalTopologyAwareRaftGroupService startService(List<Peer>
peers) {
var commandsMarshaller = new
ThreadLocalOptimizedMarshaller(cluster.serializationRegistry());
PeersAndLearners peersAndLearners = PeersAndLearners.fromConsistentIds(
-
NODES.stream().map(Peer::consistentId).collect(Collectors.toSet())
+
peers.stream().map(Peer::consistentId).collect(Collectors.toSet())
);
service = PhysicalTopologyAwareRaftGroupService.start(
@@ -595,6 +606,131 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest
extends BaseIgniteAbst
assertThat(result, willThrow(NodeStoppingException.class, 5,
TimeUnit.SECONDS));
}
+ /**
+ * Tests that UNKNOWN/EINTERNAL/ENOENT errors retry on the same peer for
ActionRequests.
+ * This is because these errors are transient and the peer is likely to
recover.
+ * This behavior is consistent with {@link RaftGroupServiceImpl}.
+ *
+ * <p>The test verifies that after receiving an EINTERNAL error from a
peer,
+ * the next retry goes to the same peer (not a different one).
+ */
+ @Test
+ void testTransientErrorRetriesOnSamePeer() {
+ // Track the sequence of peers called.
+ List<String> calledPeers = new CopyOnWriteArrayList<>();
+
+ // Mock all WriteActionRequest calls - track which peer is called.
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong())
+ ).thenAnswer(invocation -> {
+ InternalClusterNode target = invocation.getArgument(0);
+ calledPeers.add(target.name());
+
+ if (calledPeers.size() == 1) {
+ // First call returns EINTERNAL error.
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EINTERNAL.getNumber())
+ .build());
+ }
+ // Second call succeeds.
+ return completedFuture(FACTORY.actionResponse().result(new
TestResponse()).build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService();
+
+ // Simulate leader election and wait for it to be processed.
+ simulateLeaderElectionAndWait(NODES.get(0), CURRENT_TERM);
+
+ CompletableFuture<Object> result = svc.run(testWriteCommand(),
Long.MAX_VALUE);
+
+ assertThat(result, willBe(instanceOf(TestResponse.class)));
+
+ // Verify that exactly 2 calls were made.
+ assertThat("Should have exactly 2 calls", calledPeers.size(), is(2));
+
+ // Key assertion: both calls should be to the SAME peer (retry on same
peer for transient errors).
+ assertThat("Transient error should retry on the same peer, but first
call was to "
+ + calledPeers.get(0) + " and second was to " +
calledPeers.get(1),
+ calledPeers.get(0), is(calledPeers.get(1)));
+ }
+
+ /**
+ * Tests that with bounded timeout, a non-leader-related retriable error
(like EBUSY) causes
+ * retries until timeout without waiting for leader. This differs from
EPERM with no leader,
+ * which tries all peers once then waits for leader.
+ */
+ @Test
+ void testBoundedTimeoutRetriesNonLeaderErrorUntilTimeout() {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ // All peers return EBUSY (a retriable error that is NOT related to
missing leader).
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong()
+ )).thenAnswer(invocation -> {
+ callCount.incrementAndGet();
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EBUSY.getNumber())
+ .build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService();
+
+ // With 300ms timeout and EBUSY errors, should retry until timeout.
+ // Unlike EPERM (no leader), EBUSY should cause continuous retries,
not wait for leader.
+ CompletableFuture<Object> result = svc.run(testWriteCommand(), 300);
+
+ assertThat(result,
willThrow(ReplicationGroupUnavailableException.class, 1, TimeUnit.SECONDS));
+
+ // Should have made more than 3 calls (cycling through peers multiple
times).
+ // With retryDelayMillis=50 and 300ms timeout, we should get several
retry rounds.
+ assertTrue(callCount.get() > 3,
+ "Expected more than 3 calls (multiple retry rounds), but got "
+ callCount.get());
+ }
+
+ /**
+ * Tests that with infinite timeout, a non-leader-related retriable error (like EBUSY) causes
+ * retries indefinitely until success. This differs from EPERM with no leader, which makes the
+ * service try all peers once and then wait for a leader notification.
+ */
+ @Test
+ void testInfiniteTimeoutRetriesNonLeaderErrorUntilSuccess() throws
Exception {
+ AtomicInteger callCount = new AtomicInteger(0);
+ CountDownLatch multipleRetriesDone = new CountDownLatch(5);
+
+ // First 5 calls return EBUSY, then success.
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong()
+ )).thenAnswer(invocation -> {
+ int count = callCount.incrementAndGet();
+ if (count <= 5) {
+ multipleRetriesDone.countDown();
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EBUSY.getNumber())
+ .build());
+ }
+ return completedFuture(FACTORY.actionResponse().result(new
TestResponse()).build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService();
+
+ // With infinite timeout and EBUSY errors, should retry until success.
+ CompletableFuture<Object> result = svc.run(testWriteCommand(),
Long.MAX_VALUE);
+
+ // Wait for multiple retry attempts.
+ assertTrue(multipleRetriesDone.await(5, TimeUnit.SECONDS), "Should
have multiple retries");
+
+ // Should eventually succeed.
+ assertThat(result, willCompleteSuccessfully());
+
+ // Should have made more than 5 calls.
+ assertTrue(callCount.get() > 5, "Expected more than 5 calls, but got "
+ callCount.get());
+ }
+
/**
* Tests that with bounded timeout, if the client is shutting down during
retry phase,
* the future completes with NodeStoppingException immediately without new
retry round.
@@ -604,6 +740,7 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest
extends BaseIgniteAbst
var pendingRetryInvoke = new CompletableFuture<Void>();
var retryPhaseStarted = new CountDownLatch(1);
var allPeersTried = new CountDownLatch(3);
+ var fifthCallAttempted = new CountDownLatch(1);
AtomicInteger callCount = new AtomicInteger(0);
when(messagingService.invoke(
@@ -626,7 +763,8 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest
extends BaseIgniteAbst
.errorCode(RaftError.EBUSY.getNumber())
.build());
}
- // Should not reach here.
+ // Fifth call should not happen after shutdown.
+ fifthCallAttempted.countDown();
return completedFuture(FACTORY.actionResponse().result(new
TestResponse()).build());
});
@@ -653,14 +791,75 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest
extends BaseIgniteAbst
// The result should complete with NodeStoppingException.
assertThat(result, willThrow(NodeStoppingException.class, 5,
TimeUnit.SECONDS));
- // Give some time for any additional retry attempts.
- Thread.sleep(200);
-
// Verify no additional retry attempts were made after shutdown.
- // We should have exactly 4 invocations (3 initial + 1 retry attempt
that was interrupted).
+ // The fifth call latch should NOT be counted down (wait briefly and
check).
+ assertThat("No 5th call should be attempted after shutdown",
+ fifthCallAttempted.await(100, TimeUnit.MILLISECONDS),
is(false));
assertThat(callCount.get(), is(4));
}
+ /**
+ * Tests that when a leader was previously elected and then becomes
unavailable (all peers return EPERM with no leader),
+ * the service tries all peers exactly once and then waits for leader
notification.
+ *
+ * <p>This test verifies that the term is correctly passed to the retry
logic so that
+ * {@code onGroupUnavailable(term)} can properly transition the state from
LEADER_AVAILABLE to WAITING_FOR_LEADER.
+ */
+ @Test
+ void testInfiniteTimeoutWithPreviousLeaderTriesPeersOnceBeforeWaiting()
throws Exception {
+ AtomicInteger callCount = new AtomicInteger(0);
+ CountDownLatch allPeersTried = new CountDownLatch(3);
+ // This latch will be counted down if more than 3 calls are made
(indicating the bug).
+ CountDownLatch extraCallsMade = new CountDownLatch(1);
+
+ // All peers return EPERM with no leader.
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong()
+ )).thenAnswer(invocation -> {
+ int count = callCount.incrementAndGet();
+ if (count <= 3) {
+ allPeersTried.countDown();
+ } else {
+ // More than 3 calls means the bug exists - extra retry cycle
happened.
+ extraCallsMade.countDown();
+ }
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EPERM.getNumber())
+ .build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService();
+
+ // Simulate leader election BEFORE calling run().
+ // After this, state is LEADER_AVAILABLE with currentTerm=1.
+ simulateLeaderElectionAndWait(NODES.get(0), CURRENT_TERM);
+
+ // Start the command with infinite timeout.
+ // The service should try all peers once and then wait for leader (not
do an extra cycle).
+ CompletableFuture<Object> result = svc.run(testWriteCommand(),
Long.MAX_VALUE);
+
+ // Wait for all 3 peer attempts to complete.
+ assertTrue(allPeersTried.await(5, TimeUnit.SECONDS), "All 3 peers
should be tried");
+
+ // Give some time for potential extra calls (if the bug exists).
+ // If the term is incorrectly passed as -1, awaitLeader() completes right away
+ // and another retry cycle starts immediately.
+ boolean extraCallsHappened = extraCallsMade.await(500,
TimeUnit.MILLISECONDS);
+
+ // The result should NOT be complete yet - it should be waiting for
leader.
+ assertThat("Result should be waiting for leader, not completed",
+ result.isDone(), is(false));
+
+ // Verify exactly 3 calls were made (one per peer), not 6.
+ assertThat("Expected exactly 3 calls (one per peer), but got " +
callCount.get()
+ + ". If 6+ calls were made, the term was incorrectly
passed as -1 "
+ + "causing an extra retry cycle before proper
waiting.",
+ extraCallsHappened, is(false));
+ assertThat(callCount.get(), is(3));
+ }
+
private void verifyExact3PeersCalled() {
ArgumentCaptor<InternalClusterNode> nodeCaptor =
ArgumentCaptor.forClass(InternalClusterNode.class);
Mockito.verify(this.messagingService, Mockito.times(3)).invoke(
@@ -683,4 +882,179 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest
extends BaseIgniteAbst
private static class TestResponse {
}
+
+ /**
+ * Tests single-attempt mode (timeout=0) with 5 nodes: all return "no
leader".
+ *
+ * <p>In single-attempt mode, "no leader" is treated the same as unavailable.
+ * Each peer is tried exactly once, and the request then fails with ReplicationGroupUnavailableException.
+ */
+ @Test
+ void testSingleAttemptModeWithAllNoLeader() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+
+ // All peers return EPERM with no leader.
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong()
+ )).thenAnswer(invocation -> {
+ InternalClusterNode target = invocation.getArgument(0);
+ calledPeers.add(target.name());
+ callCount.incrementAndGet();
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EPERM.getNumber())
+ .leaderId(null)
+ .build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService(FIVE_NODES);
+
+ // With timeout=0, should try each peer once and fail.
+ CompletableFuture<Object> result = svc.run(testWriteCommand(), 0);
+
+ assertThat(result,
willThrow(ReplicationGroupUnavailableException.class));
+
+ // Verify each peer was tried exactly once.
+ assertThat("Should call exactly 5 peers", callCount.get(), is(5));
+ assertThat("Should call all 5 unique peers", calledPeers.size(),
is(5));
+ }
+
+ /**
+ * Tests single-attempt mode (timeout=0) with 5 nodes: 3 return "no
leader", 2 return EHOSTDOWN.
+ *
+ * <p>In single-attempt mode, all errors are treated the same - each peer
is tried once.
+ * The request should fail after all 5 peers are tried.
+ */
+ @Test
+ void testSingleAttemptModeWithMixedErrors() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong()
+ )).thenAnswer(invocation -> {
+ InternalClusterNode target = invocation.getArgument(0);
+ calledPeers.add(target.name());
+ int count = callCount.incrementAndGet();
+
+ // First 3 calls return "no leader", next 2 return EHOSTDOWN.
+ if (count <= 3) {
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EPERM.getNumber())
+ .leaderId(null)
+ .build());
+ }
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EHOSTDOWN.getNumber())
+ .build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService(FIVE_NODES);
+
+ // With timeout=0, should try each peer once and fail.
+ CompletableFuture<Object> result = svc.run(testWriteCommand(), 0);
+
+ assertThat(result,
willThrow(ReplicationGroupUnavailableException.class));
+
+ // Verify each peer was tried exactly once.
+ assertThat("Should call exactly 5 peers", callCount.get(), is(5));
+ assertThat("Should call all 5 unique peers", calledPeers.size(),
is(5));
+ }
+
+ /**
+ * Tests leader-wait mode: 3 nodes return "no leader", then those same
nodes succeed after leader election.
+ *
+ * <p>Key verification: "no leader" peers are NOT in unavailablePeers, so
they CAN be retried
+ * after leader notification arrives.
+ */
+ @Test
+ void testLeaderWaitModeRetriesNoLeaderPeersAfterLeaderElection() throws
Exception {
+ List<String> calledPeers = new CopyOnWriteArrayList<>();
+ AtomicInteger noLeaderResponseCount = new AtomicInteger(0);
+ CountDownLatch allPeersTriedOnce = new CountDownLatch(3);
+
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong()
+ )).thenAnswer(invocation -> {
+ InternalClusterNode target = invocation.getArgument(0);
+ calledPeers.add(target.name());
+
+ // First round: all 3 peers return "no leader".
+ if (noLeaderResponseCount.get() < 3) {
+ noLeaderResponseCount.incrementAndGet();
+ allPeersTriedOnce.countDown();
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EPERM.getNumber())
+ .leaderId(null)
+ .build());
+ }
+
+ // After leader election: succeed.
+ return completedFuture(FACTORY.actionResponse().result(new
TestResponse()).build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService();
+
+ // Start the command with infinite timeout.
+ CompletableFuture<Object> result = svc.run(testWriteCommand(),
Long.MAX_VALUE);
+
+ // Wait for all 3 peers to be tried once.
+ assertTrue(allPeersTriedOnce.await(5, TimeUnit.SECONDS), "All peers
should be tried once");
+
+ // The result should NOT be complete yet - waiting for leader.
+ assertThat(result.isDone(), is(false));
+
+ // Simulate leader election.
+ simulateLeaderElection(NODES.get(0), CURRENT_TERM);
+
+ // Should eventually succeed (by retrying one of the "no leader"
peers).
+ assertThat(result, willCompleteSuccessfully());
+
+ // Verify that at least one peer was called twice (first with "no
leader", then success).
+ long totalCalls = calledPeers.size();
+ assertTrue(totalCalls > 3, "Should have more than 3 calls (some peers
retried after leader election), got " + totalCalls);
+ }
+
+ /**
+ * Tests single-attempt mode (timeout=0) with transient errors (EBUSY).
+ *
+ * <p>In single-attempt mode, each peer should be tried at most once, even
for transient errors.
+ * EBUSY should cause the executor to move to the next peer, not retry the
same peer indefinitely.
+ */
+ @Test
+ void testSingleAttemptModeWithTransientErrors() {
+ AtomicInteger callCount = new AtomicInteger(0);
+ Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+
+ // All peers return EBUSY (transient error).
+ when(messagingService.invoke(
+ any(InternalClusterNode.class),
+ argThat(this::isTestWriteCommand),
+ anyLong()
+ )).thenAnswer(invocation -> {
+ InternalClusterNode target = invocation.getArgument(0);
+ calledPeers.add(target.name());
+ callCount.incrementAndGet();
+ return completedFuture(FACTORY.errorResponse()
+ .errorCode(RaftError.EBUSY.getNumber())
+ .build());
+ });
+
+ PhysicalTopologyAwareRaftGroupService svc = startService();
+
+ // With timeout=0, should try each peer at most once and fail, not
loop forever.
+ CompletableFuture<Object> result = svc.run(testWriteCommand(), 0);
+
+ assertThat(result,
willThrow(ReplicationGroupUnavailableException.class, 2, TimeUnit.SECONDS));
+
+ // Should have tried each peer at most once (3 peers).
+ assertThat("Should call at most 3 peers, but got " + callCount.get(),
callCount.get(), is(3));
+ assertThat("Should call all 3 unique peers", calledPeers.size(),
is(3));
+ }
}
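
The individual tests above each pin down one branch of the new retry policy. Read together, they
amount to roughly the following decision table. The helper below is a hypothetical, condensed
sketch (the enum, class and parameter names are invented here, and it assumes the RaftError enum
the tests already use); it is not the RaftCommandExecutor code added by this commit.

    import org.apache.ignite.raft.jraft.error.RaftError;

    final class RetryPolicySketch {
        enum RetryAction { RETRY_SAME_PEER, REDIRECT_TO_LEADER, TRY_NEXT_PEER, WAIT_FOR_LEADER }

        static RetryAction classify(RaftError error, boolean leaderHintPresent,
                boolean singleShot, boolean allPeersReportedNoLeader) {
            switch (error) {
                case UNKNOWN:
                case EINTERNAL:
                case ENOENT:
                    return RetryAction.RETRY_SAME_PEER;        // transient: the same peer is expected to recover
                case EPERM:
                    if (leaderHintPresent) {
                        return RetryAction.REDIRECT_TO_LEADER; // the response names the current leader
                    }
                    if (singleShot || !allPeersReportedNoLeader) {
                        return RetryAction.TRY_NEXT_PEER;      // keep cycling; single-shot mode never waits
                    }
                    return RetryAction.WAIT_FOR_LEADER;        // every peer said "no leader": park until notified
                default:
                    return RetryAction.TRY_NEXT_PEER;          // e.g. EBUSY, EHOSTDOWN: rotate through peers
            }
        }
    }

Keeping "no leader" peers in a set separate from unavailable peers is what allows those same peers
to be retried once a leader notification finally arrives, as the leader-wait test above verifies.
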
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
index 69e2c41537f..4563d13c716 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
@@ -47,6 +47,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -680,6 +681,51 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
assertThat(response, willThrow(TimeoutException.class, "Send with
retry timed out"));
}
+ /**
+ * Tests that UNKNOWN/EINTERNAL/ENOENT errors retry on the same peer (the
leader) for ActionRequests.
+ * This is because these errors are transient and the leader is likely to
recover.
+ */
+ @ParameterizedTest
+ @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"})
+ public void testRetryOnTransientErrorRetriesOnSamePeer(RaftError error) {
+ Peer leaderPeer = NODES.get(0);
+
+ // First call returns error, second call succeeds - both to the same
leader.
+ when(messagingService.invoke(
+ argThat((InternalClusterNode node) -> node != null &&
node.name().equals(leaderPeer.consistentId())),
+ any(ReadActionRequest.class),
+ anyLong())
+ )
+ .thenReturn(completedFuture(FACTORY.errorResponse()
+ .errorCode(error.getNumber())
+ .build()))
+
.thenReturn(completedFuture(FACTORY.actionResponse().result(null).build()));
+
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
+
+ assertThat(service.leader(), is(leaderPeer));
+
+ CompletableFuture<Object> response =
service.run(mock(ReadCommand.class));
+
+ assertThat(response, willBe(nullValue()));
+
+ // Verify that only the leader was called for the action request (error first, then success).
+ verify(messagingService, atLeastOnce()).invoke(
+ argThat((InternalClusterNode target) -> target != null &&
target.name().equals(leaderPeer.consistentId())),
+ any(ReadActionRequest.class),
+ anyLong()
+ );
+
+ // Verify that other peers were NOT called.
+ for (Peer otherPeer : NODES.subList(1, NODES.size())) {
+ verify(messagingService, never()).invoke(
+ argThat((InternalClusterNode target) -> target != null &&
target.name().equals(otherPeer.consistentId())),
+ any(ReadActionRequest.class),
+ anyLong()
+ );
+ }
+ }
+
@ParameterizedTest
@EnumSource(names = {"EPERM"})
public void testRetryOnErrorWithUpdateLeader(RaftError error) {