This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e6487e7f02 IGNITE-26075 Fix refreshAndGetLeaderWithTerm in order not to return stale leader (#6344)
1e6487e7f02 is described below

commit 1e6487e7f02658cbb6e082ba1831abecd6c372e7
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Aug 8 18:49:02 2025 +0300

    IGNITE-26075 Fix refreshAndGetLeaderWithTerm in order not to return stale leader (#6344)
---
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 154 ++++++++++++++++++---
 .../ignite/raft/jraft/rpc/RaftClientService.java   |  12 ++
 .../ignite/raft/jraft/rpc/RaftServerService.java   |  10 ++
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       |   2 +-
 .../rpc/impl/cli/GetLeaderRequestProcessor.java    |  68 +++------
 .../rpc/impl/core/DefaultRaftClientService.java    |  12 +-
 6 files changed, 188 insertions(+), 70 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 6b0ff959e96..28492c0cc4e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -39,6 +39,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -100,6 +102,9 @@ import org.apache.ignite.raft.jraft.option.ReadOnlyServiceOptions;
 import org.apache.ignite.raft.jraft.option.ReplicatorGroupOptions;
 import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
 import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseBuilder;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import org.apache.ignite.raft.jraft.rpc.GetLeaderResponseBuilder;
 import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RaftClientService;
 import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
@@ -1761,21 +1766,24 @@ public class NodeImpl implements Node, RaftServerService {
     /**
      * ReadIndex response closure
      */
-    public static class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter<AppendEntriesResponse> {
-        final ReadIndexResponseBuilder respBuilder;
-        final RpcResponseClosure<ReadIndexResponse> closure;
+    public static class QuorumConfirmedHeartbeatResponseClosure<T extends Message> extends RpcResponseClosureAdapter<AppendEntriesResponse>{
+        final Function<Boolean, T> responseBuilder;
+        final Consumer<T> responseConsumer;
         final int quorum;
         final int failPeersThreshold;
         int ackSuccess;
         int ackFailures;
         boolean isDone;
 
-        ReadIndexHeartbeatResponseClosure(final RpcResponseClosure<ReadIndexResponse> closure,
-            final ReadIndexResponseBuilder rb, final int quorum,
-            final int peersCount) {
+        QuorumConfirmedHeartbeatResponseClosure(
+                final Consumer<T> responseConsumer,
+                final Function<Boolean, T> responseBuilder,
+                final int quorum,
+                final int peersCount
+        ) {
             super();
-            this.closure = closure;
-            this.respBuilder = rb;
+            this.responseConsumer = responseConsumer;
+            this.responseBuilder = responseBuilder;
             this.quorum = quorum;
             this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum;
             this.ackSuccess = 0;
@@ -1794,17 +1802,16 @@ public class NodeImpl implements Node, RaftServerService {
             else {
                 this.ackFailures++;
             }
+
             // Include leader self vote yes.
             if (this.ackSuccess + 1 >= this.quorum) {
-                this.respBuilder.success(true);
-                this.closure.setResponse(this.respBuilder.build());
-                this.closure.run(Status.OK());
+                T response = responseBuilder.apply(true);
+                responseConsumer.accept(response);
                 this.isDone = true;
             }
             else if (this.ackFailures >= this.failPeersThreshold) {
-                this.respBuilder.success(false);
-                this.closure.setResponse(this.respBuilder.build());
-                this.closure.run(Status.OK());
+                T response = responseBuilder.apply(false);
+                responseConsumer.accept(response);
                 this.isDone = true;
             }
         }
@@ -1814,8 +1821,10 @@ public class NodeImpl implements Node, RaftServerService {
      * Handle read index request.
      */
     @Override
-    public void handleReadIndexRequest(final ReadIndexRequest request,
-        final RpcResponseClosure<ReadIndexResponse> done) {
+    public void handleReadIndexRequest(
+            final ReadIndexRequest request,
+            final RpcResponseClosure<ReadIndexResponse> done
+    ) {
         final long startMs = Utils.monotonicMs();
         this.readLock.lock();
         try {
@@ -1841,6 +1850,104 @@ public class NodeImpl implements Node, RaftServerService {
         }
     }
 
+    @Override
+    public void handleGetLeaderAndTermRequest(GetLeaderRequest request, RpcResponseClosure<GetLeaderResponse> done) {
+        final long startMs = Utils.monotonicMs();
+        this.readLock.lock();
+        try {
+            switch (this.state) {
+                case STATE_LEADER:
+                    getLeaderFromLeader(done);
+                    break;
+                case STATE_FOLLOWER:
+                    getLeaderFromFollower(request, done);
+                    break;
+                case STATE_TRANSFERRING:
+                    done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
+                    break;
+                default:
+                    done.run(new Status(RaftError.UNKNOWN, "Invalid state for getLeaderAndTerm: %s.", this.state));
+                    break;
+            }
+        }
+        finally {
+            this.readLock.unlock();
+            this.metrics.recordLatency("handle-get-leader", Utils.monotonicMs() - startMs);
+        }
+    }
+
+    private void getLeaderFromFollower(GetLeaderRequest request, RpcResponseClosure<GetLeaderResponse> closure) {
+       PeerId leaderId = this.leaderId;
+
+       if (leaderId == null || leaderId.isEmpty()) {
+            closure.run(new Status(RaftError.UNKNOWN, "No leader at term %d.", this.currTerm));
+            return;
+        }
+        // send request to leader.
+        final GetLeaderRequest newRequest = raftOptions.getRaftMessagesFactory()
+            .getLeaderRequest()
+            .groupId(request.groupId())
+            .peerId(leaderId.toString())
+            .build();
+
+        this.rpcClientService.getLeaderAndTerm(leaderId, newRequest, -1, closure);
+    }
+
+    private void getLeaderFromLeader(RpcResponseClosure<GetLeaderResponse> closure) {
+       PeerId leaderId = this.leaderId;
+
+       if (leaderId == null || leaderId.isEmpty()) {
+            closure.run(new Status(RaftError.UNKNOWN, "No leader at term %d.", this.currTerm));
+            return;
+        }
+
+        GetLeaderResponseBuilder respBuilder = raftOptions.getRaftMessagesFactory().getLeaderResponse()
+            .leaderId(leaderId.toString())
+            .currentTerm(this.getCurrentTerm());
+
+        final int quorum = getQuorum();
+        if (quorum <= 1) {
+            // Only one peer, fast path.
+            closure.setResponse(respBuilder.build());
+            closure.run(Status.OK());
+            return;
+        }
+
+        ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
+        if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
+            // If leader lease timeout, we must change option to ReadOnlySafe
+            readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
+        }
+
+        switch (readOnlyOpt) {
+            case ReadOnlySafe:
+                final List<PeerId> peers = this.conf.getConf().getPeers();
+                Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
+                final QuorumConfirmedHeartbeatResponseClosure<GetLeaderResponse> heartbeatDone =
+                        new QuorumConfirmedHeartbeatResponseClosure<>(
+                            response -> {
+                                closure.setResponse(response);
+                                closure.run(Status.OK());
+                            },
+                            success -> respBuilder.build(),
+                            quorum,
+                            peers.size()
+                        );
+                // Send heartbeat requests to followers
+                for (final PeerId peer : peers) {
+                    if (peer.equals(this.serverId)) {
+                        continue;
+                    }
+                    this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
+                }
+                break;
+            case ReadOnlyLeaseBased:
+                closure.setResponse(respBuilder.build());
+                closure.run(Status.OK());
+                break;
+        }
+    }
+
     private int getQuorum() {
         final Configuration c = this.conf.getConf();
         if (c.isEmpty()) {
@@ -1912,8 +2019,19 @@ public class NodeImpl implements Node, RaftServerService {
             case ReadOnlySafe:
                 final List<PeerId> peers = this.conf.getConf().getPeers();
                 Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
-                final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
-                    respBuilder, quorum, peers.size());
+                final QuorumConfirmedHeartbeatResponseClosure<ReadIndexResponse> heartbeatDone =
+                    new QuorumConfirmedHeartbeatResponseClosure<>(
+                            response -> {
+                                closure.setResponse(response);
+                                closure.run(Status.OK());
+                            },
+                            success -> {
+                                respBuilder.success(success);
+                                return respBuilder.build();
+                            },
+                            quorum,
+                            peers.size()
+                    );
                 // Send heartbeat requests to followers
                 for (final PeerId peer : peers) {
                     if (peer.equals(this.serverId)) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java
index b3084412c08..bc891021073 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java
@@ -103,4 +103,16 @@ public interface RaftClientService extends ClientService {
      */
     Future<Message> readIndex(final PeerId peerId, final RpcRequests.ReadIndexRequest request, final int timeoutMs,
         final RpcResponseClosure<RpcRequests.ReadIndexResponse> done);
+
+    /**
+     * Send a get-leader-and-term request and handle the response with done.
+     *
+     * @param peerId destination peer ID
+     * @param request request data
+     * @param timeoutMs timeout millis
+     * @param done callback
+     * @return a future result
+     */
+    Future<Message> getLeaderAndTerm(final PeerId peerId, final CliRequests.GetLeaderRequest request, final int timeoutMs,
+        final RpcResponseClosure<CliRequests.GetLeaderResponse> done);
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java
index 7d60d26d7d4..2d3b6f48910 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.raft.jraft.rpc;
 
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
@@ -78,4 +80,12 @@ public interface RaftServerService {
      * @param done callback
      */
     void handleReadIndexRequest(ReadIndexRequest request, RpcResponseClosure<ReadIndexResponse> done);
+
+    /**
+     * Handle get-leader-and-term request, call the RPC closure with response.
+     *
+     * @param request data of the getLeader read
+     * @param done callback
+     */
+    void handleGetLeaderAndTermRequest(GetLeaderRequest request, RpcResponseClosure<GetLeaderResponse> done);
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 85c9bb289e2..91eee81971a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -113,13 +113,13 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new ReadIndexRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new HeartbeatRequestProcessor(rpcExecutor, raftMessagesFactory));
+        registerProcessor(new GetLeaderRequestProcessor(rpcExecutor, raftMessagesFactory));
         // raft native cli service
         registerProcessor(new AddPeerRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new RemovePeerRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new ResetPeerRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new ChangePeersAndLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new ChangePeersAndLearnersAsyncRequestProcessor(rpcExecutor, raftMessagesFactory));
-        registerProcessor(new GetLeaderRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new SnapshotRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new TransferLeaderRequestProcessor(rpcExecutor, raftMessagesFactory));
         registerProcessor(new GetPeersRequestProcessor(rpcExecutor, raftMessagesFactory));
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
index 1090447f814..177e4f939b4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
@@ -16,23 +16,21 @@
  */
 package org.apache.ignite.raft.jraft.rpc.impl.cli;
 
-import java.util.ArrayList;
-import java.util.List;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import java.util.concurrent.Executor;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.Status;
-import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
 import org.apache.ignite.raft.jraft.rpc.Message;
-import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
 import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
+import org.apache.ignite.raft.jraft.rpc.impl.core.NodeRequestProcessor;
 
 /**
  * Process get leader request.
  */
-public class GetLeaderRequestProcessor extends BaseCliRequestProcessor<GetLeaderRequest> {
+public class GetLeaderRequestProcessor extends NodeRequestProcessor<GetLeaderRequest> {
 
     public GetLeaderRequestProcessor(Executor executor, RaftMessagesFactory msgFactory) {
         super(executor, msgFactory);
@@ -49,55 +47,27 @@ public class GetLeaderRequestProcessor extends BaseCliRequestProcessor<GetLeader
     }
 
     @Override
-    protected Message processRequest0(final CliRequestContext ctx, final GetLeaderRequest request,
-        final IgniteCliRpcRequestClosure done) {
-        // ignore
-        return null;
-    }
+    public Message processRequest0(final RaftServerService service, final GetLeaderRequest request,
+        final RpcRequestClosure done) {
 
-    @Override
-    public Message processRequest(final GetLeaderRequest request, final RpcRequestClosure done) {
-        List<Node> nodes = new ArrayList<>();
-        final String groupId = getGroupId(request);
-        if (request.peerId() != null) {
-            final String peerIdStr = getPeerId(request);
-            final PeerId peer = new PeerId();
-            if (peer.parse(peerIdStr)) {
-                final Status st = new Status();
-                nodes.add(getNode(groupId, peer, st, done.getRpcCtx().getNodeManager()));
-                if (!st.isOk()) {
-                    return RaftRpcFactory.DEFAULT //
-                        .newResponse(msgFactory(), st);
+        service.handleGetLeaderAndTermRequest(request, new RpcResponseClosureAdapter<>() {
+            @Override
+            public void run(final Status status) {
+                if (getResponse() != null) {
+                    runAsync(() -> done.sendResponse(getResponse()), executor());
+                }
+                else {
+                    runAsync(() -> done.run(status), executor());
                 }
             }
-            else {
-                return RaftRpcFactory.DEFAULT //
-                    .newResponse(msgFactory(), RaftError.EINVAL, "Fail to parse peer id %s", peerIdStr);
-            }
-        }
-        else {
-            nodes = done.getRpcCtx().getNodeManager().getNodesByGroupId(groupId);
-        }
-        if (nodes == null || nodes.isEmpty()) {
-            return RaftRpcFactory.DEFAULT //
-                .newResponse(msgFactory(), RaftError.ENOENT, "No nodes in group %s", groupId);
-        }
-        for (final Node node : nodes) {
-            final PeerId leader = node.getLeaderId();
-            if (leader != null && !leader.isEmpty()) {
-                return msgFactory().getLeaderResponse()
-                    .leaderId(leader.toString())
-                    .currentTerm(node.getCurrentTerm())
-                    .build();
-            }
-        }
-        return RaftRpcFactory.DEFAULT //
-            .newResponse(msgFactory(), RaftError.UNKNOWN, "Unknown leader");
+
+        });
+
+        return null;
     }
 
     @Override
     public String interest() {
         return GetLeaderRequest.class.getName();
     }
-
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
index 7048e3b3739..202c0044d73 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
@@ -26,13 +26,15 @@ import java.util.concurrent.TimeoutException;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.Status;
-import org.apache.ignite.raft.jraft.core.NodeImpl.ReadIndexHeartbeatResponseClosure;
+import org.apache.ignite.raft.jraft.core.NodeImpl.QuorumConfirmedHeartbeatResponseClosure;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.InvokeTimeoutException;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.error.RemotingException;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
 import org.apache.ignite.raft.jraft.rpc.InvokeContext;
 import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RaftClientService;
@@ -161,7 +163,7 @@ public class DefaultRaftClientService extends AbstractClientService implements R
      * @return True if the read index request.
      */
     private static boolean isReadIndexRequest(RpcResponseClosure<AppendEntriesResponse> doneClosure) {
-        return doneClosure instanceof ReadIndexHeartbeatResponseClosure;
+        return doneClosure instanceof QuorumConfirmedHeartbeatResponseClosure;
     }
 
     @Override
@@ -197,6 +199,12 @@ public class DefaultRaftClientService extends AbstractClientService implements R
         return invokeWithDone(peerId, request, done, timeoutMs);
     }
 
+    @Override
+    public Future<Message> getLeaderAndTerm( PeerId peerId, GetLeaderRequest request, int timeoutMs,
+            RpcResponseClosure<GetLeaderResponse> done) {
+        return invokeWithDone(peerId, request, done, timeoutMs);
+    }
+
     /**
      * @param executor The executor to run done closure.
      * @param request The request.

Reply via email to