JAkutenshi commented on code in PR #7242:
URL: https://github.com/apache/ignite-3/pull/7242#discussion_r2726459695


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+        resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT

Review Comment:
   Would we only ever use this "marker" value for the request timeout? If so, why not pass `retryContext.responseTimeoutMillis()` explicitly?
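   For illustration, something along these lines (just a sketch; resolving the value up front like this is my assumption, not a requirement):
   
   ```java
   // Sketch: resolve the response timeout explicitly instead of passing the
   // USE_DEFAULT_RESPONSE_TIMEOUT marker down to sendWithRetrySingleAttempt.
   long responseTimeoutMillis = throttlingContextHolder.peerRequestTimeoutMillis();
   
   var context = new RetryContext(
           groupId().toString(),
           targetPeer,
           cmd::toStringForLightLogging,
           createRequestFactory(cmd),
           0,  // Single attempt - no retry timeout
           responseTimeoutMillis
   );
   ```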



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+        resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT

Review Comment:
   Also, should we worry about the `sendWithRetryTimeout < responseTimeout` case?
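   If we should, one option (sketch only, along the lines of what `sendWithRetryWaitingForLeader` already does for the leader-wait path) is to bound the response timeout by the remaining retry budget:
   
   ```java
   // Sketch: never wait for a single response longer than the time left until the retry deadline.
   long remainingMillis = retryContext.stopTime() - Utils.monotonicMs();
   long boundedResponseTimeout = Math.min(responseTimeout, Math.max(remainingMillis, 0));
   ```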



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == 
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : 
retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, 
retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseSingleAttempt(fut, 
(ErrorResponse) resp, retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Validates throwable and returns next peer for retry.
+     *
+     * <p>If the error is not recoverable or no peers are available, completes 
the future exceptionally and returns null.
+     *
+     * @param fut Future to complete exceptionally if retry is not possible.
+     * @param err The throwable to check.
+     * @param retryContext Retry context for getting next peer.
+     * @return Next peer for retry, or null if the future was already 
completed with an error.
+     */
+    @Nullable
+    private Peer getNextPeerForRecoverableError(CompletableFuture<?> fut, 
Throwable err, RetryContext retryContext) {
+        err = unwrapCause(err);
+
+        if (!recoverable(err)) {
+            fut.completeExceptionally(err);
+
+            return null;
+        }
+
+        Peer nextPeer = randomNode(retryContext, false);
+
+        if (nextPeer == null) {
+            fut.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return null;
+        }
+
+        return nextPeer;
+    }
+
+    private static void logRecoverableError(RetryContext retryContext, Peer nextPeer) {
+        if (LOG.isDebugEnabled()) {

Review Comment:
   A matter of taste, optional comment: I believe the fewer indentation levels in the code, the better, so an approach like:
   
   ```java
           if (!LOG.isDebugEnabled()) {
               return;
           }
   
           LOG.debug(
                   "Recoverable error during the request occurred (will be retried) [request={}, peer={}, newPeer={}, traceId={}].",
                   includeSensitive() ? retryContext.request() : retryContext.request().toStringForLightLogging(),
                   retryContext.targetPeer(),
                   nextPeer,
                   retryContext.errorTraceId()
           );
   ```
   
   would, in my view, be better. Future us will also have 4 more characters per line for the message. But as I said, you may omit this comment.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == 
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : 
retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, 
retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseSingleAttempt(fut, 
(ErrorResponse) resp, retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Validates throwable and returns next peer for retry.
+     *
+     * <p>If the error is not recoverable or no peers are available, completes 
the future exceptionally and returns null.
+     *
+     * @param fut Future to complete exceptionally if retry is not possible.
+     * @param err The throwable to check.
+     * @param retryContext Retry context for getting next peer.
+     * @return Next peer for retry, or null if the future was already completed with an error.
+     */
+    @Nullable

Review Comment:
   Shouldn't we place the annotation right before the `Peer` return type, since that is what may be `null`?
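   I.e. something like (sketch):
   
   ```java
   private @Nullable Peer getNextPeerForRecoverableError(CompletableFuture<?> fut, Throwable err, RetryContext retryContext) {
   ```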



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -81,11 +119,32 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
     private final RaftGroupService raftClient;
 
     /** Executor to invoke RPC requests. */
-    private final Executor executor;
+    private final ScheduledExecutorService executor;
 
     /** RAFT configuration. */
     private final RaftConfiguration raftConfiguration;
 
+    /** State machine for tracking leader availability. */
+    private final LeaderAvailabilityState leaderAvailabilityState;
+
+    /** Command marshaller. */
+    private final Marshaller commandsMarshaller;
+
+    /** Throttling context holder. */
+    private final ThrottlingContextHolder throttlingContextHolder;

Review Comment:
   I see no real usages of this object. Both of its usages take the adaptive throttling timeout as the invoke response timeout, yet not from production code: in all test usages the value is `-1`, the "marker" that tells it to use this throttling timeout. So we have some code that is (for now) test-only. On the other hand, we already have `raftConfiguration.responseTimeoutMillis().value()` for such a case, and it can easily be changed for test purposes.
   
   In my opinion, both this class's "timeout-based retry approach" and the "adaptive delaying throttling retry approach" should be implementations of a retry strategy interface. But I believe that is a refactoring ticket for future us. For now, I think it's better to just take the value from the RAFT configuration here.
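   Roughly like this (sketch only, assuming the configured response timeout is an acceptable default here):
   
   ```java
   // Sketch: take the default response timeout from the RAFT configuration instead of the throttling holder.
   long responseTimeout = retryContext.responseTimeoutMillis() == RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
           ? raftConfiguration.responseTimeoutMillis().value()
           : retryContext.responseTimeoutMillis();
   ```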



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+        resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));

Review Comment:
   There and below in the file.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == 
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : 
retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, 
retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseSingleAttempt(fut, 
(ErrorResponse) resp, retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Validates throwable and returns next peer for retry.
+     *
+     * <p>If the error is not recoverable or no peers are available, completes 
the future exceptionally and returns null.
+     *
+     * @param fut Future to complete exceptionally if retry is not possible.
+     * @param err The throwable to check.
+     * @param retryContext Retry context for getting next peer.
+     * @return Next peer for retry, or null if the future was already 
completed with an error.
+     */
+    @Nullable
+    private Peer getNextPeerForRecoverableError(CompletableFuture<?> fut, 
Throwable err, RetryContext retryContext) {
+        err = unwrapCause(err);
+
+        if (!recoverable(err)) {
+            fut.completeExceptionally(err);
+
+            return null;
+        }
+
+        Peer nextPeer = randomNode(retryContext, false);
+
+        if (nextPeer == null) {
+            fut.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return null;
+        }
+
+        return nextPeer;
+    }
+
+    private static void logRecoverableError(RetryContext retryContext, Peer 
nextPeer) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Recoverable error during the request occurred (will be 
retried) [request={}, peer={}, newPeer={}, traceId={}].",
+                    includeSensitive() ? retryContext.request() : 
retryContext.request().toStringForLightLogging(),
+                    retryContext.targetPeer(),
+                    nextPeer,
+                    retryContext.errorTraceId()
+            );
+        }
+    }
+
+    /**
+     * Handles throwable in single attempt mode.
+     */
+    private void handleThrowableSingleAttempt(
+            CompletableFuture<? extends NetworkMessage> fut,
+            Throwable err,
+            RetryContext retryContext
+    ) {
+        Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+        if (nextPeer == null) {
+            return;
+        }
+
+        logRecoverableError(retryContext, nextPeer);
+
+        String shortReasonMessage = "Peer " + 
retryContext.targetPeer().consistentId()
+                + " threw " + unwrapCause(err).getClass().getSimpleName();
+        sendWithRetrySingleAttempt(fut, 
retryContext.nextAttemptForUnavailablePeer(nextPeer, shortReasonMessage));
+    }
+
+    /**
+     * Handles error response in single attempt mode.
+     *
+     * <p>In single-attempt mode, each peer is tried at most once. All errors 
mark the peer
+     * as tried and move to the next peer. When all peers have been tried, the 
request fails
+     * with {@link ReplicationGroupUnavailableException}.
+     */
+    private void handleErrorResponseSingleAttempt(
+            CompletableFuture<? extends NetworkMessage> fut,
+            ErrorResponse resp,
+            RetryContext retryContext
+    ) {
+        RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+            @Override
+            public void executeRetry(RetryContext context, Peer nextPeer, 
@Nullable PeerTracking trackCurrentAs, String reason) {
+                // Single-attempt mode: ALWAYS mark current peer as 
unavailable, regardless of trackCurrentAs.
+                // This prevents retrying the same peer and matches original 
behavior where:
+                // - transient errors: would select new peer, mark current 
unavailable
+                // - peer unavailable: marks current unavailable
+                // - no leader: marks current unavailable (no NO_LEADER 
distinction needed)
+                // - leader redirect: marks current unavailable (to prevent 
redirect loops)
+                sendWithRetrySingleAttempt(fut, 
context.nextAttemptForUnavailablePeer(nextPeer, reason));
+            }
+
+            @Override
+            public void onAllPeersExhausted() {
+                fut.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+            }
+        };
+
+        handleErrorResponseCommon(fut, resp, retryContext, strategy, false);
+    }
+
+    /**
+     * Sends a request with retry and waits for leader on leader absence.
+     */
+    private void sendWithRetryWaitingForLeader(
+            CompletableFuture<ActionResponse> fut,
+            RetryContext retryContext,
+            Command cmd,
+            long deadline,
+            long termWhenStarted
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long requestStartTime = Utils.monotonicMs();
+            long stopTime = retryContext.stopTime();
+
+            if (requestStartTime >= stopTime) {
+                // Retry timeout expired - fail with timeout exception.
+                // For non-leader-related retriable errors, we don't wait for 
leader, just fail.
+                fut.completeExceptionally(createTimeoutException());
+                return;
+            }
+
+            ThrottlingContextHolder peerThrottlingContextHolder = 
throttlingContextHolder.peerContextHolder(
+                    retryContext.targetPeer().consistentId()
+            );
+
+            peerThrottlingContextHolder.beforeRequest();
+            retryContext.onNewAttempt();
+
+            // Bound response timeout by remaining time until retry stop time.
+            long responseTimeout = 
Math.min(peerThrottlingContextHolder.peerRequestTimeoutMillis(), stopTime - 
requestStartTime);
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    // Enforce timeout even if messaging service doesn't 
(e.g., in tests with mocks).
+                    .orTimeout(responseTimeout > 0 ? responseTimeout : 
Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+                    .whenComplete((resp, err) -> {
+                        
peerThrottlingContextHolder.afterRequest(requestStartTime, retriableError(err, 
resp));
+
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableWithLeaderWait(fut, err, 
retryContext, cmd, deadline, termWhenStarted);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseWithLeaderWait(fut, 
(ErrorResponse) resp, retryContext, cmd, deadline, termWhenStarted);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((ActionResponse) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Handles throwable in leader-wait mode.
+     */
+    private void handleThrowableWithLeaderWait(
+            CompletableFuture<ActionResponse> fut,
+            Throwable err,
+            RetryContext retryContext,
+            Command cmd,
+            long deadline,
+            long termWhenStarted
+    ) {
+        Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+        if (nextPeer == null) {
+            return;
+        }
+
+        logRecoverableError(retryContext, nextPeer);
+
+        String shortReasonMessage = "Peer " + 
retryContext.targetPeer().consistentId()
+                + " threw " + unwrapCause(err).getClass().getSimpleName();
+        scheduleRetryWithLeaderWait(fut, retryContext.nextAttempt(nextPeer, 
shortReasonMessage), cmd, deadline, termWhenStarted);
+    }
+
+    /**
+     * Handles error response in leader-wait mode.
+     *
+     * <p>In leader-wait mode:
+     * <ul>
+     *     <li>Transient errors (EBUSY/EAGAIN) retry on the same peer after 
delay</li>
+     *     <li>"No leader" errors try each peer once, then wait for leader 
notification</li>
+     *     <li>Peer unavailability errors cycle through peers until 
timeout</li>
+     * </ul>
+     */
+    private void handleErrorResponseWithLeaderWait(
+            CompletableFuture<ActionResponse> fut,
+            ErrorResponse resp,
+            RetryContext retryContext,
+            Command cmd,
+            long deadline,
+            long termWhenStarted
+    ) {
+        RetryExecutionStrategy strategy = new RetryExecutionStrategy() {

Review Comment:
   Here and in the other place I believe we should have named implementations. Moreover, I believe that `RetryExecutionStrategy` will be useful in the future refactoring I mentioned. Let's extract it as a separate interface?
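   Something along these lines (sketch only; the implementation class names are just placeholders):
   
   ```java
   /** Decides how a failed attempt is handled. Extracted from the anonymous classes above. */
   interface RetryExecutionStrategy {
       void executeRetry(RetryContext context, Peer nextPeer, @Nullable PeerTracking trackCurrentAs, String reason);
   
       void onAllPeersExhausted();
   }
   
   // Then named implementations instead of anonymous classes, e.g.:
   // class SingleAttemptRetryStrategy implements RetryExecutionStrategy { ... }
   // class LeaderWaitRetryStrategy implements RetryExecutionStrategy { ... }
   ```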



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == 
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : 
retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, 
retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseSingleAttempt(fut, 
(ErrorResponse) resp, retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Validates throwable and returns next peer for retry.

Review Comment:
   I think it's better to decouple validation and next-peer selection into 
separate methods. On a non-recoverable throwable you would return `null`. I can 
see from the code and the javadoc that validation happens here, but the method 
name doesn't tell us about it.
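   
   A minimal sketch of the decoupling I have in mind, reusing only helpers 
already visible in this diff (`validateRecoverable` and `selectNextPeer` are 
placeholder names, not the actual API):
   
   ```java
   /** Returns the unwrapped cause if it is not recoverable, {@code null} otherwise. */
   @Nullable
   private static Throwable validateRecoverable(Throwable err) {
       Throwable cause = unwrapCause(err);

       return recoverable(cause) ? null : cause;
   }

   /** Selects the next peer to try, or {@code null} if no candidates are left. */
   @Nullable
   private Peer selectNextPeer(RetryContext retryContext) {
       return randomNode(retryContext, false);
   }

   // The caller then owns the future:
   Throwable nonRecoverable = validateRecoverable(err);
   if (nonRecoverable != null) {
       fut.completeExceptionally(nonRecoverable);
       return;
   }

   Peer nextPeer = selectNextPeer(retryContext);
   if (nextPeer == null) {
       fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
       return;
   }
   // ... retry with nextPeer ...
   ```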



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));

Review Comment:
   Let's keep the message format consistent: `[groupId=" + groupId() + "].`



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == 
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : 
retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, 
retryContext);

Review Comment:
   Here and below I'd suggest returning a _value_ used to complete the future, 
rather than handling the future inside the method in such a convoluted way. As 
a case study: `handleErrorResponseSingleAttempt` has pretty complex logic, and 
in some cases we don't even want to complete the future (on transient error 
codes).
   
   We can also see that `handleSmErrorResponse` is mostly a copy-paste from the 
general raft client implementation. It definitely feels like these methods 
shouldn't be `static`: they contain the handling logic, and their result should 
be a value used to complete the future right there, instead of passing the 
future down a long chain of `static` methods.
   
   If you ask me, this could be a common `ErrorResponseHandlerOnRetry` interface 
with a `@Nullable Throwable handleErrorResponse(ErrorResponse, RetryContext)` 
method. The return value may be `null` when the future should not be completed 
exceptionally. WDYT?
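   
   A rough sketch of what I mean, just to illustrate the idea (the name and the 
`null` contract are of course up for discussion):
   
   ```java
   /** Maps an {@link ErrorResponse} to the outcome for the result future. */
   interface ErrorResponseHandlerOnRetry {
       /**
        * Returns the throwable to complete the future with, or {@code null} if the
        * future must not be completed exceptionally (e.g. a retry was scheduled
        * for a transient error code).
        */
       @Nullable
       Throwable handleErrorResponse(ErrorResponse resp, RetryContext retryContext);
   }

   // The caller stays in charge of the future:
   Throwable failure = errorResponseHandler.handleErrorResponse(resp, retryContext);

   if (failure != null) {
       fut.completeExceptionally(failure);
   }
   ```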



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == 
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : 
retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, 
retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseSingleAttempt(fut, 
(ErrorResponse) resp, retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Validates throwable and returns next peer for retry.
+     *
+     * <p>If the error is not recoverable or no peers are available, completes 
the future exceptionally and returns null.
+     *
+     * @param fut Future to complete exceptionally if retry is not possible.
+     * @param err The throwable to check.
+     * @param retryContext Retry context for getting next peer.
+     * @return Next peer for retry, or null if the future was already 
completed with an error.
+     */
+    @Nullable
+    private Peer getNextPeerForRecoverableError(CompletableFuture<?> fut, 
Throwable err, RetryContext retryContext) {
+        err = unwrapCause(err);
+
+        if (!recoverable(err)) {
+            fut.completeExceptionally(err);
+
+            return null;
+        }
+
+        Peer nextPeer = randomNode(retryContext, false);
+
+        if (nextPeer == null) {
+            fut.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return null;
+        }
+
+        return nextPeer;
+    }
+
+    private static void logRecoverableError(RetryContext retryContext, Peer 
nextPeer) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Recoverable error during the request occurred (will be 
retried) [request={}, peer={}, newPeer={}, traceId={}].",
+                    includeSensitive() ? retryContext.request() : 
retryContext.request().toStringForLightLogging(),
+                    retryContext.targetPeer(),
+                    nextPeer,
+                    retryContext.errorTraceId()
+            );
+        }
+    }
+
+    /**
+     * Handles throwable in single attempt mode.
+     */
+    private void handleThrowableSingleAttempt(
+            CompletableFuture<? extends NetworkMessage> fut,
+            Throwable err,
+            RetryContext retryContext
+    ) {
+        Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+        if (nextPeer == null) {
+            return;
+        }
+
+        logRecoverableError(retryContext, nextPeer);
+
+        String shortReasonMessage = "Peer " + 
retryContext.targetPeer().consistentId()
+                + " threw " + unwrapCause(err).getClass().getSimpleName();
+        sendWithRetrySingleAttempt(fut, 
retryContext.nextAttemptForUnavailablePeer(nextPeer, shortReasonMessage));
+    }
+
+    /**
+     * Handles error response in single attempt mode.
+     *
+     * <p>In single-attempt mode, each peer is tried at most once. All errors 
mark the peer
+     * as tried and move to the next peer. When all peers have been tried, the 
request fails
+     * with {@link ReplicationGroupUnavailableException}.
+     */
+    private void handleErrorResponseSingleAttempt(
+            CompletableFuture<? extends NetworkMessage> fut,
+            ErrorResponse resp,
+            RetryContext retryContext
+    ) {
+        RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+            @Override
+            public void executeRetry(RetryContext context, Peer nextPeer, 
@Nullable PeerTracking trackCurrentAs, String reason) {
+                // Single-attempt mode: ALWAYS mark current peer as 
unavailable, regardless of trackCurrentAs.
+                // This prevents retrying the same peer and matches original 
behavior where:
+                // - transient errors: would select new peer, mark current 
unavailable
+                // - peer unavailable: marks current unavailable
+                // - no leader: marks current unavailable (no NO_LEADER 
distinction needed)
+                // - leader redirect: marks current unavailable (to prevent 
redirect loops)
+                sendWithRetrySingleAttempt(fut, 
context.nextAttemptForUnavailablePeer(nextPeer, reason));
+            }
+
+            @Override
+            public void onAllPeersExhausted() {
+                fut.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+            }
+        };
+
+        handleErrorResponseCommon(fut, resp, retryContext, strategy, false);
+    }
+
+    /**
+     * Sends a request with retry and waits for leader on leader absence.
+     */
+    private void sendWithRetryWaitingForLeader(
+            CompletableFuture<ActionResponse> fut,
+            RetryContext retryContext,
+            Command cmd,
+            long deadline,
+            long termWhenStarted
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long requestStartTime = Utils.monotonicMs();
+            long stopTime = retryContext.stopTime();
+
+            if (requestStartTime >= stopTime) {
+                // Retry timeout expired - fail with timeout exception.
+                // For non-leader-related retriable errors, we don't wait for 
leader, just fail.
+                fut.completeExceptionally(createTimeoutException());
+                return;
+            }
+
+            ThrottlingContextHolder peerThrottlingContextHolder = 
throttlingContextHolder.peerContextHolder(
+                    retryContext.targetPeer().consistentId()
+            );
+
+            peerThrottlingContextHolder.beforeRequest();
+            retryContext.onNewAttempt();
+
+            // Bound response timeout by remaining time until retry stop time.
+            long responseTimeout = 
Math.min(peerThrottlingContextHolder.peerRequestTimeoutMillis(), stopTime - 
requestStartTime);
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    // Enforce timeout even if messaging service doesn't 
(e.g., in tests with mocks).
+                    .orTimeout(responseTimeout > 0 ? responseTimeout : 
Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+                    .whenComplete((resp, err) -> {
+                        
peerThrottlingContextHolder.afterRequest(requestStartTime, retriableError(err, 
resp));
+
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableWithLeaderWait(fut, err, 
retryContext, cmd, deadline, termWhenStarted);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseWithLeaderWait(fut, 
(ErrorResponse) resp, retryContext, cmd, deadline, termWhenStarted);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((ActionResponse) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Handles throwable in leader-wait mode.
+     */
+    private void handleThrowableWithLeaderWait(
+            CompletableFuture<ActionResponse> fut,
+            Throwable err,
+            RetryContext retryContext,
+            Command cmd,
+            long deadline,
+            long termWhenStarted
+    ) {
+        Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+        if (nextPeer == null) {
+            return;
+        }
+
+        logRecoverableError(retryContext, nextPeer);
+
+        String shortReasonMessage = "Peer " + 
retryContext.targetPeer().consistentId()
+                + " threw " + unwrapCause(err).getClass().getSimpleName();
+        scheduleRetryWithLeaderWait(fut, retryContext.nextAttempt(nextPeer, 
shortReasonMessage), cmd, deadline, termWhenStarted);
+    }
+
+    /**
+     * Handles error response in leader-wait mode.
+     *
+     * <p>In leader-wait mode:
+     * <ul>
+     *     <li>Transient errors (EBUSY/EAGAIN) retry on the same peer after 
delay</li>
+     *     <li>"No leader" errors try each peer once, then wait for leader 
notification</li>
+     *     <li>Peer unavailability errors cycle through peers until 
timeout</li>
+     * </ul>
+     */
+    private void handleErrorResponseWithLeaderWait(
+            CompletableFuture<ActionResponse> fut,
+            ErrorResponse resp,
+            RetryContext retryContext,
+            Command cmd,
+            long deadline,
+            long termWhenStarted
+    ) {
+        RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+            @Override
+            public void executeRetry(RetryContext context, Peer nextPeer, 
@Nullable PeerTracking trackCurrentAs, String reason) {
+                RetryContext nextContext;
+                if (trackCurrentAs == null) {

Review Comment:
   A `@Nullable` enum value is a marker that another value is missing. I think 
`COMMON` is a good candidate: track the current one as a common peer. The 
if-else tree below would then become a switch (do we allow assignment from a 
switch expression? If yes, `RetryContext nextContext = switch (trackCurrentAs) 
{ .. }` looks good).
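   
   Something like this, assuming `PeerTracking` gains a `COMMON` constant and 
already has an `UNAVAILABLE` one (the constant names are just my guess; any 
other constants would need their own arms):
   
   ```java
   RetryContext nextContext = switch (trackCurrentAs) {
       // Keep the current peer as a regular candidate and retry on the next one.
       case COMMON -> context.nextAttempt(nextPeer, reason);
       // Mark the current peer as unavailable before retrying.
       case UNAVAILABLE -> context.nextAttemptForUnavailablePeer(nextPeer, reason);
   };
   ```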



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = 
leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = 
applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    
resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> 
resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? 
configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, 
term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft 
client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == 
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : 
retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> 
clusterService.messagingService().invoke(node, retryContext.request(), 
responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is 
stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, 
retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponseSingleAttempt(fut, 
(ErrorResponse) resp, retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer();
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Validates throwable and returns next peer for retry.

Review Comment:
   And complete the future outside of the method. I'm afraid of too many 
implicit actions happening inside it.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -81,11 +119,32 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
     private final RaftGroupService raftClient;
 
     /** Executor to invoke RPC requests. */
-    private final Executor executor;
+    private final ScheduledExecutorService executor;
 
     /** RAFT configuration. */
     private final RaftConfiguration raftConfiguration;
 
+    /** State machine for tracking leader availability. */
+    private final LeaderAvailabilityState leaderAvailabilityState;
+
+    /** Command marshaller. */
+    private final Marshaller commandsMarshaller;
+
+    /** Throttling context holder. */
+    private final ThrottlingContextHolder throttlingContextHolder;

Review Comment:
   It's fine that you've introduced `RetryExecutionStrategy`, but creating 
anonymous implementations on a hot path (raft command handling, even the 
exceptional one) is a bad approach.
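   
   One option, sketched under the assumption that we change `executeRetry` to 
receive the future explicitly: then the strategies become per-service fields 
instead of per-request anonymous classes.
   
   ```java
   interface RetryExecutionStrategy {
       void executeRetry(CompletableFuture<? extends NetworkMessage> fut, RetryContext context,
               Peer nextPeer, @Nullable PeerTracking trackCurrentAs, String reason);

       void onAllPeersExhausted(CompletableFuture<? extends NetworkMessage> fut);
   }

   /** Allocated once per service instance, not once per request. */
   private final RetryExecutionStrategy singleAttemptStrategy = new SingleAttemptStrategy();

   private class SingleAttemptStrategy implements RetryExecutionStrategy {
       @Override
       public void executeRetry(CompletableFuture<? extends NetworkMessage> fut, RetryContext context,
               Peer nextPeer, @Nullable PeerTracking trackCurrentAs, String reason) {
           // Single-attempt mode: always mark the current peer as unavailable.
           sendWithRetrySingleAttempt(fut, context.nextAttemptForUnavailablePeer(nextPeer, reason));
       }

       @Override
       public void onAllPeersExhausted(CompletableFuture<? extends NetworkMessage> fut) {
           fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
       }
   }
   ```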



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE 
for no deadline.
+     * @return The future with timeout applied, or the original future if no 
deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> 
future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for the leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after the leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with a single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));

Review Comment:
   I think we already have utility methods for this kind of busy lock wrapping, like the ones in `IgniteUtils`?
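   
   For reference, a minimal sketch of what that might look like, assuming the `inBusyLock`/`inBusyLockAsync` helpers from `org.apache.ignite.internal.util.IgniteUtils` and that `busyLock` here is an `IgniteSpinBusyLock` (signatures not double-checked against this branch). One caveat: those helpers fail with a generic `NodeStoppingException` rather than the group-specific message built via `stoppingExceptionFactory`, so this is only an illustration of the shape, not a drop-in replacement:
   
   ```java
   // Sketch only: the manual enterBusy()/leaveBusy() pattern expressed via the
   // assumed IgniteUtils helper. inBusyLockAsync is expected to return a future
   // failed with NodeStoppingException when the lock cannot be entered.
   private <R> CompletableFuture<R> executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
       return IgniteUtils.inBusyLockAsync(busyLock, () -> {
           var responseFuture = new CompletableFuture<ActionResponse>();
   
           action.accept(responseFuture);
   
           // Transform ActionResponse to the caller's result type, as in the original code.
           return responseFuture.thenApply(resp -> (R) resp.result());
       });
   }
   ```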


