Cyrill commented on code in PR #7242:
URL: https://github.com/apache/ignite-3/pull/7242#discussion_r2727774832
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
- return raftClient.run(cmd, timeoutMillis);
+ // Normalize timeout: negative values mean infinite wait.
+ long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+ // Wait for leader mode (bounded or infinite).
+ long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+ return executeWithBusyLock(responseFuture -> {
+ if (effectiveTimeout == 0) {
+ tryAllPeersOnce(responseFuture, cmd);
+ } else {
+ startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+ }
+ });
+ }
+
+ /**
+ * Resolves initial target peer for a command execution.
+ *
+ * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+ *
+ * @return Initial target peer, or {@code null} if no peer could be selected.
+ */
+ @Nullable
+ private Peer resolveInitialPeer() {
+ Peer targetPeer = leader;
+ if (targetPeer == null) {
+ targetPeer = randomNode(null, false);
+ }
+ return targetPeer;
+ }
+
+ /**
+ * Tries all peers once without waiting for leader.
+ *
+ * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+ * peer responds successfully.
+ * @param cmd The command to execute.
+ */
+ private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ 0, // Single attempt - no retry timeout
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetrySingleAttempt(resultFuture, context);
+ }
+
+ /**
+ * Executes an action within busy lock and transforms the response.
+ */
+ private <R> CompletableFuture<R> executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+ var responseFuture = new CompletableFuture<ActionResponse>();
+ var resultFuture = new CompletableFuture<R>();
+
+ if (!busyLock.enterBusy()) {
+ resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+
+ return resultFuture;
+ }
+
+ try {
+ action.accept(responseFuture);
+
+ // Transform ActionResponse to result type.
+ responseFuture.whenComplete((resp, err) -> {
+ if (err != null) {
+ resultFuture.completeExceptionally(err);
+ } else {
+ resultFuture.complete((R) resp.result());
+ }
+ });
+
+ return resultFuture;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Applies deadline to a future using orTimeout.
+ *
+ * @param future The future to apply deadline to.
+ * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE for no deadline.
+ * @return The future with timeout applied, or the original future if no deadline.
+ */
+ private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> future, long deadline) {
+ if (deadline == Long.MAX_VALUE) {
+ return future;
+ }
+ long remainingTime = deadline - Utils.monotonicMs();
+ if (remainingTime <= 0) {
+ return CompletableFuture.failedFuture(new TimeoutException());
+ }
+ return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a timeout exception for leader wait.
+ */
+ private ReplicationGroupUnavailableException createTimeoutException() {
+ return new ReplicationGroupUnavailableException(
+ groupId(),
+ "Timeout waiting for leader [groupId=" + groupId() + "]."
+ );
+ }
+
+ /**
+ * Waits for leader to become available and then retries the command.
+ */
+ private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline) {
+ CompletableFuture<Long> leaderFuture = leaderAvailabilityState.awaitLeader();
+
+ // Apply timeout if bounded.
+ CompletableFuture<Long> timedLeaderFuture = applyDeadline(leaderFuture, deadline);
+
+ timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+ if (waitError != null) {
+ Throwable cause = unwrapCause(waitError);
+ if (cause instanceof TimeoutException) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ } else {
+ resultFuture.completeExceptionally(cause);
+ }
+ return;
+ }
+
+ if (!busyLock.enterBusy()) {
+ resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ // Leader is available, now run the command with retry logic.
+ startRetryPhase(resultFuture, cmd, deadline, term);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, executor);
+ }
+
+ /**
+ * Starts the retry phase after leader is available.
+ */
+ private void startRetryPhase(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline, long term) {
+ Peer targetPeer = resolveInitialPeer();
+ if (targetPeer == null) {
+ resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+ return;
+ }
+
+ // Check deadline before starting retry phase.
+ long now = Utils.monotonicMs();
+ if (deadline != Long.MAX_VALUE && now >= deadline) {
+ resultFuture.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ // Use retry timeout bounded by remaining time until deadline.
+ long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+ long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? configTimeout : Math.min(configTimeout, deadline - now);
+
+ var context = new RetryContext(
+ groupId().toString(),
+ targetPeer,
+ cmd::toStringForLightLogging,
+ createRequestFactory(cmd),
+ sendWithRetryTimeoutMillis,
+ RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ );
+
+ sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, term);
+ }
+
+ /**
+ * Creates a request factory for the given command.
+ */
+ private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+ if (cmd instanceof WriteCommand) {
+ return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+ .groupId(groupId().toString())
+ .command(commandsMarshaller.marshall(cmd))
+ .deserializedCommand((WriteCommand) cmd)
+ .build();
+ } else {
+ return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+ .groupId(groupId().toString())
+ .command((ReadCommand) cmd)
+ .readOnlySafe(true)
+ .build();
+ }
+ }
+
+ /**
+ * Sends a request with single attempt (no retry on timeout).
+ *
+ * <p>In single-attempt mode, each peer is tried at most once.
+ */
+ private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+ CompletableFuture<R> fut,
+ RetryContext retryContext
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long responseTimeout = retryContext.responseTimeoutMillis() == RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+ ? throttlingContextHolder.peerRequestTimeoutMillis() : retryContext.responseTimeoutMillis();
+
+ retryContext.onNewAttempt();
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node -> clusterService.messagingService().invoke(node, retryContext.request(), responseTimeout))
+ .whenComplete((resp, err) -> {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableSingleAttempt(fut, err, retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseSingleAttempt(fut, (ErrorResponse) resp, retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse) resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Validates throwable and returns next peer for retry.
+ *
+ * <p>If the error is not recoverable or no peers are available, completes the future exceptionally and returns null.
+ *
+ * @param fut Future to complete exceptionally if retry is not possible.
+ * @param err The throwable to check.
+ * @param retryContext Retry context for getting next peer.
+ * @return Next peer for retry, or null if the future was already completed with an error.
+ */
+ @Nullable
+ private Peer getNextPeerForRecoverableError(CompletableFuture<?> fut, Throwable err, RetryContext retryContext) {
+ err = unwrapCause(err);
+
+ if (!recoverable(err)) {
+ fut.completeExceptionally(err);
+
+ return null;
+ }
+
+ Peer nextPeer = randomNode(retryContext, false);
+
+ if (nextPeer == null) {
+ fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+ return null;
+ }
+
+ return nextPeer;
+ }
+
+ private static void logRecoverableError(RetryContext retryContext, Peer nextPeer) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Recoverable error during the request occurred (will be
retried) [request={}, peer={}, newPeer={}, traceId={}].",
+ includeSensitive() ? retryContext.request() :
retryContext.request().toStringForLightLogging(),
+ retryContext.targetPeer(),
+ nextPeer,
+ retryContext.errorTraceId()
+ );
+ }
+ }
+
+ /**
+ * Handles throwable in single attempt mode.
+ */
+ private void handleThrowableSingleAttempt(
+ CompletableFuture<? extends NetworkMessage> fut,
+ Throwable err,
+ RetryContext retryContext
+ ) {
+ Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+ if (nextPeer == null) {
+ return;
+ }
+
+ logRecoverableError(retryContext, nextPeer);
+
+ String shortReasonMessage = "Peer " + retryContext.targetPeer().consistentId()
+ + " threw " + unwrapCause(err).getClass().getSimpleName();
+ sendWithRetrySingleAttempt(fut, retryContext.nextAttemptForUnavailablePeer(nextPeer, shortReasonMessage));
+ }
+
+ /**
+ * Handles error response in single attempt mode.
+ *
+ * <p>In single-attempt mode, each peer is tried at most once. All errors mark the peer
+ * as tried and move to the next peer. When all peers have been tried, the request fails
+ * with {@link ReplicationGroupUnavailableException}.
+ */
+ private void handleErrorResponseSingleAttempt(
+ CompletableFuture<? extends NetworkMessage> fut,
+ ErrorResponse resp,
+ RetryContext retryContext
+ ) {
+ RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+ @Override
+ public void executeRetry(RetryContext context, Peer nextPeer, @Nullable PeerTracking trackCurrentAs, String reason) {
+ // Single-attempt mode: ALWAYS mark current peer as unavailable, regardless of trackCurrentAs.
+ // This prevents retrying the same peer and matches original behavior where:
+ // - transient errors: would select new peer, mark current unavailable
+ // - peer unavailable: marks current unavailable
+ // - no leader: marks current unavailable (no NO_LEADER distinction needed)
+ // - leader redirect: marks current unavailable (to prevent redirect loops)
+ sendWithRetrySingleAttempt(fut, context.nextAttemptForUnavailablePeer(nextPeer, reason));
+ }
+
+ @Override
+ public void onAllPeersExhausted() {
+ fut.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+ }
+ };
+
+ handleErrorResponseCommon(fut, resp, retryContext, strategy, false);
+ }
+
+ /**
+ * Sends a request with retry and waits for leader on leader absence.
+ */
+ private void sendWithRetryWaitingForLeader(
+ CompletableFuture<ActionResponse> fut,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ long requestStartTime = Utils.monotonicMs();
+ long stopTime = retryContext.stopTime();
+
+ if (requestStartTime >= stopTime) {
+ // Retry timeout expired - fail with timeout exception.
+ // For non-leader-related retriable errors, we don't wait for leader, just fail.
+ fut.completeExceptionally(createTimeoutException());
+ return;
+ }
+
+ ThrottlingContextHolder peerThrottlingContextHolder = throttlingContextHolder.peerContextHolder(
+ retryContext.targetPeer().consistentId()
+ );
+
+ peerThrottlingContextHolder.beforeRequest();
+ retryContext.onNewAttempt();
+
+ // Bound response timeout by remaining time until retry stop time.
+ long responseTimeout = Math.min(peerThrottlingContextHolder.peerRequestTimeoutMillis(), stopTime - requestStartTime);
+
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node -> clusterService.messagingService().invoke(node, retryContext.request(), responseTimeout))
+ // Enforce timeout even if messaging service doesn't (e.g., in tests with mocks).
+ .orTimeout(responseTimeout > 0 ? responseTimeout : Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+ .whenComplete((resp, err) -> {
+ peerThrottlingContextHolder.afterRequest(requestStartTime, retriableError(err, resp));
+
+ if (!busyLock.enterBusy()) {
+ fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+ return;
+ }
+
+ try {
+ if (err != null) {
+ handleThrowableWithLeaderWait(fut, err, retryContext, cmd, deadline, termWhenStarted);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponseWithLeaderWait(fut, (ErrorResponse) resp, retryContext, cmd, deadline, termWhenStarted);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse) resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer();
+ fut.complete((ActionResponse) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Handles throwable in leader-wait mode.
+ */
+ private void handleThrowableWithLeaderWait(
+ CompletableFuture<ActionResponse> fut,
+ Throwable err,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ Peer nextPeer = getNextPeerForRecoverableError(fut, err, retryContext);
+
+ if (nextPeer == null) {
+ return;
+ }
+
+ logRecoverableError(retryContext, nextPeer);
+
+ String shortReasonMessage = "Peer " + retryContext.targetPeer().consistentId()
+ + " threw " + unwrapCause(err).getClass().getSimpleName();
+ scheduleRetryWithLeaderWait(fut, retryContext.nextAttempt(nextPeer, shortReasonMessage), cmd, deadline, termWhenStarted);
+ }
+
+ /**
+ * Handles error response in leader-wait mode.
+ *
+ * <p>In leader-wait mode:
+ * <ul>
+ * <li>Transient errors (EBUSY/EAGAIN) retry on the same peer after a delay</li>
+ * <li>"No leader" errors try each peer once, then wait for leader notification</li>
+ * <li>Peer unavailability errors cycle through peers until timeout</li>
+ * </ul>
+ */
+ private void handleErrorResponseWithLeaderWait(
+ CompletableFuture<ActionResponse> fut,
+ ErrorResponse resp,
+ RetryContext retryContext,
+ Command cmd,
+ long deadline,
+ long termWhenStarted
+ ) {
+ RetryExecutionStrategy strategy = new RetryExecutionStrategy() {
+ @Override
+ public void executeRetry(RetryContext context, Peer nextPeer, @Nullable PeerTracking trackCurrentAs, String reason) {
+ RetryContext nextContext;
+ if (trackCurrentAs == null) {
Review Comment:
done
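
For anyone skimming the new run(Command, long) contract, here is a minimal caller-side sketch of the timeout semantics in the diff above (the raftService handle, cmd, and ResultType are hypothetical placeholders, not part of this change):

    // timeoutMillis == 0 : try every peer exactly once; fail fast with
    //                      ReplicationGroupUnavailableException if none answers.
    // timeoutMillis > 0  : wait for a leader and retry, bounded by the deadline;
    //                      fails with ReplicationGroupUnavailableException on timeout.
    // timeoutMillis < 0  : wait for a leader indefinitely.
    CompletableFuture<ResultType> fastPath  = raftService.run(cmd, 0);
    CompletableFuture<ResultType> bounded   = raftService.run(cmd, 30_000);
    CompletableFuture<ResultType> unbounded = raftService.run(cmd, -1);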
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]