This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c4439be787 IGNITE-27156 Change timeout semantics (#7242)
3c4439be787 is described below

commit 3c4439be78766067593aa7b65f1a3118d458b738
Author: Cyrill <[email protected]>
AuthorDate: Thu Jan 29 22:02:33 2026 +0300

    IGNITE-27156 Change timeout semantics (#7242)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 .../raft/client/LeaderAvailabilityState.java       |  237 +++++
 .../PhysicalTopologyAwareRaftGroupService.java     |   48 +-
 .../internal/raft/client/RaftCommandExecutor.java  | 1044 ++++++++++++++++++++
 .../internal/raft/client/RaftErrorUtils.java       |   45 +
 .../ignite/internal/raft/client/RetryContext.java  |   48 +-
 .../raft/client/LeaderAvailabilityStateTest.java   |  466 +++++++++
 ...ysicalTopologyAwareRaftGroupServiceRunTest.java |  390 +++++++-
 .../internal/raft/client/RaftGroupServiceTest.java |   46 +
 8 files changed, 2297 insertions(+), 27 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityState.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityState.java
new file mode 100644
index 00000000000..ed427840e06
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityState.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * State machine for tracking leader availability.
+ *
+ * <p>Manages transitions between two states:
+ * <ul>
+ *     <li>{@link State#WAITING_FOR_LEADER} - No leader is currently known, 
operations must wait</li>
+ *     <li>{@link State#LEADER_AVAILABLE} - A leader is known, operations can 
proceed</li>
+ * </ul>
+ *
+ * <p>State transitions:
+ * <pre>
+ *     WAITING_FOR_LEADER --[onLeaderElected]--> LEADER_AVAILABLE
+ *     LEADER_AVAILABLE --[onGroupUnavailable]--> WAITING_FOR_LEADER
+ *     Any state --[stop]--> stopped (terminal state)
+ * </pre>
+ *
+ * <p>Thread-safe: all public methods are synchronized or use proper 
synchronization.
+ */
+@VisibleForTesting
+class LeaderAvailabilityState {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaderAvailabilityState.class);
+
+    /** Synchronization mutex. */
+    private final Object mutex = new Object();
+
+    /** Possible states of leader availability. */
+    enum State {
+        /** No leader is known, waiting for leader election. */
+        WAITING_FOR_LEADER,
+        /** A leader is available and operations can proceed. */
+        LEADER_AVAILABLE
+    }
+
+    /** Current state. Guarded by {@code mutex}. */
+    private State currentState = State.WAITING_FOR_LEADER;
+
+    /** Current leader term. Initialized to -1 to accept term 0 as the first 
valid term. Guarded by {@code mutex}. */
+    private long currentTerm = -1;
+
+    /** Future that waiters block on. Guarded by {@code mutex}. */
+    private CompletableFuture<Long> waiters = new CompletableFuture<>();
+
+    /** Whether the state machine has been stopped. Guarded by {@code mutex}. 
*/
+    private boolean stopped = false;
+
+    /** Exception used to fail futures after destruction. Guarded by {@code 
mutex}. */
+    private Throwable stopException;
+
+    /**
+     * Returns a future that completes when a leader becomes available.
+     *
+     * <p>If a leader is already available, returns an already-completed 
future with the current term.
+     * Otherwise, returns a future that will complete when {@link 
#onLeaderElected} is called.
+     * If the state machine has been destroyed, returns a failed future.
+     *
+     * @return Future that completes with the leader term when a leader is 
available.
+     */
+    CompletableFuture<Long> awaitLeader() {
+        synchronized (mutex) {
+            if (stopped) {
+                return CompletableFuture.failedFuture(stopException);
+            }
+            if (currentState == State.LEADER_AVAILABLE) {
+                return CompletableFuture.completedFuture(currentTerm);
+            }
+            return waiters;
+        }
+    }
+
+    /**
+     * Handles leader election notification.
+     *
+     * <p>Transitions from {@link State#WAITING_FOR_LEADER} to {@link 
State#LEADER_AVAILABLE}
+     * if the new term is greater than the current term. Stale notifications 
(with term <= current)
+     * are ignored. Has no effect if the state machine has been stopped.
+     *
+     * @param leader The newly elected leader node.
+     * @param term The term of the new leader.
+     */
+    void onLeaderElected(InternalClusterNode leader, long term) {
+        if (term < 0) {
+            throw new IllegalArgumentException("Term must be non-negative: " + 
term);
+        }
+
+        CompletableFuture<Long> futureToComplete = null;
+
+        synchronized (mutex) {
+            if (stopped) {
+                LOG.debug("Ignoring leader election after stop [leader={}, 
term={}]", leader, term);
+                return;
+            }
+
+            // Ignore stale term notifications.
+            if (term <= currentTerm) {
+                LOG.debug("Ignoring stale leader [newTerm={}, 
currentTerm={}]", term, currentTerm);
+                return;
+            }
+
+            long previousTerm = currentTerm;
+            State previousState = currentState;
+
+            currentTerm = term;
+
+            if (currentState == State.WAITING_FOR_LEADER) {
+                currentState = State.LEADER_AVAILABLE;
+                futureToComplete = waiters;
+            }
+
+            LOG.debug("Leader elected [leader={}, term={}, previousTerm={}, 
stateChange={}->{}]",
+                    leader, term, previousTerm, previousState, currentState);
+        }
+
+        // Complete outside the lock to avoid potential deadlocks with future 
callbacks.
+        if (futureToComplete != null) {
+            futureToComplete.complete(term);
+        }
+    }
+
+    /**
+     * Handles group unavailability notification.
+     *
+     * <p>Transitions from {@link State#LEADER_AVAILABLE} to {@link 
State#WAITING_FOR_LEADER}
+     * only if the term hasn't changed since the unavailability was detected. 
This prevents
+     * resetting the state if a new leader was already elected.
+     * Has no effect if the state machine has been stopped.
+     *
+     * @param termWhenDetected The term at which unavailability was detected.
+     */
+    void onGroupUnavailable(long termWhenDetected) {
+        synchronized (mutex) {
+            if (stopped) {
+                return;
+            }
+
+            // Only transition if we're still at the same term (no new leader 
elected in the meantime).
+            if (currentTerm == termWhenDetected && currentState == 
State.LEADER_AVAILABLE) {
+                State previousState = currentState;
+
+                currentState = State.WAITING_FOR_LEADER;
+                waiters = new CompletableFuture<>();
+
+                LOG.debug("Group unavailable [term={}, stateChange={}->{}]",
+                        termWhenDetected, previousState, currentState);
+            }
+        }
+    }
+
+    /**
+     * Returns the current leader term.
+     *
+     * @return Current leader term (-1 if no leader has been elected yet).
+     */
+    long currentTerm() {
+        synchronized (mutex) {
+            return currentTerm;
+        }
+    }
+
+    /**
+     * Returns the current state.
+     *
+     * @return Current state.
+     */
+    State currentState() {
+        synchronized (mutex) {
+            return currentState;
+        }
+    }
+
+    /**
+     * Returns whether the state machine has been stopped.
+     *
+     * @return True if stopped.
+     */
+    boolean stopped() {
+        synchronized (mutex) {
+            return stopped;
+        }
+    }
+
+    /**
+     * Stops the state machine, cancelling all pending waiters with the given 
exception.
+     *
+     * <p>After stop, the state machine is no longer active:
+     * <ul>
+     *     <li>{@link #awaitLeader()} returns a failed future</li>
+     *     <li>{@link #onLeaderElected} is ignored</li>
+     *     <li>{@link #onGroupUnavailable} is ignored</li>
+     * </ul>
+     *
+     * <p>Called during shutdown to ensure waiters don't hang indefinitely.
+     *
+     * @param exception The exception to complete waiters with.
+     */
+    void stop(Throwable exception) {
+        CompletableFuture<Long> futureToCancel;
+
+        synchronized (mutex) {
+            if (stopped) {
+                return;
+            }
+
+            stopped = true;
+            stopException = exception;
+            futureToCancel = waiters;
+        }
+
+        // Complete outside the lock to avoid potential deadlocks.
+        futureToCancel.completeExceptionally(exception);
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
index 50bc085a877..6e6d00b311a 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
@@ -20,14 +20,12 @@ package org.apache.ignite.internal.raft.client;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -44,7 +42,6 @@ import org.apache.ignite.internal.raft.LeaderElectionListener;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.PeerUnavailableException;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.ThrottlingContextHolder;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -60,6 +57,8 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * The RAFT group service is based on the cluster physical topology. This 
service has ability to subscribe of a RAFT group leader update.
+ *
+ * <p>Command execution with leader-aware retry semantics is delegated to 
{@link RaftCommandExecutor}.
  */
 public class PhysicalTopologyAwareRaftGroupService implements 
TimeAwareRaftGroupService {
     /** The logger. */
@@ -81,11 +80,17 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
     private final RaftGroupService raftClient;
 
     /** Executor to invoke RPC requests. */
-    private final Executor executor;
+    private final ScheduledExecutorService executor;
 
     /** RAFT configuration. */
     private final RaftConfiguration raftConfiguration;
 
+    /** Factory for creating stopping exceptions. */
+    private final ExceptionFactory stoppingExceptionFactory;
+
+    /** Command executor with retry semantics. */
+    private final RaftCommandExecutor commandExecutor;
+
     /**
      * Constructor.
      *
@@ -111,6 +116,7 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
         this.clusterService = clusterService;
         this.executor = executor;
         this.raftConfiguration = raftConfiguration;
+        this.stoppingExceptionFactory = stoppingExceptionFactory;
         this.raftClient = RaftGroupServiceImpl.start(
                 groupId,
                 clusterService,
@@ -123,10 +129,23 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
                 throttlingContextHolder
         );
 
+        this.commandExecutor = new RaftCommandExecutor(
+                groupId,
+                raftClient::peers,
+                clusterService,
+                executor,
+                raftConfiguration,
+                cmdMarshaller,
+                stoppingExceptionFactory
+        );
+
         this.generalLeaderElectionListener = new ServerEventHandler(executor);
 
         eventsClientListener.addLeaderElectionListener(raftClient.groupId(), 
generalLeaderElectionListener);
 
+        // Subscribe the command executor's leader availability state to 
leader election notifications.
+        subscribeLeader(commandExecutor.leaderElectionListener());
+
         TopologyService topologyService = clusterService.topologyService();
 
         topologyService.addEventHandler(new TopologyEventHandler() {
@@ -170,6 +189,11 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
                                 return;
                             }
 
+                            if (leaderWithTerm == null || 
leaderWithTerm.leader() == null) {
+                                LOG.debug("No leader information available 
[grp={}].", groupId());
+                                return;
+                            }
+
                             InternalClusterNode leaderHost = topologyService
                                     
.getByConsistentId(leaderWithTerm.leader().consistentId());
 
@@ -296,7 +320,7 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
 
                     msgSendFut.complete(false);
                 }
-            } else if (recoverable(invokeCause)) {
+            } else if (RaftErrorUtils.recoverable(invokeCause)) {
                 sendWithRetry(node, msg, msgSendFut);
             } else if (invokeCause instanceof RecipientLeftException) {
                 LOG.info(
@@ -315,18 +339,9 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
         }, executor);
     }
 
-    private static boolean recoverable(Throwable t) {
-        t = unwrapCause(t);
-
-        return t instanceof TimeoutException
-                || t instanceof IOException
-                || t instanceof PeerUnavailableException
-                || t instanceof RecipientLeftException;
-    }
-
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        return commandExecutor.run(cmd, timeoutMillis);
     }
 
     @Override
@@ -411,6 +426,9 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
 
     @Override
     public void shutdown() {
+        // Stop the command executor first - blocks new run() calls, cancels 
leader waiters.
+        commandExecutor.shutdown(stoppingExceptionFactory.create("Raft client 
is stopping [groupId=" + groupId() + "]."));
+
         finishSubscriptions();
 
         raftClient.shutdown();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java
new file mode 100644
index 00000000000..fb293931f2f
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ExceptionFactory;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeerUnavailableException;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.ReplicationGroupUnavailableException;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ActionResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMFullThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Executes RAFT commands with leader-aware retry semantics.
+ *
+ * <p>Supports three timeout modes:
+ * <ul>
+ *     <li>{@code timeout == 0}: Single attempt - tries each peer once, throws 
{@link ReplicationGroupUnavailableException}
+ *         if no leader is available.</li>
+ *     <li>{@code timeout < 0} or {@code == Long.MAX_VALUE}: Infinite wait 
mode - waits indefinitely for leader,
+ *         retries recoverable errors within {@code retryTimeoutMillis}.</li>
+ *     <li>{@code 0 < timeout < Long.MAX_VALUE}: Bounded wait mode - waits up 
to {@code timeout} for leader,
+ *         throws {@link ReplicationGroupUnavailableException} on timeout.</li>
+ * </ul>
+ */
+class RaftCommandExecutor {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(RaftCommandExecutor.class);
+
+    /** Raft message factory. */
+    private static final RaftMessagesFactory MESSAGES_FACTORY = Loza.FACTORY;
+
+    /** Replication group ID. */
+    private final ReplicationGroupId groupId;
+
+    /** Supplier for the current peer list. Returns an immutable snapshot 
backed by a volatile field. */
+    private final Supplier<List<Peer>> peersSupplier;
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** Executor to invoke RPC requests. */
+    private final ScheduledExecutorService executor;
+
+    /** RAFT configuration. */
+    private final RaftConfiguration raftConfiguration;
+
+    /** Command marshaller. */
+    private final Marshaller commandsMarshaller;
+
+    /** Factory for creating stopping exceptions. */
+    private final ExceptionFactory stoppingExceptionFactory;
+
+    /** State machine for tracking leader availability. */
+    private final LeaderAvailabilityState leaderAvailabilityState;
+
+    /** Current leader. */
+    private volatile Peer leader;
+
+    /** Busy lock for shutdown. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** This flag is used only for logging. */
+    private final AtomicBoolean peersAreUnavailable = new AtomicBoolean();
+
+    /**
+     * Constructor.
+     *
+     * @param groupId Replication group ID.
+     * @param peersSupplier Supplier for the current peer list. Must return an 
immutable snapshot.
+     * @param clusterService Cluster service.
+     * @param executor Executor to invoke RPC requests.
+     * @param raftConfiguration RAFT configuration.
+     * @param commandsMarshaller Command marshaller.
+     * @param stoppingExceptionFactory Factory for creating stopping 
exceptions.
+     */
+    RaftCommandExecutor(
+            ReplicationGroupId groupId,
+            Supplier<List<Peer>> peersSupplier,
+            ClusterService clusterService,
+            ScheduledExecutorService executor,
+            RaftConfiguration raftConfiguration,
+            Marshaller commandsMarshaller,
+            ExceptionFactory stoppingExceptionFactory
+    ) {
+        this.groupId = groupId;
+        this.peersSupplier = peersSupplier;
+        this.clusterService = clusterService;
+        this.executor = executor;
+        this.raftConfiguration = raftConfiguration;
+        this.commandsMarshaller = commandsMarshaller;
+        this.stoppingExceptionFactory = stoppingExceptionFactory;
+        this.leaderAvailabilityState = new LeaderAvailabilityState();
+    }
+
+    /**
+     * Returns a {@link LeaderElectionListener} that feeds the internal leader 
availability state machine.
+     *
+     * @return Leader election listener callback.
+     */
+    LeaderElectionListener leaderElectionListener() {
+        return leaderAvailabilityState::onLeaderElected;
+    }
+
+    /**
+     * Executes a command with leader-aware retry semantics.
+     *
+     * @param cmd Command to execute.
+     * @param timeoutMillis Timeout in milliseconds (0 = single attempt, 
negative/MAX_VALUE = infinite, positive = bounded).
+     * @return Future that completes with the command result.
+     */
+    <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Shuts down the executor, blocking new run() calls and cancelling leader 
waiters.
+     *
+     * <p>Shutdown ordering is critical:
+     * <ol>
+     *     <li>Block busyLock first - ensures no new run() calls enter the 
busy section.
+     *         In-flight calls waiting on awaitLeader() have already exited 
the busy section.</li>
+     *     <li>Stop leaderAvailabilityState second - completes awaitLeader() 
futures exceptionally,
+     *         triggering callbacks that will be rejected by the blocked 
busyLock.</li>
+     * </ol>
+     *
+     * @param stopException Exception to complete waiters with.
+     */
+    void shutdown(Throwable stopException) {
+        busyLock.block();
+
+        leaderAvailabilityState.stop(stopException);
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    private @Nullable Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId.toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                raftConfiguration.responseTimeoutMillis().value()
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    @SuppressWarnings("unchecked")
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [groupId=" + groupId + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId,
+                "Timeout waiting for leader [groupId=" + groupId + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [groupId=" + groupId + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId.toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                raftConfiguration.responseTimeoutMillis().value()
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId.toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId.toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [groupId=" + groupId + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenCompleteAsync((resp, err) ->
+                            handleResponse(
+                                    fut,
+                                    resp,
+                                    err,
+                                    retryContext,
+                                    new SingleAttemptRetryStrategy(fut)
+                            ),
+                            executor
+                    );
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Attempts to select a peer for retrying after a recoverable error.
+     *
+     * <p>This method combines error validation with peer selection for retry 
handling.
+     *
+     * @param err The throwable to check (will be unwrapped).
+     * @param retryContext Retry context for getting next peer.
+     * @return Result containing the next peer, or the exception to complete 
the future with.
+     */
+    private RetryPeerResult selectPeerForRetry(Throwable err, RetryContext 
retryContext) {
+        Throwable unwrappedErr = unwrapCause(err);
+
+        if (!RaftErrorUtils.recoverable(unwrappedErr)) {
+            return RetryPeerResult.fail(unwrappedErr);
+        }
+
+        Peer nextPeer = randomNode(retryContext, false);
+
+        if (nextPeer == null) {
+            return RetryPeerResult.fail(new 
ReplicationGroupUnavailableException(groupId));
+        }
+
+        return RetryPeerResult.success(nextPeer);
+    }
+
+    /**
+     * Result of attempting to select a peer for retry.
+     */
+    private static final class RetryPeerResult {
+        private final @Nullable Peer peer;
+        private final @Nullable Throwable error;
+
+        private RetryPeerResult(@Nullable Peer peer, @Nullable Throwable 
error) {
+            this.peer = peer;
+            this.error = error;
+        }
+
+        static RetryPeerResult success(Peer peer) {
+            return new RetryPeerResult(peer, null);
+        }
+
+        static RetryPeerResult fail(Throwable error) {
+            return new RetryPeerResult(null, error);
+        }
+
+        boolean isSuccess() {
+            return peer != null;
+        }
+
+        Peer peer() {
+            assert peer != null : "Check isSuccess() before calling peer()";
+            return peer;
+        }
+
+        Throwable error() {
+            assert error != null : "Check isSuccess() before calling error()";
+            return error;
+        }
+    }
+
+    private static void logRecoverableError(RetryContext retryContext, Peer 
nextPeer) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Recoverable error during the request occurred (will be 
retried) [request={}, peer={}, newPeer={}, traceId={}].",
+                    includeSensitive() ? retryContext.request() : 
retryContext.request().toStringForLightLogging(),
+                    retryContext.targetPeer(),
+                    nextPeer,
+                    retryContext.errorTraceId()
+            );
+        }
+    }
+
+    /**
+     * Handles the response from a raft peer request.
+     *
+     * <p>Dispatches the response to the appropriate handler based on its type:
+     * throwable, error response, state machine error, or successful response.
+     *
+     * @param fut Future to complete with the result.
+     * @param resp Response message, or {@code null} if an error occurred.
+     * @param err Throwable if the request failed, or {@code null} on success.
+     * @param retryContext Retry context.
+     * @param strategy Strategy for executing retries.
+     */
+    @SuppressWarnings("unchecked")
+    private <R extends NetworkMessage> void handleResponse(
+            CompletableFuture<R> fut,
+            @Nullable NetworkMessage resp,
+            @Nullable Throwable err,
+            RetryContext retryContext,
+            RetryExecutionStrategy strategy
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [groupId=" + groupId + "]."));
+            return;
+        }
+
+        try {
+            if (err != null) {
+                handleThrowableWithRetry(fut, err, retryContext, strategy);
+            } else if (resp instanceof ErrorResponse) {
+                handleErrorResponseCommon(fut, (ErrorResponse) resp, 
retryContext, strategy);
+            } else if (resp instanceof SMErrorResponse) {
+                fut.completeExceptionally(extractSmError((SMErrorResponse) 
resp, retryContext));
+            } else {
+                leader = retryContext.targetPeer();
+                fut.complete((R) resp);
+            }
+        } catch (Throwable e) {
+            fut.completeExceptionally(e);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private void handleThrowableWithRetry(
+            CompletableFuture<? extends NetworkMessage> fut,
+            Throwable err,
+            RetryContext retryContext,
+            RetryExecutionStrategy strategy
+    ) {
+        RetryPeerResult result = selectPeerForRetry(err, retryContext);
+
+        if (!result.isSuccess()) {
+            fut.completeExceptionally(result.error());
+            return;
+        }
+
+        Peer nextPeer = result.peer();
+        logRecoverableError(retryContext, nextPeer);
+
+        String shortReasonMessage = "Peer " + 
retryContext.targetPeer().consistentId()
+                + " threw " + unwrapCause(err).getClass().getSimpleName();
+        strategy.executeRetry(retryContext, nextPeer, PeerTracking.COMMON, 
shortReasonMessage);
+    }
+
+    /**
+     * Sends a request with retry and waits for leader on leader absence.
+     */
+    private void sendWithRetryWaitingForLeader(
+            CompletableFuture<ActionResponse> fut,
+            RetryContext retryContext,
+            Command cmd,
+            long deadline,
+            long termWhenStarted
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [groupId=" + groupId + "]."));
+            return;
+        }
+
+        try {
+            long requestStartTime = Utils.monotonicMs();
+            long stopTime = retryContext.stopTime();
+
+            if (requestStartTime >= stopTime) {
+                // Retry timeout expired - fail with timeout exception.
+                // For non-leader-related retriable errors, we don't wait for 
leader, just fail.
+                fut.completeExceptionally(createTimeoutException());
+                return;
+            }
+
+            retryContext.onNewAttempt();
+
+            // Bound response timeout by remaining time until retry stop time.
+            long responseTimeout = 
Math.min(retryContext.responseTimeoutMillis(), stopTime - requestStartTime);
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    // Enforce timeout even if messaging service doesn't 
(e.g., in tests with mocks).
+                    .orTimeout(responseTimeout > 0 ? responseTimeout : 
Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+                    .whenCompleteAsync(
+                            (resp, err) -> handleResponse(
+                                    fut,
+                                    resp,
+                                    err,
+                                    retryContext,
+                                    new LeaderWaitRetryStrategy(fut, cmd, 
deadline, termWhenStarted)
+                            ),
+                            executor
+                    );
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Extracts the error from a state machine error response.
+     *
+     * @param resp State machine error response.
+     * @param retryContext Retry context (used for error trace ID).
+     * @return The throwable to complete the future with.
+     */
+    private static Throwable extractSmError(SMErrorResponse resp, RetryContext 
retryContext) {
+        SMThrowable th = resp.error();
+
+        if (th instanceof SMCompactedThrowable) {
+            SMCompactedThrowable compactedThrowable = (SMCompactedThrowable) 
th;
+
+            try {
+                return (Throwable) 
Class.forName(compactedThrowable.throwableClassName())
+                        .getConstructor(String.class)
+                        .newInstance(compactedThrowable.throwableMessage());
+            } catch (Exception e) {
+                LOG.warn("Cannot restore throwable from user's state machine. "
+                        + "Check if throwable " + 
compactedThrowable.throwableClassName()
+                        + " is present in the classpath.");
+
+                return new IgniteInternalException(
+                        retryContext.errorTraceId(), INTERNAL_ERR, 
compactedThrowable.throwableMessage()
+                );
+            }
+        } else if (th instanceof SMFullThrowable) {
+            return ((SMFullThrowable) th).throwable();
+        } else {
+            return new IgniteInternalException(
+                    retryContext.errorTraceId(),
+                    INTERNAL_ERR,
+                    "Unknown SMThrowable type: " + (th == null ? "null" : 
th.getClass().getName())
+            );
+        }
+    }
+
+    private static boolean retriableError(@Nullable Throwable e, 
NetworkMessage raftResponse) {
+        int errorCode = raftResponse instanceof ErrorResponse ? 
((ErrorResponse) raftResponse).errorCode() : 0;
+        RaftError raftError = RaftError.forNumber(errorCode);
+
+        if (e != null) {
+            e = unwrapCause(e);
+
+            // Retriable error but can be caused by an overload.
+            return e instanceof TimeoutException || e instanceof IOException;
+        }
+
+        return raftError == RaftError.EBUSY || raftError == RaftError.EAGAIN;
+    }
+
+    /**
+     * Resolves a peer to an internal cluster node.
+     */
+    private CompletableFuture<InternalClusterNode> resolvePeer(Peer peer) {
+        InternalClusterNode node = 
clusterService.topologyService().getByConsistentId(peer.consistentId());
+
+        if (node == null) {
+            return CompletableFuture.failedFuture(new 
PeerUnavailableException(peer.consistentId()));
+        }
+
+        return CompletableFuture.completedFuture(node);
+    }
+
+    /**
+     * Parses a peer ID string to a Peer object.
+     *
+     * @param peerId Peer ID string in format "consistentId:idx" or just 
"consistentId".
+     * @return Parsed Peer object, or {@code null} if parsing fails.
+     */
+    private static @Nullable Peer parsePeer(@Nullable String peerId) {
+        PeerId id = PeerId.parsePeer(peerId);
+
+        return id == null ? null : new Peer(id.getConsistentId(), id.getIdx());
+    }
+
+    /**
+     * Returns a random peer excluding unavailable peers and optionally "no 
leader" peers.
+     *
+     * <p>Behavior depends on parameters:
+     * <ul>
+     *     <li>If {@code retryContext} is null, returns any peer from the 
list.</li>
+     *     <li>If {@code excludeNoLeaderPeers} is true, also excludes peers 
that returned "no leader"
+     *         and does NOT reset exclusion sets when exhausted (returns null 
instead).</li>
+     *     <li>If {@code excludeNoLeaderPeers} is false and all peers are 
unavailable in non-single-shot mode,
+     *         resets unavailable peers and tries again.</li>
+     * </ul>
+     *
+     * @param retryContext Retry context, or null for initial peer selection.
+     * @param excludeNoLeaderPeers Whether to exclude peers that returned "no 
leader" response.
+     * @return A random available peer, or null if none available.
+     */
+    private @Nullable Peer randomNode(@Nullable RetryContext retryContext, 
boolean excludeNoLeaderPeers) {
+        List<Peer> localPeers = peers();
+
+        if (localPeers == null || localPeers.isEmpty()) {
+            return null;
+        }
+
+        var availablePeers = new ArrayList<Peer>(localPeers.size());
+
+        if (retryContext == null) {
+            availablePeers.addAll(localPeers);
+        } else {
+            for (Peer peer : localPeers) {
+                if (retryContext.targetPeer().equals(peer) || 
retryContext.unavailablePeers().contains(peer)) {
+                    continue;
+                }
+                if (excludeNoLeaderPeers && 
retryContext.noLeaderPeers().contains(peer)) {
+                    continue;
+                }
+                availablePeers.add(peer);
+            }
+
+            // Reset unavailable peers only if NOT tracking no-leader peers 
separately.
+            // When tracking no-leader peers, we want to exhaust all peers and 
then wait for leader.
+            if (availablePeers.isEmpty() && !excludeNoLeaderPeers && 
!retryContext.singleShotRequest()) {
+                if (!peersAreUnavailable.getAndSet(true)) {
+                    LOG.warn(
+                            "All peers are unavailable, going to keep retrying 
until timeout [peers = {}, group = {}, trace ID: {}, "
+                                    + "request {}, origin command {}, 
instance={}].",
+                            localPeers,
+                            groupId,
+                            retryContext.errorTraceId(),
+                            retryContext.request().toStringForLightLogging(),
+                            retryContext.originCommandDescription(),
+                            this
+                    );
+                }
+
+                retryContext.resetUnavailablePeers();
+                retryContext.resetNoLeaderPeers();
+
+                // Read the volatile field again, just in case it changed.
+                availablePeers.addAll(peers());
+            } else {
+                peersAreUnavailable.set(false);
+            }
+        }
+
+        if (availablePeers.isEmpty()) {
+            return null;
+        }
+
+        Collections.shuffle(availablePeers, ThreadLocalRandom.current());
+
+        return availablePeers.stream()
+                .filter(peer -> 
clusterService.topologyService().getByConsistentId(peer.consistentId()) != null)
+                .findAny()
+                .orElse(availablePeers.get(0));
+    }
+
+    private List<Peer> peers() {
+        return peersSupplier.get();
+    }
+
+    private static String getShortReasonMessage(RetryContext retryContext, 
RaftError error, ErrorResponse resp) {
+        return format("Peer {} returned code {}: {}", 
retryContext.targetPeer().consistentId(), error, resp.errorMsg());
+    }
+
+    /**
+     * How to track the current peer when moving to a new one.
+     */
+    private enum PeerTracking {
+        /** Don't mark the current peer (transient errors, leader redirects). 
*/
+        COMMON,
+        /** Mark as unavailable (down, shutting down). */
+        UNAVAILABLE,
+        /** Mark as "no leader" (working but doesn't know leader). */
+        NO_LEADER
+    }
+
+    /**
+     * Strategy for executing retries. Each method receives everything needed 
to perform the retry.
+     *
+     * <p>This interface abstracts the differences between single-attempt mode 
and leader-wait mode:
+     * <ul>
+     *     <li><b>Single-attempt mode</b>: Each peer is tried at most once. 
All errors mark the peer
+     *         as unavailable. When all peers exhausted, fail immediately.</li>
+     *     <li><b>Leader-wait mode</b>: Transient errors retry on the same 
peer with delay.
+     *         "No leader" errors track peers separately. When exhausted, wait 
for leader notification.</li>
+     * </ul>
+     */
+    private interface RetryExecutionStrategy {
+        /**
+         * Executes retry on the specified peer with the given tracking for 
the current peer.
+         *
+         * @param context Current retry context.
+         * @param nextPeer Peer to retry on.
+         * @param trackCurrentAs How to track the current peer ({@link 
PeerTracking#COMMON} for "don't track").
+         * @param reason Human-readable reason for the retry.
+         */
+        void executeRetry(RetryContext context, Peer nextPeer, PeerTracking 
trackCurrentAs, String reason);
+
+        /**
+         * Called when all peers have been exhausted.
+         *
+         * <p>In leader-wait mode, this triggers waiting for leader 
notification.
+         * In single-attempt mode, this completes with {@link 
ReplicationGroupUnavailableException}.
+         */
+        void onAllPeersExhausted();
+
+        /**
+         * Whether to track "no leader" peers separately from unavailable 
peers.
+         *
+         * <p>In leader-wait mode, peers that return "no leader" are tracked 
separately so that
+         * when all peers are exhausted, the strategy can wait for a leader 
notification rather
+         * than failing immediately. In single-attempt mode, all errors are 
treated uniformly
+         * as unavailable.
+         */
+        boolean trackNoLeaderSeparately();
+    }
+
+    /**
+     * Retry strategy for single-attempt mode.
+     *
+     * <p>In single-attempt mode, each peer is tried at most once. All errors 
mark the peer
+     * as unavailable. When all peers have been tried, the request fails with
+     * {@link ReplicationGroupUnavailableException}.
+     */
+    private class SingleAttemptRetryStrategy implements RetryExecutionStrategy 
{
+        private final CompletableFuture<? extends NetworkMessage> fut;
+
+        SingleAttemptRetryStrategy(CompletableFuture<? extends NetworkMessage> 
fut) {
+            this.fut = fut;
+        }
+
+        @Override
+        public void executeRetry(RetryContext context, Peer nextPeer, 
PeerTracking trackCurrentAs, String reason) {
+            // In single-attempt mode, each peer is tried at most once.
+            // For transient errors, the caller passes the same peer (meaning 
"retry on same peer"),
+            // but we must select a different one instead.
+            Peer effectiveNextPeer = nextPeer.equals(context.targetPeer())
+                    ? randomNode(context, false)
+                    : nextPeer;
+
+            if (effectiveNextPeer == null) {
+                onAllPeersExhausted();
+                return;
+            }
+
+            sendWithRetrySingleAttempt(fut, 
context.nextAttemptForUnavailablePeer(effectiveNextPeer, reason));
+        }
+
+        @Override
+        public void onAllPeersExhausted() {
+            fut.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId));
+        }
+
+        @Override
+        public boolean trackNoLeaderSeparately() {
+            return false;
+        }
+    }
+
+    /**
+     * Retry strategy for leader-wait mode.
+     *
+     * <p>In leader-wait mode:
+     * <ul>
+     *     <li>Transient errors retry on the same peer after delay</li>
+     *     <li>"No leader" errors track peers separately and try each peer 
once</li>
+     *     <li>When all peers are exhausted, waits for leader notification</li>
+     * </ul>
+     */
+    private class LeaderWaitRetryStrategy implements RetryExecutionStrategy {
+        private final CompletableFuture<ActionResponse> fut;
+        private final Command cmd;
+        private final long deadline;
+        private final long termWhenStarted;
+
+        LeaderWaitRetryStrategy(
+                CompletableFuture<ActionResponse> fut,
+                Command cmd,
+                long deadline,
+                long termWhenStarted
+        ) {
+            this.fut = fut;
+            this.cmd = cmd;
+            this.deadline = deadline;
+            this.termWhenStarted = termWhenStarted;
+        }
+
+        @Override
+        public void executeRetry(RetryContext context, Peer nextPeer, 
PeerTracking trackCurrentAs, String reason) {
+            RetryContext nextContext;
+            switch (trackCurrentAs) {
+                case COMMON:
+                    nextContext = context.nextAttempt(nextPeer, reason);
+                    break;
+                case UNAVAILABLE:
+                    nextContext = 
context.nextAttemptForUnavailablePeer(nextPeer, reason);
+                    break;
+                case NO_LEADER:
+                    nextContext = context.nextAttemptForNoLeaderPeer(nextPeer, 
reason);
+                    break;
+                default:
+                    throw new AssertionError("Unexpected tracking: " + 
trackCurrentAs);
+            }
+
+            executor.schedule(
+                    () -> sendWithRetryWaitingForLeader(fut, nextContext, cmd, 
deadline, termWhenStarted),
+                    raftConfiguration.retryDelayMillis().value(),
+                    TimeUnit.MILLISECONDS
+            );
+        }
+
+        @Override
+        public void onAllPeersExhausted() {
+            LOG.debug("All peers exhausted, waiting for leader [groupId={}, 
term={}]", groupId, termWhenStarted);
+            leaderAvailabilityState.onGroupUnavailable(termWhenStarted);
+            waitForLeaderAndRetry(fut, cmd, deadline);
+        }
+
+        @Override
+        public boolean trackNoLeaderSeparately() {
+            return true;
+        }
+    }
+
+    /**
+     * Selects next peer and executes retry, or calls exhausted handler.
+     * Centralizes the "select peer + null check + fallback" pattern.
+     *
+     * @param context Retry context.
+     * @param excludeNoLeaderPeers Whether to also exclude peers that returned 
"no leader".
+     * @param trackCurrentAs How to track the current peer.
+     * @param reason Human-readable reason for the retry.
+     * @param strategy Strategy for executing the retry.
+     */
+    private void selectPeerAndRetry(
+            RetryContext context,
+            boolean excludeNoLeaderPeers,
+            PeerTracking trackCurrentAs,
+            String reason,
+            RetryExecutionStrategy strategy
+    ) {
+        Peer nextPeer = randomNode(context, excludeNoLeaderPeers);
+        if (nextPeer != null) {
+            strategy.executeRetry(context, nextPeer, trackCurrentAs, reason);
+        } else {
+            strategy.onAllPeersExhausted();
+        }
+    }
+
+    /**
+     * Handles error response using the provided strategy.
+     *
+     * <p>This method handles peer selection centrally. The strategy only 
decides HOW to retry
+     * (with delay, marking peers, waiting for leader, etc.).
+     *
+     * <p>Error categories:
+     * <ul>
+     *     <li><b>SUCCESS</b>: Complete successfully</li>
+     *     <li><b>Transient</b> (EBUSY/EAGAIN/UNKNOWN/EINTERNAL/ENOENT): Retry 
on same peer</li>
+     *     <li><b>Unavailable</b> (EHOSTDOWN/ESHUTDOWN/ENODESHUTDOWN/ESTOP): 
Peer is down, try another</li>
+     *     <li><b>EPERM with leader</b>: Redirect to actual leader</li>
+     *     <li><b>EPERM without leader</b>: No leader known, try another peer 
or wait</li>
+     *     <li><b>ESTALE</b>: Terminal error, stale update</li>
+     *     <li><b>Other</b>: Terminal error</li>
+     * </ul>
+     *
+     * @param fut Future to complete on terminal errors.
+     * @param resp Error response from the peer.
+     * @param retryContext Retry context.
+     * @param strategy Strategy for executing retries.
+     */
+    private void handleErrorResponseCommon(
+            CompletableFuture<? extends NetworkMessage> fut,
+            ErrorResponse resp,
+            RetryContext retryContext,
+            RetryExecutionStrategy strategy
+    ) {
+        boolean trackNoLeaderSeparately = strategy.trackNoLeaderSeparately();
+        RaftError error = RaftError.forNumber(resp.errorCode());
+        String reason = getShortReasonMessage(retryContext, error, resp);
+
+        switch (error) {
+            case SUCCESS:
+                leader = retryContext.targetPeer();
+                fut.complete(null);
+                break;
+
+            case EBUSY:
+            case EAGAIN:
+            case UNKNOWN:
+            case EINTERNAL:
+            case ENOENT:
+                // Transient errors - retry on same peer (COMMON = don't mark 
current peer).
+                strategy.executeRetry(retryContext, retryContext.targetPeer(), 
PeerTracking.COMMON, reason);
+                break;
+
+            case EHOSTDOWN:
+            case ESHUTDOWN:
+            case ENODESHUTDOWN:
+            case ESTOP:
+                // Peer is down - select new peer, mark current as UNAVAILABLE.
+                selectPeerAndRetry(retryContext, false, 
PeerTracking.UNAVAILABLE, reason, strategy);
+                break;
+
+            case EPERM:
+                if (resp.leaderId() == null) {
+                    // No leader known - select new peer, track based on mode.
+                    PeerTracking tracking = trackNoLeaderSeparately ? 
PeerTracking.NO_LEADER : PeerTracking.UNAVAILABLE;
+                    selectPeerAndRetry(retryContext, trackNoLeaderSeparately, 
tracking, reason, strategy);
+                } else {
+                    // Redirect to known leader (COMMON = don't mark current 
peer as bad).
+                    Peer leaderPeer = parsePeer(resp.leaderId());
+
+                    if (leaderPeer == null) {
+                        throw new IllegalStateException("parsePeer returned 
null for non-null leaderId: " + resp.leaderId());
+                    }
+
+                    leader = leaderPeer;
+                    strategy.executeRetry(retryContext, leaderPeer, 
PeerTracking.COMMON, reason);
+                }
+                break;
+
+            case ESTALE:
+                fut.completeExceptionally(new 
RaftStaleUpdateException(resp.errorMsg()));
+                break;
+
+            default:
+                fut.completeExceptionally(new RaftException(error, 
resp.errorMsg()));
+                break;
+        }
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftErrorUtils.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftErrorUtils.java
new file mode 100644
index 00000000000..6d4278d74f6
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftErrorUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.raft.PeerUnavailableException;
+
+/**
+ * Utility methods for classifying RAFT errors.
+ */
+class RaftErrorUtils {
+    /**
+     * Returns {@code true} if the given throwable represents a recoverable 
error that can be retried.
+     *
+     * @param t Throwable to check.
+     * @return {@code true} if recoverable.
+     */
+    static boolean recoverable(Throwable t) {
+        t = unwrapCause(t);
+
+        return t instanceof TimeoutException
+                || t instanceof IOException
+                || t instanceof PeerUnavailableException
+                || t instanceof RecipientLeftException;
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java
index 4b7cc3b6307..ccf60bede8c 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java
@@ -44,8 +44,14 @@ import org.jetbrains.annotations.Nullable;
  * calls.
  */
 class RetryContext {
+    /** Indicates that default response timeout should be used. */
+    static final long USE_DEFAULT_RESPONSE_TIMEOUT = -1;
+
     private static final int MAX_RETRY_REASONS = 25;
 
+    private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+            
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSS").withZone(ZoneId.systemDefault());
+
     private final String groupId;
 
     private Peer targetPeer;
@@ -75,6 +81,13 @@ class RetryContext {
      */
     private final Set<Peer> unavailablePeers = new HashSet<>();
 
+    /**
+     * Set of peers that returned "no leader" response (EPERM with 
leaderId==null).
+     * Unlike {@link #unavailablePeers}, this set is NOT reset when all peers 
are exhausted.
+     * Instead, when all peers are in this set, we wait for leader 
notification.
+     */
+    private final Set<Peer> noLeaderPeers = new HashSet<>();
+
     /**
      * List of last {@value MAX_RETRY_REASONS} retry reasons. {@link 
LinkedList} in order to allow fast head removal upon overflow.
      */
@@ -96,6 +109,11 @@ class RetryContext {
 
     private final long responseTimeoutMillis;
 
+    /**
+     * If {@code true} then all peers will be retried only once.
+     */
+    private final boolean singleShotRequest;
+
     /**
      * Creates a context.
      *
@@ -127,6 +145,7 @@ class RetryContext {
         this.attemptScheduleTime = this.startTime;
         this.attemptStartTime = this.startTime;
         this.responseTimeoutMillis = responseTimeoutMillis;
+        this.singleShotRequest = sendWithRetryTimeoutMillis == 0;
     }
 
     Peer targetPeer() {
@@ -157,10 +176,34 @@ class RetryContext {
         return errorTraceId;
     }
 
+    boolean singleShotRequest() {
+        return singleShotRequest;
+    }
+
     void resetUnavailablePeers() {
         unavailablePeers.clear();
     }
 
+    void resetNoLeaderPeers() {
+        noLeaderPeers.clear();
+    }
+
+    Set<Peer> noLeaderPeers() {
+        return noLeaderPeers;
+    }
+
+    /**
+     * Updates this context by changing the target peer and adding the 
previous target peer to the "no leader" set.
+     * Used when a peer returns EPERM with no leader information.
+     *
+     * @return {@code this}.
+     */
+    RetryContext nextAttemptForNoLeaderPeer(Peer newTargetPeer, String 
shortReasonMessage) {
+        noLeaderPeers.add(targetPeer);
+
+        return nextAttempt(newTargetPeer, shortReasonMessage);
+    }
+
     /**
      * Updates this context by changing the target peer.
      *
@@ -195,10 +238,7 @@ class RetryContext {
     }
 
     private static String timestampToString(long timestamp) {
-        DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSS")
-                .withZone(ZoneId.systemDefault());
-        Instant instant = Instant.ofEpochMilli(timestamp);
-        return formatter.format(instant);
+        return TIMESTAMP_FORMATTER.format(Instant.ofEpochMilli(timestamp));
     }
 
     /**
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityStateTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityStateTest.java
new file mode 100644
index 00000000000..acc1b3a7af8
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityStateTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.deriveUuidFrom;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.raft.client.LeaderAvailabilityState.State;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link LeaderAvailabilityState} state machine.
+ */
+public class LeaderAvailabilityStateTest extends BaseIgniteAbstractTest {
+
+    /**
+     * Initial state should be {@link State#WAITING_FOR_LEADER} with term -1.
+     * {@link LeaderAvailabilityState#onGroupUnavailable(long)} has no effect 
when already in {@link State#WAITING_FOR_LEADER}.
+     */
+    @Test
+    void testOnGroupUnavailableWhenAlreadyWaiting() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+
+        assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+        assertEquals(-1, state.currentTerm());
+
+        // Already in WAITING_FOR_LEADER, should have no effect
+        state.onGroupUnavailable(0);
+
+        assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+        assertEquals(-1, state.currentTerm());
+    }
+
+    /** {@link LeaderAvailabilityState#awaitLeader()} returns incomplete 
future when no leader elected. */
+    @Test
+    void testAwaitLeaderWhenNoLeader() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+
+        CompletableFuture<Long> future = state.awaitLeader();
+
+        assertFalse(future.isDone(), "Future should not be completed when no 
leader");
+    }
+
+    /**
+     * {@link LeaderAvailabilityState#onLeaderElected(InternalClusterNode, 
long)} transitions to {@link State#LEADER_AVAILABLE} state and
+     * completes waiting futures.
+     * {@link LeaderAvailabilityState#awaitLeader()} returns completed future 
when leader is available.
+     */
+    @Test
+    void testAwaitLeader() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader-node");
+
+        long expectedTerm = 1;
+
+        // No leader present. The future will wait.
+        CompletableFuture<Long> waiter = state.awaitLeader();
+        assertFalse(waiter.isDone());
+
+        state.onLeaderElected(leaderNode, expectedTerm);
+
+        // Verify leader is available after onLeaderElected.
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+        assertEquals(expectedTerm, state.currentTerm());
+
+        assertTrue(waiter.isDone());
+        assertEquals(expectedTerm, waiter.join());
+
+        // Leader is present. The future is completed.
+        CompletableFuture<Long> future = state.awaitLeader();
+
+        assertTrue(future.isDone(), "Future should be completed when leader 
available");
+        assertEquals(expectedTerm, future.join());
+
+        // Verify the state has not changed and we see exactly the same 
results as previously.
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+        assertEquals(expectedTerm, state.currentTerm());
+    }
+
+    /** Stale term notifications are ignored. */
+    @Test
+    void testStaleTermIgnored() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode1 = createNode("leader-1");
+        InternalClusterNode leaderNode2 = createNode("leader-2");
+
+        long expectedTerm = 5;
+
+        state.onLeaderElected(leaderNode1, expectedTerm);
+        assertEquals(expectedTerm, state.currentTerm());
+
+        // Stale notification with lower term should be ignored.
+        state.onLeaderElected(leaderNode2, 3);
+        assertEquals(expectedTerm, state.currentTerm());
+    }
+
+    /** Equal term notifications are ignored. */
+    @Test
+    void testEqualTermIgnored() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader");
+
+        long expectedTerm = 5;
+
+        state.onLeaderElected(leaderNode, expectedTerm);
+        assertEquals(expectedTerm, state.currentTerm());
+
+        // Same term should be ignored.
+        state.onLeaderElected(leaderNode, expectedTerm);
+        assertEquals(expectedTerm, state.currentTerm());
+    }
+
+    /** Higher term updates current term. */
+    @Test
+    void testHigherTermUpdates() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode1 = createNode("leader-1");
+        InternalClusterNode leaderNode2 = createNode("leader-2");
+
+        state.onLeaderElected(leaderNode1, 1);
+        assertEquals(1, state.currentTerm());
+
+        state.onLeaderElected(leaderNode2, 5);
+        assertEquals(5, state.currentTerm());
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+    }
+
+    /**
+     * {@link LeaderAvailabilityState#onGroupUnavailable(long)} transitions 
back to WAITING_FOR_LEADER.
+     */
+    @Test
+    void testOnGroupUnavailableTransition() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader-node");
+
+        long expectedTerm = 1;
+
+        state.onLeaderElected(leaderNode, expectedTerm);
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+        assertEquals(expectedTerm, state.currentTerm());
+
+        state.onGroupUnavailable(expectedTerm);
+        assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+        assertEquals(expectedTerm, state.currentTerm());
+    }
+
+    /**
+     * {@link LeaderAvailabilityState#onGroupUnavailable(long)} ignored if 
term changed.
+     */
+    @Test
+    void testOnGroupUnavailableIgnoredIfTermChanged() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader-node");
+
+        state.onLeaderElected(leaderNode, 1);
+        state.onLeaderElected(leaderNode, 2);
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+        assertEquals(2, state.currentTerm());
+
+        // Should be ignored because term changed from 1 to 2
+        state.onGroupUnavailable(1);
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+        assertEquals(2, state.currentTerm());
+    }
+
+    /**
+     * After {@link LeaderAvailabilityState#onGroupUnavailable(long)}, new 
waiters get fresh future.
+     */
+    @Test
+    void testNewWaitersAfterUnavailable() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader-node");
+
+        state.onLeaderElected(leaderNode, 1);
+        CompletableFuture<Long> future1 = state.awaitLeader();
+        assertTrue(future1.isDone());
+
+        state.onGroupUnavailable(1);
+
+        CompletableFuture<Long> future2 = state.awaitLeader();
+        assertFalse(future2.isDone(), "New waiter should get incomplete future 
after unavailable");
+    }
+
+    /** Multiple waiters are all completed on leader election. */
+    @Test
+    void testMultipleWaitersCompleted() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader-node");
+
+        CompletableFuture<Long> waiter1 = state.awaitLeader();
+        CompletableFuture<Long> waiter2 = state.awaitLeader();
+        CompletableFuture<Long> waiter3 = state.awaitLeader();
+
+        assertFalse(waiter1.isDone());
+        assertFalse(waiter2.isDone());
+        assertFalse(waiter3.isDone());
+
+        state.onLeaderElected(leaderNode, 10);
+
+        assertTrue(waiter1.isDone());
+        assertTrue(waiter2.isDone());
+        assertTrue(waiter3.isDone());
+
+        assertEquals(10L, waiter1.join());
+        assertEquals(10L, waiter2.join());
+        assertEquals(10L, waiter3.join());
+    }
+
+    /** Term 0 is accepted as first valid term. */
+    @Test
+    void testTermZeroAccepted() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader");
+
+        assertEquals(-1, state.currentTerm());
+
+        state.onLeaderElected(leaderNode, 0);
+
+        assertEquals(0, state.currentTerm());
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+    }
+
+    /**
+     * Negative terms are rejected with {@link IllegalArgumentException}.
+     */
+    @Test
+    void testNegativeTermRejected() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leaderNode = createNode("leader");
+
+        assertEquals(-1, state.currentTerm());
+
+        IllegalArgumentException thrown = assertThrows(
+                IllegalArgumentException.class,
+                () -> state.onLeaderElected(leaderNode, -1)
+        );
+
+        assertEquals("Term must be non-negative: -1", thrown.getMessage());
+        assertEquals(State.WAITING_FOR_LEADER, state.currentState());
+        assertEquals(-1, state.currentTerm());
+    }
+
+    /** Concurrent leader elections with different terms. */
+    @Test
+    void testConcurrentLeaderElections() throws Exception {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        int threadCount = 10;
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+        Thread[] threads = new Thread[threadCount];
+        for (int i = 0; i < threadCount; i++) {
+            int term = i;
+            threads[i] = new Thread(() -> {
+                try {
+                    startLatch.await();
+                    InternalClusterNode leader = createNode("leader-" + term);
+                    state.onLeaderElected(leader, term);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+            threads[i].start();
+        }
+
+        startLatch.countDown();
+        assertTrue(doneLatch.await(5, TimeUnit.SECONDS));
+
+        // The final term should be the maximum term that was set
+        assertEquals(threadCount - 1, state.currentTerm());
+    }
+
+    /**
+     * {@link LeaderAvailabilityState#awaitLeader()} returns same future for 
multiple calls when waiting.
+     */
+    @Test
+    void testAwaitLeaderReturnsSameFuture() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+
+        CompletableFuture<Long> future1 = state.awaitLeader();
+        CompletableFuture<Long> future2 = state.awaitLeader();
+
+        // Should return the same future instance when in WAITING_FOR_LEADER 
state
+        assertSame(future1, future2, "Should return same future instance when 
waiting");
+    }
+
+    /** Stop completes multiple waiters exceptionally. */
+    @Test
+    void testStopCompletesMultipleWaiters() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        RuntimeException testException = new RuntimeException("Test shutdown");
+
+        // All waiters share the same future
+        CompletableFuture<Long> waiter1 = state.awaitLeader();
+        CompletableFuture<Long> waiter2 = state.awaitLeader();
+        CompletableFuture<Long> waiter3 = state.awaitLeader();
+
+        assertFalse(waiter1.isDone());
+        assertFalse(waiter2.isDone());
+        assertFalse(waiter3.isDone());
+
+        state.stop(testException);
+
+        assertTrue(waiter1.isCompletedExceptionally());
+        assertTrue(waiter2.isCompletedExceptionally());
+        assertTrue(waiter3.isCompletedExceptionally());
+
+        CompletionException thrown1 = assertThrows(CompletionException.class, 
waiter1::join);
+        assertSame(testException, thrown1.getCause());
+        CompletionException thrown2 = assertThrows(CompletionException.class, 
waiter2::join);
+        assertSame(testException, thrown2.getCause());
+        CompletionException thrown3 = assertThrows(CompletionException.class, 
waiter3::join);
+        assertSame(testException, thrown3.getCause());
+
+        // New waiter should immediately fail with the same exception
+        CompletableFuture<Long> waiterAfterComplete = state.awaitLeader();
+        assertTrue(waiterAfterComplete.isCompletedExceptionally());
+
+        CompletionException thrown = assertThrows(CompletionException.class, 
waiterAfterComplete::join);
+        assertSame(testException, thrown.getCause());
+    }
+
+    /** Stop when no waiters are pending still marks state as stopped. */
+    @Test
+    void testStopWhenNoWaiters() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        RuntimeException testException = new RuntimeException("Test shutdown");
+
+        assertFalse(state.stopped());
+        // Stop without any waiters - should not throw
+        state.stop(testException);
+
+        assertTrue(state.stopped());
+
+        // Subsequent awaitLeader should fail
+        CompletableFuture<Long> waiter = state.awaitLeader();
+        assertTrue(waiter.isDone());
+        assertTrue(waiter.isCompletedExceptionally());
+
+        CompletionException thrown = assertThrows(CompletionException.class, 
waiter::join);
+        assertSame(testException, thrown.getCause());
+    }
+
+    /** Stop when leader is available fails subsequent {@link 
LeaderAvailabilityState#awaitLeader()}. */
+    @Test
+    void testStopWhenLeaderAvailable() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leader = createNode("leader");
+        RuntimeException testException = new RuntimeException("Test shutdown");
+
+        state.onLeaderElected(leader, 1);
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+
+        state.stop(testException);
+
+        assertTrue(state.stopped());
+
+        // awaitLeader should fail after stop, even though leader was available
+        CompletableFuture<Long> waiter = state.awaitLeader();
+        assertTrue(waiter.isDone());
+        assertTrue(waiter.isCompletedExceptionally());
+
+        CompletionException thrown = assertThrows(CompletionException.class, 
waiter::join);
+        assertSame(testException, thrown.getCause());
+    }
+
+    /** Multiple stop calls are idempotent. */
+    @Test
+    void testMultipleStopCalls() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        IllegalStateException exception1 = new 
IllegalStateException("Component stopping");
+        RuntimeException exception2 = new RuntimeException("Shutdown 2");
+
+        CompletableFuture<Long> waiter1 = state.awaitLeader();
+        state.stop(exception1);
+        assertTrue(waiter1.isCompletedExceptionally());
+
+        // Second stop call should be ignored
+        state.stop(exception2);
+
+        // Subsequent awaitLeader should fail with the FIRST exception
+        CompletableFuture<Long> waiter2 = state.awaitLeader();
+        assertTrue(waiter2.isDone());
+
+        CompletionException thrown = assertThrows(CompletionException.class, 
waiter2::join);
+        assertSame(exception1, thrown.getCause(), "Should use exception from 
first stop call");
+    }
+
+    /**
+     * {@link LeaderAvailabilityState#onLeaderElected(InternalClusterNode, 
long)} is ignored after stop.
+     */
+    @Test
+    void testOnLeaderElectedIgnoredAfterStop() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leader = createNode("leader");
+        RuntimeException testException = new RuntimeException("Test shutdown");
+
+        CompletableFuture<Long> waiter1 = state.awaitLeader();
+        state.stop(testException);
+        assertTrue(waiter1.isCompletedExceptionally());
+
+        // Leader election after stop should be ignored
+        state.onLeaderElected(leader, 5);
+
+        // State should still be stopped.
+        assertTrue(state.stopped());
+
+        // New waiter should still fail
+        CompletableFuture<Long> waiter2 = state.awaitLeader();
+        assertTrue(waiter2.isDone());
+        assertTrue(waiter2.isCompletedExceptionally());
+    }
+
+    /**
+     * {@link LeaderAvailabilityState#onGroupUnavailable(long)} is ignored 
after stop.
+     */
+    @Test
+    void testOnGroupUnavailableIgnoredAfterStop() {
+        LeaderAvailabilityState state = new LeaderAvailabilityState();
+        InternalClusterNode leader = createNode("leader");
+        RuntimeException testException = new RuntimeException("Test shutdown");
+
+        state.onLeaderElected(leader, 1);
+        assertEquals(State.LEADER_AVAILABLE, state.currentState());
+
+        state.stop(testException);
+
+        // Group unavailable after stop should be ignored
+        state.onGroupUnavailable(1);
+
+        assertTrue(state.stopped());
+    }
+
+    private static InternalClusterNode createNode(String name) {
+        return new ClusterNodeImpl(deriveUuidFrom(name), name, new 
NetworkAddress("localhost", 123));
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
index 9560e631f9d..54dee9568e0 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
@@ -81,7 +82,6 @@ import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
@@ -103,13 +103,16 @@ import org.mockito.quality.Strictness;
  */
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
-@Disabled("IGNITE-27156")
 public class PhysicalTopologyAwareRaftGroupServiceRunTest extends 
BaseIgniteAbstractTest {
 
     private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
             .map(port -> new Peer("localhost-" + port))
             .collect(toUnmodifiableList());
 
+    private static final List<Peer> FIVE_NODES = Stream.of(20000, 20001, 
20002, 20003, 20004)
+            .map(port -> new Peer("localhost-" + port))
+            .collect(toUnmodifiableList());
+
     private static final RaftMessagesFactory FACTORY = new 
RaftMessagesFactory();
 
     private static final FailureManager NOOP_FAILURE_PROCESSOR = new 
FailureManager(new NoOpFailureHandler());
@@ -168,6 +171,10 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest 
extends BaseIgniteAbst
         lenient().when(responseTimeoutValue.value()).thenReturn(3000L);
         
lenient().when(raftConfiguration.responseTimeoutMillis()).thenReturn(responseTimeoutValue);
 
+        ConfigurationValue<Long> retryDelayValue = 
mock(ConfigurationValue.class);
+        lenient().when(retryDelayValue.value()).thenReturn(50L);
+        
lenient().when(raftConfiguration.retryDelayMillis()).thenReturn(retryDelayValue);
+
         executor = new ScheduledThreadPoolExecutor(20, 
IgniteThreadFactory.create("common", Loza.CLIENT_POOL_NAME, logger()));
         eventsClientListener = new RaftGroupEventsClientListener();
     }
@@ -181,10 +188,14 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest 
extends BaseIgniteAbst
     }
 
     private PhysicalTopologyAwareRaftGroupService startService() {
+        return startService(NODES);
+    }
+
+    private PhysicalTopologyAwareRaftGroupService startService(List<Peer> 
peers) {
         var commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(cluster.serializationRegistry());
 
         PeersAndLearners peersAndLearners = PeersAndLearners.fromConsistentIds(
-                
NODES.stream().map(Peer::consistentId).collect(Collectors.toSet())
+                
peers.stream().map(Peer::consistentId).collect(Collectors.toSet())
         );
 
         service = PhysicalTopologyAwareRaftGroupService.start(
@@ -595,6 +606,131 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest 
extends BaseIgniteAbst
         assertThat(result, willThrow(NodeStoppingException.class, 5, 
TimeUnit.SECONDS));
     }
 
+    /**
+     * Tests that UNKNOWN/EINTERNAL/ENOENT errors retry on the same peer for 
ActionRequests.
+     * This is because these errors are transient and the peer is likely to 
recover.
+     * This behavior is consistent with {@link RaftGroupServiceImpl}.
+     *
+     * <p>The test verifies that after receiving an EINTERNAL error from a 
peer,
+     * the next retry goes to the same peer (not a different one).
+     */
+    @Test
+    void testTransientErrorRetriesOnSamePeer() {
+        // Track the sequence of peers called.
+        List<String> calledPeers = new CopyOnWriteArrayList<>();
+
+        // Mock all WriteActionRequest calls - track which peer is called.
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong())
+        ).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+
+            if (calledPeers.size() == 1) {
+                // First call returns EINTERNAL error.
+                return completedFuture(FACTORY.errorResponse()
+                        .errorCode(RaftError.EINTERNAL.getNumber())
+                        .build());
+            }
+            // Second call succeeds.
+            return completedFuture(FACTORY.actionResponse().result(new 
TestResponse()).build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // Simulate leader election and wait for it to be processed.
+        simulateLeaderElectionAndWait(NODES.get(0), CURRENT_TERM);
+
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 
Long.MAX_VALUE);
+
+        assertThat(result, willBe(instanceOf(TestResponse.class)));
+
+        // Verify that exactly 2 calls were made.
+        assertThat("Should have exactly 2 calls", calledPeers.size(), is(2));
+
+        // Key assertion: both calls should be to the SAME peer (retry on same 
peer for transient errors).
+        assertThat("Transient error should retry on the same peer, but first 
call was to "
+                        + calledPeers.get(0) + " and second was to " + 
calledPeers.get(1),
+                calledPeers.get(0), is(calledPeers.get(1)));
+    }
+
+    /**
+     * Tests that with bounded timeout, a non-leader-related retriable error 
(like EBUSY) causes
+     * retries until timeout without waiting for leader. This differs from 
EPERM with no leader,
+     * which tries all peers once then waits for leader.
+     */
+    @Test
+    void testBoundedTimeoutRetriesNonLeaderErrorUntilTimeout() {
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        // All peers return EBUSY (a retriable error that is NOT related to 
missing leader).
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong()
+        )).thenAnswer(invocation -> {
+            callCount.incrementAndGet();
+            return completedFuture(FACTORY.errorResponse()
+                    .errorCode(RaftError.EBUSY.getNumber())
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // With 300ms timeout and EBUSY errors, should retry until timeout.
+        // Unlike EPERM (no leader), EBUSY should cause continuous retries, 
not wait for leader.
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 300);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class, 1, TimeUnit.SECONDS));
+
+        // Should have made more than 3 calls (cycling through peers multiple 
times).
+        // With retryDelayMillis=50 and 300ms timeout, we should get several 
retry rounds.
+        assertTrue(callCount.get() > 3,
+                "Expected more than 3 calls (multiple retry rounds), but got " 
+ callCount.get());
+    }
+
+    /**
+     * Tests that with infinite timeout, a non-leader-related retriable error 
(like EBUSY) causes
+     * retries indefinitely until success. This differs from EPERM with no 
leader.
+     */
+    @Test
+    void testInfiniteTimeoutRetriesNonLeaderErrorUntilSuccess() throws 
Exception {
+        AtomicInteger callCount = new AtomicInteger(0);
+        CountDownLatch multipleRetriesDone = new CountDownLatch(5);
+
+        // First 5 calls return EBUSY, then success.
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong()
+        )).thenAnswer(invocation -> {
+            int count = callCount.incrementAndGet();
+            if (count <= 5) {
+                multipleRetriesDone.countDown();
+                return completedFuture(FACTORY.errorResponse()
+                        .errorCode(RaftError.EBUSY.getNumber())
+                        .build());
+            }
+            return completedFuture(FACTORY.actionResponse().result(new 
TestResponse()).build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // With infinite timeout and EBUSY errors, should retry until success.
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 
Long.MAX_VALUE);
+
+        // Wait for multiple retry attempts.
+        assertTrue(multipleRetriesDone.await(5, TimeUnit.SECONDS), "Should 
have multiple retries");
+
+        // Should eventually succeed.
+        assertThat(result, willCompleteSuccessfully());
+
+        // Should have made more than 5 calls.
+        assertTrue(callCount.get() > 5, "Expected more than 5 calls, but got " 
+ callCount.get());
+    }
+
     /**
      * Tests that with bounded timeout, if the client is shutting down during 
retry phase,
      * the future completes with NodeStoppingException immediately without new 
retry round.
@@ -604,6 +740,7 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest 
extends BaseIgniteAbst
         var pendingRetryInvoke = new CompletableFuture<Void>();
         var retryPhaseStarted = new CountDownLatch(1);
         var allPeersTried = new CountDownLatch(3);
+        var fifthCallAttempted = new CountDownLatch(1);
 
         AtomicInteger callCount = new AtomicInteger(0);
         when(messagingService.invoke(
@@ -626,7 +763,8 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest 
extends BaseIgniteAbst
                         .errorCode(RaftError.EBUSY.getNumber())
                         .build());
             }
-            // Should not reach here.
+            // Fifth call should not happen after shutdown.
+            fifthCallAttempted.countDown();
             return completedFuture(FACTORY.actionResponse().result(new 
TestResponse()).build());
         });
 
@@ -653,14 +791,75 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest 
extends BaseIgniteAbst
         // The result should complete with NodeStoppingException.
         assertThat(result, willThrow(NodeStoppingException.class, 5, 
TimeUnit.SECONDS));
 
-        // Give some time for any additional retry attempts.
-        Thread.sleep(200);
-
         // Verify no additional retry attempts were made after shutdown.
-        // We should have exactly 4 invocations (3 initial + 1 retry attempt 
that was interrupted).
+        // The fifth call latch should NOT be counted down (wait briefly and 
check).
+        assertThat("No 5th call should be attempted after shutdown",
+                fifthCallAttempted.await(100, TimeUnit.MILLISECONDS), 
is(false));
         assertThat(callCount.get(), is(4));
     }
 
+    /**
+     * Tests that when a leader was previously elected and then becomes 
unavailable (all peers return EPERM with no leader),
+     * the service tries all peers exactly once and then waits for leader 
notification.
+     *
+     * <p>This test verifies that the term is correctly passed to the retry 
logic so that
+     * {@code onGroupUnavailable(term)} can properly transition the state from 
LEADER_AVAILABLE to WAITING_FOR_LEADER.
+     */
+    @Test
+    void testInfiniteTimeoutWithPreviousLeaderTriesPeersOnceBeforeWaiting() 
throws Exception {
+        AtomicInteger callCount = new AtomicInteger(0);
+        CountDownLatch allPeersTried = new CountDownLatch(3);
+        // This latch will be counted down if more than 3 calls are made 
(indicating the bug).
+        CountDownLatch extraCallsMade = new CountDownLatch(1);
+
+        // All peers return EPERM with no leader.
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong()
+        )).thenAnswer(invocation -> {
+            int count = callCount.incrementAndGet();
+            if (count <= 3) {
+                allPeersTried.countDown();
+            } else {
+                // More than 3 calls means the bug exists - extra retry cycle 
happened.
+                extraCallsMade.countDown();
+            }
+            return completedFuture(FACTORY.errorResponse()
+                    .errorCode(RaftError.EPERM.getNumber())
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // Simulate leader election BEFORE calling run().
+        // After this, state is LEADER_AVAILABLE with currentTerm=1.
+        simulateLeaderElectionAndWait(NODES.get(0), CURRENT_TERM);
+
+        // Start the command with infinite timeout.
+        // The service should try all peers once and then wait for leader (not 
do an extra cycle).
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 
Long.MAX_VALUE);
+
+        // Wait for all 3 peer attempts to complete.
+        assertTrue(allPeersTried.await(5, TimeUnit.SECONDS), "All 3 peers 
should be tried");
+
+        // Give some time for potential extra calls (if bug exists).
+        // If the term is incorrectly passed as -1, awaitLeader() returns 
immediately
+        // and another retry cycle starts immediately.
+        boolean extraCallsHappened = extraCallsMade.await(500, 
TimeUnit.MILLISECONDS);
+
+        // The result should NOT be complete yet - it should be waiting for 
leader.
+        assertThat("Result should be waiting for leader, not completed",
+                result.isDone(), is(false));
+
+        // Verify exactly 3 calls were made (one per peer), not 6.
+        assertThat("Expected exactly 3 calls (one per peer), but got " + 
callCount.get()
+                        + ". If 6+ calls were made, the term was incorrectly 
passed as -1 "
+                        + "causing an extra retry cycle before proper 
waiting.",
+                extraCallsHappened, is(false));
+        assertThat(callCount.get(), is(3));
+    }
+
     private void verifyExact3PeersCalled() {
         ArgumentCaptor<InternalClusterNode> nodeCaptor = 
ArgumentCaptor.forClass(InternalClusterNode.class);
         Mockito.verify(this.messagingService, Mockito.times(3)).invoke(
@@ -683,4 +882,179 @@ public class PhysicalTopologyAwareRaftGroupServiceRunTest 
extends BaseIgniteAbst
 
     private static class TestResponse {
     }
+
+    /**
+     * Tests single-attempt mode (timeout=0) with 5 nodes: all return "no 
leader".
+     *
+     * <p>In single-attempt mode, "no leader" is treated same as unavailable.
+     * Each peer is tried exactly once, then fails with 
ReplicationGroupUnavailableException.
+     */
+    @Test
+    void testSingleAttemptModeWithAllNoLeader() {
+        AtomicInteger callCount = new AtomicInteger(0);
+        Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+
+        // All peers return EPERM with no leader.
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong()
+        )).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+            callCount.incrementAndGet();
+            return completedFuture(FACTORY.errorResponse()
+                    .errorCode(RaftError.EPERM.getNumber())
+                    .leaderId(null)
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService(FIVE_NODES);
+
+        // With timeout=0, should try each peer once and fail.
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 0);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class));
+
+        // Verify each peer was tried exactly once.
+        assertThat("Should call exactly 5 peers", callCount.get(), is(5));
+        assertThat("Should call all 5 unique peers", calledPeers.size(), 
is(5));
+    }
+
+    /**
+     * Tests single-attempt mode (timeout=0) with 5 nodes: 3 return "no 
leader", 2 return EHOSTDOWN.
+     *
+     * <p>In single-attempt mode, all errors are treated the same - each peer 
is tried once.
+     * The request should fail after all 5 peers are tried.
+     */
+    @Test
+    void testSingleAttemptModeWithMixedErrors() {
+        AtomicInteger callCount = new AtomicInteger(0);
+        Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong()
+        )).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+            int count = callCount.incrementAndGet();
+
+            // First 3 calls return "no leader", next 2 return EHOSTDOWN.
+            if (count <= 3) {
+                return completedFuture(FACTORY.errorResponse()
+                        .errorCode(RaftError.EPERM.getNumber())
+                        .leaderId(null)
+                        .build());
+            }
+            return completedFuture(FACTORY.errorResponse()
+                    .errorCode(RaftError.EHOSTDOWN.getNumber())
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService(FIVE_NODES);
+
+        // With timeout=0, should try each peer once and fail.
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 0);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class));
+
+        // Verify each peer was tried exactly once.
+        assertThat("Should call exactly 5 peers", callCount.get(), is(5));
+        assertThat("Should call all 5 unique peers", calledPeers.size(), 
is(5));
+    }
+
+    /**
+     * Tests leader-wait mode: 3 nodes return "no leader", then those same 
nodes succeed after leader election.
+     *
+     * <p>Key verification: "no leader" peers are NOT in unavailablePeers, so 
they CAN be retried
+     * after leader notification arrives.
+     */
+    @Test
+    void testLeaderWaitModeRetriesNoLeaderPeersAfterLeaderElection() throws 
Exception {
+        List<String> calledPeers = new CopyOnWriteArrayList<>();
+        AtomicInteger noLeaderResponseCount = new AtomicInteger(0);
+        CountDownLatch allPeersTriedOnce = new CountDownLatch(3);
+
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong()
+        )).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+
+            // First round: all 3 peers return "no leader".
+            if (noLeaderResponseCount.get() < 3) {
+                noLeaderResponseCount.incrementAndGet();
+                allPeersTriedOnce.countDown();
+                return completedFuture(FACTORY.errorResponse()
+                        .errorCode(RaftError.EPERM.getNumber())
+                        .leaderId(null)
+                        .build());
+            }
+
+            // After leader election: succeed.
+            return completedFuture(FACTORY.actionResponse().result(new 
TestResponse()).build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // Start the command with infinite timeout.
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 
Long.MAX_VALUE);
+
+        // Wait for all 3 peers to be tried once.
+        assertTrue(allPeersTriedOnce.await(5, TimeUnit.SECONDS), "All peers 
should be tried once");
+
+        // The result should NOT be complete yet - waiting for leader.
+        assertThat(result.isDone(), is(false));
+
+        // Simulate leader election.
+        simulateLeaderElection(NODES.get(0), CURRENT_TERM);
+
+        // Should eventually succeed (by retrying one of the "no leader" 
peers).
+        assertThat(result, willCompleteSuccessfully());
+
+        // Verify that at least one peer was called twice (first with "no 
leader", then success).
+        long totalCalls = calledPeers.size();
+        assertTrue(totalCalls > 3, "Should have more than 3 calls (some peers 
retried after leader election), got " + totalCalls);
+    }
+
+    /**
+     * Tests single-attempt mode (timeout=0) with transient errors (EBUSY).
+     *
+     * <p>In single-attempt mode, each peer should be tried at most once, even 
for transient errors.
+     * EBUSY should cause the executor to move to the next peer, not retry the 
same peer indefinitely.
+     */
+    @Test
+    void testSingleAttemptModeWithTransientErrors() {
+        AtomicInteger callCount = new AtomicInteger(0);
+        Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+
+        // All peers return EBUSY (transient error).
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                argThat(this::isTestWriteCommand),
+                anyLong()
+        )).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+            callCount.incrementAndGet();
+            return completedFuture(FACTORY.errorResponse()
+                    .errorCode(RaftError.EBUSY.getNumber())
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // With timeout=0, should try each peer at most once and fail, not 
loop forever.
+        CompletableFuture<Object> result = svc.run(testWriteCommand(), 0);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class, 2, TimeUnit.SECONDS));
+
+        // Should have tried each peer at most once (3 peers).
+        assertThat("Should call at most 3 peers, but got " + callCount.get(), 
callCount.get(), is(3));
+        assertThat("Should call all 3 unique peers", calledPeers.size(), 
is(3));
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
index 69e2c41537f..4563d13c716 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/RaftGroupServiceTest.java
@@ -47,6 +47,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -680,6 +681,51 @@ public class RaftGroupServiceTest extends 
BaseIgniteAbstractTest {
         assertThat(response, willThrow(TimeoutException.class, "Send with 
retry timed out"));
     }
 
+    /**
+     * Tests that UNKNOWN/EINTERNAL/ENOENT errors retry on the same peer (the 
leader) for ActionRequests.
+     * This is because these errors are transient and the leader is likely to 
recover.
+     */
+    @ParameterizedTest
+    @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"})
+    public void testRetryOnTransientErrorRetriesOnSamePeer(RaftError error) {
+        Peer leaderPeer = NODES.get(0);
+
+        // First call returns error, second call succeeds - both to the same 
leader.
+        when(messagingService.invoke(
+                argThat((InternalClusterNode node) -> node != null && 
node.name().equals(leaderPeer.consistentId())),
+                any(ReadActionRequest.class),
+                anyLong())
+        )
+                .thenReturn(completedFuture(FACTORY.errorResponse()
+                        .errorCode(error.getNumber())
+                        .build()))
+                
.thenReturn(completedFuture(FACTORY.actionResponse().result(null).build()));
+
+        RaftGroupService service = 
startRaftGroupServiceWithRefreshLeader(NODES);
+
+        assertThat(service.leader(), is(leaderPeer));
+
+        CompletableFuture<Object> response = 
service.run(mock(ReadCommand.class));
+
+        assertThat(response, willBe(nullValue()));
+
+        // Verify that only the leader was called (twice: once with error, 
once with success).
+        verify(messagingService, atLeastOnce()).invoke(
+                argThat((InternalClusterNode target) -> target != null && 
target.name().equals(leaderPeer.consistentId())),
+                any(ReadActionRequest.class),
+                anyLong()
+        );
+
+        // Verify that other peers were NOT called.
+        for (Peer otherPeer : NODES.subList(1, NODES.size())) {
+            verify(messagingService, never()).invoke(
+                    argThat((InternalClusterNode target) -> target != null && 
target.name().equals(otherPeer.consistentId())),
+                    any(ReadActionRequest.class),
+                    anyLong()
+            );
+        }
+    }
+
     @ParameterizedTest
     @EnumSource(names = {"EPERM"})
     public void testRetryOnErrorWithUpdateLeader(RaftError error) {

Reply via email to