JAkutenshi commented on code in PR #7242:
URL: https://github.com/apache/ignite-3/pull/7242#discussion_r2726459695
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0, // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
Review Comment:
Would we only ever use this "marker" value for the request timeout here? If so, why not pass `retryContext.responseTimeoutMillis()` explicitly?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0, // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
Review Comment:
Also, should we worry about the `sendWithRetryTimeout < responseTimeout` case?
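If that is a real concern, one purely illustrative option (similar to what the leader-wait path already does with `stopTime`) would be to bound the response timeout by the remaining retry budget:
```java
// Sketch only: never wait for a single response longer than the remaining retry budget.
long remaining = retryContext.stopTime() - Utils.monotonicMs();
long responseTimeout = Math.min(throttlingContextHolder.peerRequestTimeoutMillis(), remaining);
```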
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() ==
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() :
retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err,
retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseSingleAttempt(fut,
(ErrorResponse) resp, retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validates throwable and returns next peer for retry.
+ *
+ * <p>If the error is not recoverable or no peers are available, completes
the future exceptionally and returns null.
+ *
+ * @param fut Future to complete exceptionally if retry is not possible.
+ * @param err The throwable to check.
+ * @param retryContext Retry context for getting next peer.
+ * @return Next peer for retry, or null if the future was already
completed with an error.
+ */
+ @Nullable
+ private Peer getNextPeerForRecoverableError(CompletableFuture<?> fut,
Throwable err, RetryContext retryContext) {
+ err = unwrapCause(err);
+
+ if (!recoverable(err)) {
+ fut.completeExceptionally(err);
+
+ return null;
+ }
+
+ Peer nextPeer = randomNode(retryContext, false);
+
+ if (nextPeer == null) {
+ fut.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return null;
+ }
+
+ return nextPeer;
+ }
+
+ private static void logRecoverableError(RetryContext retryContext, Peer
nextPeer) {
+ if (LOG.isDebugEnabled()) {
Review Comment:
A matter of taste, optional comment: I believe the fewer indentation levels in the code, the better, so an approach like:
```java
if (!LOG.isDebugEnabled()) {
    return;
}

LOG.debug(
        "Recoverable error during the request occurred (will be retried) [request={}, peer={}, newPeer={}, traceId={}].",
        includeSensitive() ? retryContext.request() : retryContext.request().toStringForLightLogging(),
        retryContext.targetPeer(),
        nextPeer,
        retryContext.errorTraceId()
);
```
would be better from my perspective. Future us would also get 4 more characters per line for the message. But as I said, you may omit this comment.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() ==
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() :
retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err,
retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseSingleAttempt(fut,
(ErrorResponse) resp, retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validates throwable and returns next peer for retry.
+ *
+ * <p>If the error is not recoverable or no peers are available, completes
the future exceptionally and returns null.
+ *
+ * @param fut Future to complete exceptionally if retry is not possible.
+ * @param err The throwable to check.
+ * @param retryContext Retry context for getting next peer.
+ * @return Next peer for retry, or null if the future was already
completed with an error.
+ */
+ @Nullable
Review Comment:
Shouldn't we place the annotation right before the `Peer` return type, since it is the return value that may be `null`?
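I.e., something like:
```java
private @Nullable Peer getNextPeerForRecoverableError(CompletableFuture<?> fut, Throwable err, RetryContext retryContext) {
```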
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -81,11 +119,32 @@ public class PhysicalTopologyAwareRaftGroupService implements TimeAwareRaftGroup
     private final RaftGroupService raftClient;
     /** Executor to invoke RPC requests. */
-    private final Executor executor;
+    private final ScheduledExecutorService executor;
     /** RAFT configuration. */
     private final RaftConfiguration raftConfiguration;
+    /** State machine for tracking leader availability. */
+    private final LeaderAvailabilityState leaderAvailabilityState;
+
+    /** Command marshaller. */
+    private final Marshaller commandsMarshaller;
+
+    /** Throttling context holder. */
+    private final ThrottlingContextHolder throttlingContextHolder;
Review Comment:
I see no real usages for this object. Both usages are for the adaptive throttling timeout used as the invoke response timeout, yet not in production code: in all test usages it will be `-1`, which is the "marker" for using this throttling timeout. So we have some code that is (so far) test-only. On the other hand, we already have `raftConfiguration.responseTimeoutMillis().value()` for such a case, and it can easily be changed for test purposes.
In my opinion, both this class's "timeout-related retry approach" and the "adaptive delaying throttling retry approach" should be implementations of a retry strategy interface. But I believe that is a refactoring ticket for future us. For now I think it's better to just take the value from the raft configuration here.
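To illustrate the last point, a rough sketch (not insisting on this exact shape):
```java
// Sketch: take the default response timeout from the raft configuration instead of the throttling holder.
long responseTimeout = retryContext.responseTimeoutMillis() == RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
        ? raftConfiguration.responseTimeoutMillis().value()
        : retryContext.responseTimeoutMillis();
```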
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0, // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
Review Comment:
The same applies here and below in the file.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() ==
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() :
retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err,
retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseSingleAttempt(fut,
(ErrorResponse) resp, retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validates throwable and returns next peer for retry.
+ *
+ * <p>If the error is not recoverable or no peers are available, completes
the future exceptionally and returns null.
+ *
+ * @param fut Future to complete exceptionally if retry is not possible.
+ * @param err The throwable to check.
+ * @param retryContext Retry context for getting next peer.
+ * @return Next peer for retry, or null if the future was already
completed with an error.
+ */
+ @Nullable
+ private Peer getNextPeerForRecoverableError(CompletableFuture<?> fut,
Throwable err, RetryContext retryContext) {
+ err = unwrapCause(err);
+
+ if (!recoverable(err)) {
+ fut.completeExceptionally(err);
+
+ return null;
+ }
+
+ Peer nextPeer = randomNode(retryContext, false);
+
+ if (nextPeer == null) {
+ fut.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return null;
+ }
+
+ return nextPeer;
+ }
+
+ private static void logRecoverableError(RetryContext retryContext, Peer
nextPeer) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Recoverable error during the request occurred (will be
retried) [request={}, peer={}, newPeer={}, traceId={}].",
+ includeSensitive() ? retryContext.request() :
retryContext.request().toStringForLightLogging(),
+ retryContext.targetPeer(),
+ nextPeer,
+ retryContext.errorTraceId()
+ );
+ }
+ }
+
+ /**
+ * Handles throwable in single attempt mode.
+ */
+ private void handleThrowableSingleAttempt(
+ CompletableFuture<? extends NetworkMessage> fut,
+ Throwable err,
+ RetryContext retryContext
+ ) {
+ Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+ if (nextPeer == null) {
+ return;
+ }
+
+ logRecoverableError(retryContext, nextPeer);
+
+ String shortReasonMessage = "Peer " +
retryContext.targetPeer().consistentId()
+ + " threw " + unwrapCause(err).getClass().getSimpleName();
+ sendWithRetrySingleAttempt(fut,
retryContext.nextAttemptForUnavailablePeer(nextPeer, shortReasonMessage));
+ }
+
+ /**
+ * Handles error response in single attempt mode.
+ *
+ * <p>In single-attempt mode, each peer is tried at most once. All errors
mark the peer
+ * as tried and move to the next peer. When all peers have been tried, the
request fails
+ * with {@link ReplicationGroupUnavailableException}.
+ */
+ private void handleErrorResponseSingleAttempt(
+ CompletableFuture<? extends NetworkMessage> fut,
+ ErrorResponse resp,
+ RetryContext retryContext
+ ) {
+ RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+ @Override
+ public void executeRetry(RetryContext context, Peer nextPeer,
@Nullable PeerTracking trackCurrentAs, String reason) {
+ // Single-attempt mode: ALWAYS mark current peer as
unavailable, regardless of trackCurrentAs.
+ // This prevents retrying the same peer and matches original
behavior where:
+ // - transient errors: would select new peer, mark current
unavailable
+ // - peer unavailable: marks current unavailable
+ // - no leader: marks current unavailable (no NO_LEADER
distinction needed)
+ // - leader redirect: marks current unavailable (to prevent
redirect loops)
+ sendWithRetrySingleAttempt(fut,
context.nextAttemptForUnavailablePeer(nextPeer, reason));
+ }
+
+ @Override
+ public void onAllPeersExhausted() {
+ fut.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+ }
+ };
+
+ handleErrorResponseCommon(fut, resp, retryContext, strategy, false);
+ }
+
+ /**
+ * Sends a request with retry and waits for leader on leader absence.
+ */
+ private void sendWithRetryWaitingForLeader(
+ CompletableFuture<ActionResponse> fut,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long requestStartTime = Utils.monotonicMs();
+ long stopTime = retryContext.stopTime();
+
+ if (requestStartTime >= stopTime) {
+ // Retry timeout expired - fail with timeout exception.
+ // For non-leader-related retriable errors, we don't wait for
leader, just fail.
+ fut.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ ThrottlingContextHolder peerThrottlingContextHolder =
throttlingContextHolder.peerContextHolder(
+ retryContext.targetPeer().consistentId()
+ );
+
+ peerThrottlingContextHolder.beforeRequest();
+ retryContext.onNewAttempt();
+
+ // Bound response timeout by remaining time until retry stop time.
+ long responseTimeout =
Math.min(peerThrottlingContextHolder.peerRequestTimeoutMillis(), stopTime -
requestStartTime);
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ // Enforce timeout even if messaging service doesn't
(e.g., in tests with mocks).
+ .orTimeout(responseTimeout > 0 ? responseTimeout :
Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+ .whenComplete((resp, err) -> {
+
peerThrottlingContextHolder.afterRequest(requestStartTime, retriableError(err,
resp));
+
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableWithLeaderWait(fut, err,
retryContext, cmd, deadline, termWhenStarted);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseWithLeaderWait(fut,
(ErrorResponse) resp, retryContext, cmd, deadline, termWhenStarted);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((ActionResponse) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Handles throwable in leader-wait mode.
+ */
+ private void handleThrowableWithLeaderWait(
+ CompletableFuture<ActionResponse> fut,
+ Throwable err,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+ if (nextPeer == null) {
+ return;
+ }
+
+ logRecoverableError(retryContext, nextPeer);
+
+ String shortReasonMessage = "Peer " +
retryContext.targetPeer().consistentId()
+ + " threw " + unwrapCause(err).getClass().getSimpleName();
+ scheduleRetryWithLeaderWait(fut, retryContext.nextAttempt(nextPeer,
shortReasonMessage), cmd, deadline, termWhenStarted);
+ }
+
+ /**
+ * Handles error response in leader-wait mode.
+ *
+ * <p>In leader-wait mode:
+ * <ul>
+ * <li>Transient errors (EBUSY/EAGAIN) retry on the same peer after
delay</li>
+ * <li>"No leader" errors try each peer once, then wait for leader
notification</li>
+ * <li>Peer unavailability errors cycle through peers until
timeout</li>
+ * </ul>
+ */
+ private void handleErrorResponseWithLeaderWait(
+ CompletableFuture<ActionResponse> fut,
+ ErrorResponse resp,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
Review Comment:
Here and in the other place I believe we should have named implementations. Moreover, I believe that `RetryExecutionStrategy` will be useful in the future refactoring that I mentioned. Let's extract it as a separate interface?
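Just to sketch the idea (the interface body is copied from the anonymous class here; the implementation names are hypothetical):
```java
/** Sketch: the retry callback extracted as a top-level interface (names are illustrative). */
interface RetryExecutionStrategy {
    /** Schedules the next attempt against {@code nextPeer}. */
    void executeRetry(RetryContext context, Peer nextPeer, @Nullable PeerTracking trackCurrentAs, String reason);

    /** Invoked when every peer has been tried without success. */
    void onAllPeersExhausted();
}
```
with named implementations like `SingleAttemptRetryStrategy` and `LeaderWaitRetryStrategy` instead of the anonymous classes.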
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() ==
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() :
retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err,
retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseSingleAttempt(fut,
(ErrorResponse) resp, retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validates throwable and returns next peer for retry.
Review Comment:
I think it's better to decouple validation and next-peer selection into separate methods. On a non-recoverable throwable you return `null`. I know about the validation from the code and the javadoc, but the method name doesn't tell us about it.
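A rough sketch of the split I have in mind (method names are just for illustration):
```java
// Sketch: validation only - completes the future and reports that no retry is possible.
private boolean completeIfNotRecoverable(CompletableFuture<?> fut, Throwable err) {
    Throwable cause = unwrapCause(err);

    if (!recoverable(cause)) {
        fut.completeExceptionally(cause);

        return true;
    }

    return false;
}

// Sketch: next-peer selection only, with the "no peers left" outcome made explicit.
private @Nullable Peer nextPeerForRetry(CompletableFuture<?> fut, RetryContext retryContext) {
    Peer nextPeer = randomNode(retryContext, false);

    if (nextPeer == null) {
        fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
    }

    return nextPeer;
}
```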
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0, // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
Review Comment:
Let's use `[groupId=" + groupId() + "].` in the message, for consistency with the other messages in this file.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() ==
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() :
retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err,
retryContext);
Review Comment:
Here and in the other handlers I'd suggest returning a _value_ that the caller uses to
complete the future, instead of manipulating the future inside the method in such a
roundabout way. As a case study: `handleErrorResponseSingleAttempt` has pretty complex
logic, and in some cases we don't want to complete the future at all (on transient error
codes). We can also see that `handleSmErrorResponse` is mostly a copy-paste from the
general raft client implementation. It also feels like these methods shouldn't be
`static`: they contain handling logic, and their result should be a value to complete
the future with, not a future threaded through a long chain of `static` methods.
   If you ask me, this could be a common `ErrorResponseHandlerOnRetry` interface with a
`@Nullable Throwable handleErrorResponse(ErrorResponse, RetryContext)` method. The return
value would be `null` for the cases where the future shouldn't be failed (non-exceptional
completion). WDYT?
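To make the idea concrete, a minimal sketch (illustrative only, not existing API; the
interface name and the null-means-don't-fail convention are just the proposal above):

```java
/** Sketch only: the handler decides, the caller completes the future. */
interface ErrorResponseHandlerOnRetry {
    /**
     * Inspects an error response and decides how the request should end.
     *
     * @return Throwable to fail the caller's future with, or {@code null} when the future
     *         must not be failed (e.g. a transient error that will be retried, or a
     *         non-exceptional completion).
     */
    @Nullable Throwable handleErrorResponse(ErrorResponse resp, RetryContext retryContext);
}
```

On the caller side the future is then touched in exactly one place:

```java
Throwable failure = errorHandler.handleErrorResponse((ErrorResponse) resp, retryContext);

if (failure != null) {
    fut.completeExceptionally(failure);
}
```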
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() ==
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() :
retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err,
retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseSingleAttempt(fut,
(ErrorResponse) resp, retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validates throwable and returns next peer for retry.
+ *
+ * <p>If the error is not recoverable or no peers are available, completes
the future exceptionally and returns null.
+ *
+ * @param fut Future to complete exceptionally if retry is not possible.
+ * @param err The throwable to check.
+ * @param retryContext Retry context for getting next peer.
+ * @return Next peer for retry, or null if the future was already
completed with an error.
+ */
+ @Nullable
+ private Peer getNextPeerForRecoverableError(CompletableFuture<?> fut,
Throwable err, RetryContext retryContext) {
+ err = unwrapCause(err);
+
+ if (!recoverable(err)) {
+ fut.completeExceptionally(err);
+
+ return null;
+ }
+
+ Peer nextPeer = randomNode(retryContext, false);
+
+ if (nextPeer == null) {
+ fut.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return null;
+ }
+
+ return nextPeer;
+ }
+
+ private static void logRecoverableError(RetryContext retryContext, Peer
nextPeer) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Recoverable error during the request occurred (will be
retried) [request={}, peer={}, newPeer={}, traceId={}].",
+ includeSensitive() ? retryContext.request() :
retryContext.request().toStringForLightLogging(),
+ retryContext.targetPeer(),
+ nextPeer,
+ retryContext.errorTraceId()
+ );
+ }
+ }
+
+ /**
+ * Handles throwable in single attempt mode.
+ */
+ private void handleThrowableSingleAttempt(
+ CompletableFuture<? extends NetworkMessage> fut,
+ Throwable err,
+ RetryContext retryContext
+ ) {
+ Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+ if (nextPeer == null) {
+ return;
+ }
+
+ logRecoverableError(retryContext, nextPeer);
+
+ String shortReasonMessage = "Peer " +
retryContext.targetPeer().consistentId()
+ + " threw " + unwrapCause(err).getClass().getSimpleName();
+ sendWithRetrySingleAttempt(fut,
retryContext.nextAttemptForUnavailablePeer(nextPeer, shortReasonMessage));
+ }
+
+ /**
+ * Handles error response in single attempt mode.
+ *
+ * <p>In single-attempt mode, each peer is tried at most once. All errors
mark the peer
+ * as tried and move to the next peer. When all peers have been tried, the
request fails
+ * with {@link ReplicationGroupUnavailableException}.
+ */
+ private void handleErrorResponseSingleAttempt(
+ CompletableFuture<? extends NetworkMessage> fut,
+ ErrorResponse resp,
+ RetryContext retryContext
+ ) {
+ RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+ @Override
+ public void executeRetry(RetryContext context, Peer nextPeer,
@Nullable PeerTracking trackCurrentAs, String reason) {
+ // Single-attempt mode: ALWAYS mark current peer as
unavailable, regardless of trackCurrentAs.
+ // This prevents retrying the same peer and matches original
behavior where:
+ // - transient errors: would select new peer, mark current
unavailable
+ // - peer unavailable: marks current unavailable
+ // - no leader: marks current unavailable (no NO_LEADER
distinction needed)
+ // - leader redirect: marks current unavailable (to prevent
redirect loops)
+ sendWithRetrySingleAttempt(fut,
context.nextAttemptForUnavailablePeer(nextPeer, reason));
+ }
+
+ @Override
+ public void onAllPeersExhausted() {
+ fut.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+ }
+ };
+
+ handleErrorResponseCommon(fut, resp, retryContext, strategy, false);
+ }
+
+ /**
+ * Sends a request with retry and waits for leader on leader absence.
+ */
+ private void sendWithRetryWaitingForLeader(
+ CompletableFuture<ActionResponse> fut,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long requestStartTime = Utils.monotonicMs();
+ long stopTime = retryContext.stopTime();
+
+ if (requestStartTime >= stopTime) {
+ // Retry timeout expired - fail with timeout exception.
+ // For non-leader-related retriable errors, we don't wait for
leader, just fail.
+ fut.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ ThrottlingContextHolder peerThrottlingContextHolder =
throttlingContextHolder.peerContextHolder(
+ retryContext.targetPeer().consistentId()
+ );
+
+ peerThrottlingContextHolder.beforeRequest();
+ retryContext.onNewAttempt();
+
+ // Bound response timeout by remaining time until retry stop time.
+ long responseTimeout =
Math.min(peerThrottlingContextHolder.peerRequestTimeoutMillis(), stopTime -
requestStartTime);
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ // Enforce timeout even if messaging service doesn't
(e.g., in tests with mocks).
+ .orTimeout(responseTimeout > 0 ? responseTimeout :
Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+ .whenComplete((resp, err) -> {
+
peerThrottlingContextHolder.afterRequest(requestStartTime, retriableError(err,
resp));
+
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableWithLeaderWait(fut, err,
retryContext, cmd, deadline, termWhenStarted);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseWithLeaderWait(fut,
(ErrorResponse) resp, retryContext, cmd, deadline, termWhenStarted);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((ActionResponse) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Handles throwable in leader-wait mode.
+ */
+ private void handleThrowableWithLeaderWait(
+ CompletableFuture<ActionResponse> fut,
+ Throwable err,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+ if (nextPeer == null) {
+ return;
+ }
+
+ logRecoverableError(retryContext, nextPeer);
+
+ String shortReasonMessage = "Peer " +
retryContext.targetPeer().consistentId()
+ + " threw " + unwrapCause(err).getClass().getSimpleName();
+ scheduleRetryWithLeaderWait(fut, retryContext.nextAttempt(nextPeer,
shortReasonMessage), cmd, deadline, termWhenStarted);
+ }
+
+ /**
+ * Handles error response in leader-wait mode.
+ *
+ * <p>In leader-wait mode:
+ * <ul>
+ * <li>Transient errors (EBUSY/EAGAIN) retry on the same peer after
delay</li>
+ * <li>"No leader" errors try each peer once, then wait for leader
notification</li>
+ * <li>Peer unavailability errors cycle through peers until
timeout</li>
+ * </ul>
+ */
+ private void handleErrorResponseWithLeaderWait(
+ CompletableFuture<ActionResponse> fut,
+ ErrorResponse resp,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+ @Override
+ public void executeRetry(RetryContext context, Peer nextPeer,
@Nullable PeerTracking trackCurrentAs, String reason) {
+ RetryContext nextContext;
+ if (trackCurrentAs == null) {
Review Comment:
A `@Nullable` enum value is a marker that the enum is missing a constant. I think
`COMMON` would be a good one: track the current peer as a common peer. The if-else tree
below would then become a switch (do we allow assignment from a switch expression? If so,
`RetryContext nextContext = switch (trackCurrentAs) { .. }` looks good).
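Roughly what I have in mind, as a sketch only: `COMMON` is the proposed new constant, and
for brevity I assume `PeerTracking` ends up with just `COMMON` and `UNAVAILABLE`; extra
constants would simply add branches.

```java
// Sketch: trackCurrentAs is no longer nullable; COMMON replaces the old null branch.
// Assumes PeerTracking = { COMMON, UNAVAILABLE } for brevity - more constants just add branches,
// and an exhaustive switch expression needs no default.
RetryContext nextContext = switch (trackCurrentAs) {
    case COMMON -> context.nextAttempt(nextPeer, reason);
    case UNAVAILABLE -> context.nextAttemptForUnavailablePeer(nextPeer, reason);
};
```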
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() ==
RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() :
retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node ->
clusterService.messagingService().invoke(node, retryContext.request(),
responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+
fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is
stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err,
retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseSingleAttempt(fut,
(ErrorResponse) resp, retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validates throwable and returns next peer for retry.
Review Comment:
And complete the future outside of this method; I'm wary of having this many implicit
side effects inside it.
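For example, the method could only pick the next peer and leave all completion of the
future to the caller (a sketch; `nextPeerForRecoverableError` is just an illustrative name):

```java
/** Sketch only: pure peer selection, no side effects on the future. */
@Nullable
private Peer nextPeerForRecoverableError(Throwable err, RetryContext retryContext) {
    return recoverable(unwrapCause(err)) ? randomNode(retryContext, false) : null;
}
```

The caller then completes the future in one place:

```java
Peer nextPeer = nextPeerForRecoverableError(err, retryContext);

if (nextPeer == null) {
    Throwable cause = unwrapCause(err);

    fut.completeExceptionally(recoverable(cause)
            ? new ReplicationGroupUnavailableException(groupId())
            : cause);

    return;
}
```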
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -81,11 +119,32 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
private final RaftGroupService raftClient;
/** Executor to invoke RPC requests. */
- private final Executor executor;
+ private final ScheduledExecutorService executor;
/** RAFT configuration. */
private final RaftConfiguration raftConfiguration;
+ /** State machine for tracking leader availability. */
+ private final LeaderAvailabilityState leaderAvailabilityState;
+
+ /** Command marshaller. */
+ private final Marshaller commandsMarshaller;
+
+ /** Throttling context holder. */
+ private final ThrottlingContextHolder throttlingContextHolder;
Review Comment:
It's good that you've started with `RetryExecutionStrategy`, but anonymous
implementations on the hot path (raft command handling, even the exceptional one) are a
bad way to use it.
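One way out, as a sketch only: it assumes the interface is reshaped so that the future is
passed into the strategy methods instead of being captured, which is not what the PR does
today.

```java
/** Sketch: allocated once per client and reused for every command. */
private final RetryExecutionStrategy singleAttemptStrategy = new SingleAttemptStrategy();

/** Sketch of a named, stateless strategy instead of an anonymous class per request. */
private class SingleAttemptStrategy implements RetryExecutionStrategy {
    @Override
    public <R extends NetworkMessage> void executeRetry(CompletableFuture<R> fut, RetryContext context,
            Peer nextPeer, @Nullable PeerTracking trackCurrentAs, String reason) {
        // Single-attempt mode always marks the current peer as unavailable.
        sendWithRetrySingleAttempt(fut, context.nextAttemptForUnavailablePeer(nextPeer, reason));
    }

    @Override
    public void onAllPeersExhausted(CompletableFuture<?> fut) {
        fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
    }
}
```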
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline,
leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ *
+ * @return Initial target peer, or {@code null}.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails
with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse>
resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R>
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE
for no deadline.
+ * @return The future with timeout applied, or the original future if no
deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T>
future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture =
leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture =
applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+
resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client
is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse>
resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new
ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ?
configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline,
term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft
client is stopping [" + groupId() + "]."));
Review Comment:
I think we already have utility methods for wrapping code in a busy lock? Like the ones
in `IgniteUtils`?
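If the helpers I'm thinking of are `IgniteUtils.inBusyLockAsync` and friends (treat the
exact signatures as an assumption), then `executeWithBusyLock`, for instance, could shrink
to something like the sketch below. One caveat: the helper fails the future with a
`NodeStoppingException` instead of whatever `stoppingExceptionFactory` produces, which may
or may not be acceptable here.

```java
// Sketch only: assumes busyLock is an IgniteSpinBusyLock and that inBusyLockAsync
// fails the returned future when the node is stopping.
private <R> CompletableFuture<R> executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
    return IgniteUtils.inBusyLockAsync(busyLock, () -> {
        var responseFuture = new CompletableFuture<ActionResponse>();

        action.accept(responseFuture);

        return responseFuture.thenApply(resp -> (R) resp.result());
    });
}
```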