JAkutenshi commented on code in PR #7507:
URL: https://github.com/apache/ignite-3/pull/7507#discussion_r2872743696
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -834,15 +999,26 @@ private interface RetryExecutionStrategy {
* as unavailable. When all peers have been tried, the request fails with
* {@link ReplicationGroupUnavailableException}.
*/
- private class SingleAttemptRetryStrategy implements RetryExecutionStrategy
{
- private final CompletableFuture<? extends NetworkMessage> fut;
+ private class SingleAttemptRetryStrategy<R extends NetworkMessage>
implements RetryExecutionStrategy {
+ private final CompletableFuture<R> fut;
Review Comment:
Resolved.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -169,16 +185,69 @@ LeaderElectionListener leaderElectionListener() {
* @return Future that completes with the command result.
*/
<R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
+ return this.<ActionResponse>send(
+ createRequestFactory(cmd),
+ TargetPeerStrategy.LEADER,
+ null,
+ timeoutMillis
+ ).thenApply(resp -> (R) resp.result());
+ }
+
+ /**
+ * Sends a request with leader-aware retry semantics.
+ *
+ * @param requestFactory Factory creating requests for target peer.
+ * @param targetStrategy How to select the initial target peer.
+ * @param timeoutMillis Timeout (0=single attempt, MAX_VALUE=infinite,
positive=bounded).
+ * @param <R> Response type.
+ * @return Future with response.
+ */
+ <R extends NetworkMessage> CompletableFuture<R> send(
+ Function<Peer, ? extends NetworkMessage> requestFactory,
+ TargetPeerStrategy targetStrategy,
+ long timeoutMillis
+ ) {
+ return send(requestFactory, targetStrategy, null, timeoutMillis);
+ }
+
+ /**
+ * Sends a request with leader-aware retry semantics.
+ *
+ * @param requestFactory Factory creating requests for target peer.
+ * @param targetStrategy How to select the initial target peer.
+ * @param specificPeer Target peer for SPECIFIC strategy (ignored for
other strategies).
+ * @param timeoutMillis Timeout (0=single attempt, MAX_VALUE=infinite,
positive=bounded).
+ * @param <R> Response type.
+ * @return Future with response.
+ */
+ <R extends NetworkMessage> CompletableFuture<R> send(
+ Function<Peer, ? extends NetworkMessage> requestFactory,
+ TargetPeerStrategy targetStrategy,
+ @Nullable Peer specificPeer,
+ long timeoutMillis
+ ) {
// Normalize timeout: negative values mean infinite wait.
long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
- // Wait for leader mode (bounded or infinite).
long deadline = Utils.monotonicMsAfter(effectiveTimeout);
return executeWithBusyLock(responseFuture -> {
+ Peer initialPeer = resolveInitialPeer(targetStrategy,
specificPeer);
Review Comment:
Resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]