JAkutenshi commented on code in PR #7242:
URL: https://github.com/apache/ignite-3/pull/7242#discussion_r2743023616


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT

Review Comment:
   Resolved.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT

Review Comment:
   Ah, somehow I missed `min`. Resolved.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -81,11 +119,32 @@ public class PhysicalTopologyAwareRaftGroupService 
implements TimeAwareRaftGroup
     private final RaftGroupService raftClient;
 
     /** Executor to invoke RPC requests. */
-    private final Executor executor;
+    private final ScheduledExecutorService executor;
 
     /** RAFT configuration. */
     private final RaftConfiguration raftConfiguration;
 
+    /** State machine for tracking leader availability. */
+    private final LeaderAvailabilityState leaderAvailabilityState;
+
+    /** Command marshaller. */
+    private final Marshaller commandsMarshaller;
+
+    /** Throttling context holder. */
+    private final ThrottlingContextHolder throttlingContextHolder;

Review Comment:
   Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to