This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1e6487e7f02 IGNITE-26075 Fix refreshAndGetLeaderWithTerm in order not
to return stale leader (#6344)
1e6487e7f02 is described below
commit 1e6487e7f02658cbb6e082ba1831abecd6c372e7
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Aug 8 18:49:02 2025 +0300
IGNITE-26075 Fix refreshAndGetLeaderWithTerm in order not to return stale
leader (#6344)
---
.../apache/ignite/raft/jraft/core/NodeImpl.java | 154 ++++++++++++++++++---
.../ignite/raft/jraft/rpc/RaftClientService.java | 12 ++
.../ignite/raft/jraft/rpc/RaftServerService.java | 10 ++
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 2 +-
.../rpc/impl/cli/GetLeaderRequestProcessor.java | 68 +++------
.../rpc/impl/core/DefaultRaftClientService.java | 12 +-
6 files changed, 188 insertions(+), 70 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 6b0ff959e96..28492c0cc4e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -39,6 +39,8 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -100,6 +102,9 @@ import
org.apache.ignite.raft.jraft.option.ReadOnlyServiceOptions;
import org.apache.ignite.raft.jraft.option.ReplicatorGroupOptions;
import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseBuilder;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import org.apache.ignite.raft.jraft.rpc.GetLeaderResponseBuilder;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
@@ -1761,21 +1766,24 @@ public class NodeImpl implements Node,
RaftServerService {
/**
* ReadIndex response closure
*/
- public static class ReadIndexHeartbeatResponseClosure extends
RpcResponseClosureAdapter<AppendEntriesResponse> {
- final ReadIndexResponseBuilder respBuilder;
- final RpcResponseClosure<ReadIndexResponse> closure;
+ public static class QuorumConfirmedHeartbeatResponseClosure<T extends
Message> extends RpcResponseClosureAdapter<AppendEntriesResponse>{
+ final Function<Boolean, T> responseBuilder;
+ final Consumer<T> responseConsumer;
final int quorum;
final int failPeersThreshold;
int ackSuccess;
int ackFailures;
boolean isDone;
- ReadIndexHeartbeatResponseClosure(final
RpcResponseClosure<ReadIndexResponse> closure,
- final ReadIndexResponseBuilder rb, final int quorum,
- final int peersCount) {
+ QuorumConfirmedHeartbeatResponseClosure(
+ final Consumer<T> responseConsumer,
+ final Function<Boolean, T> responseBuilder,
+ final int quorum,
+ final int peersCount
+ ) {
super();
- this.closure = closure;
- this.respBuilder = rb;
+ this.responseConsumer = responseConsumer;
+ this.responseBuilder = responseBuilder;
this.quorum = quorum;
this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) :
quorum;
this.ackSuccess = 0;
@@ -1794,17 +1802,16 @@ public class NodeImpl implements Node,
RaftServerService {
else {
this.ackFailures++;
}
+
// Include leader self vote yes.
if (this.ackSuccess + 1 >= this.quorum) {
- this.respBuilder.success(true);
- this.closure.setResponse(this.respBuilder.build());
- this.closure.run(Status.OK());
+ T response = responseBuilder.apply(true);
+ responseConsumer.accept(response);
this.isDone = true;
}
else if (this.ackFailures >= this.failPeersThreshold) {
- this.respBuilder.success(false);
- this.closure.setResponse(this.respBuilder.build());
- this.closure.run(Status.OK());
+ T response = responseBuilder.apply(false);
+ responseConsumer.accept(response);
this.isDone = true;
}
}
@@ -1814,8 +1821,10 @@ public class NodeImpl implements Node, RaftServerService
{
* Handle read index request.
*/
@Override
- public void handleReadIndexRequest(final ReadIndexRequest request,
- final RpcResponseClosure<ReadIndexResponse> done) {
+ public void handleReadIndexRequest(
+ final ReadIndexRequest request,
+ final RpcResponseClosure<ReadIndexResponse> done
+ ) {
final long startMs = Utils.monotonicMs();
this.readLock.lock();
try {
@@ -1841,6 +1850,104 @@ public class NodeImpl implements Node,
RaftServerService {
}
}
+ @Override
+ public void handleGetLeaderAndTermRequest(GetLeaderRequest request,
RpcResponseClosure<GetLeaderResponse> done) {
+ final long startMs = Utils.monotonicMs();
+ this.readLock.lock();
+ try {
+ switch (this.state) {
+ case STATE_LEADER:
+ getLeaderFromLeader(done);
+ break;
+ case STATE_FOLLOWER:
+ getLeaderFromFollower(request, done);
+ break;
+ case STATE_TRANSFERRING:
+ done.run(new Status(RaftError.EBUSY, "Is transferring
leadership."));
+ break;
+ default:
+ done.run(new Status(RaftError.UNKNOWN, "Invalid state for
getLeaderAndTerm: %s.", this.state));
+ break;
+ }
+ }
+ finally {
+ this.readLock.unlock();
+ this.metrics.recordLatency("handle-get-leader",
Utils.monotonicMs() - startMs);
+ }
+ }
+
+ private void getLeaderFromFollower(GetLeaderRequest request,
RpcResponseClosure<GetLeaderResponse> closure) {
+ PeerId leaderId = this.leaderId;
+
+ if (leaderId == null || leaderId.isEmpty()) {
+ closure.run(new Status(RaftError.UNKNOWN, "No leader at term %d.",
this.currTerm));
+ return;
+ }
+ // Forward the request to the leader known to this follower.
+ final GetLeaderRequest newRequest =
raftOptions.getRaftMessagesFactory()
+ .getLeaderRequest()
+ .groupId(request.groupId())
+ .peerId(leaderId.toString())
+ .build();
+
+ this.rpcClientService.getLeaderAndTerm(leaderId, newRequest, -1,
closure);
+ }
+
+ private void getLeaderFromLeader(RpcResponseClosure<GetLeaderResponse>
closure) {
+ PeerId leaderId = this.leaderId;
+
+ if (leaderId == null || leaderId.isEmpty()) {
+ closure.run(new Status(RaftError.UNKNOWN, "No leader at term %d.",
this.currTerm));
+ return;
+ }
+
+ GetLeaderResponseBuilder respBuilder =
raftOptions.getRaftMessagesFactory().getLeaderResponse()
+ .leaderId(leaderId.toString())
+ .currentTerm(this.getCurrentTerm());
+
+ final int quorum = getQuorum();
+ if (quorum <= 1) {
+ // Only one peer, fast path.
+ closure.setResponse(respBuilder.build());
+ closure.run(Status.OK());
+ return;
+ }
+
+ ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
+ if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased &&
!isLeaderLeaseValid()) {
+ // If the leader lease is no longer valid, we must fall back to ReadOnlySafe.
+ readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
+ }
+
+ switch (readOnlyOpt) {
+ case ReadOnlySafe:
+ final List<PeerId> peers = this.conf.getConf().getPeers();
+ Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty
peers");
+ final
QuorumConfirmedHeartbeatResponseClosure<GetLeaderResponse> heartbeatDone =
+ new QuorumConfirmedHeartbeatResponseClosure<>(
+ response -> {
+ closure.setResponse(response);
+ closure.run(Status.OK());
+ },
+ success -> respBuilder.build(),
+ quorum,
+ peers.size()
+ );
+ // Send heartbeat requests to followers
+ for (final PeerId peer : peers) {
+ if (peer.equals(this.serverId)) {
+ continue;
+ }
+ this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
+ }
+ break;
+ case ReadOnlyLeaseBased:
+ closure.setResponse(respBuilder.build());
+ closure.run(Status.OK());
+ break;
+ }
+ }
+
private int getQuorum() {
final Configuration c = this.conf.getConf();
if (c.isEmpty()) {
@@ -1912,8 +2019,19 @@ public class NodeImpl implements Node, RaftServerService
{
case ReadOnlySafe:
final List<PeerId> peers = this.conf.getConf().getPeers();
Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty
peers");
- final ReadIndexHeartbeatResponseClosure heartbeatDone = new
ReadIndexHeartbeatResponseClosure(closure,
- respBuilder, quorum, peers.size());
+ final
QuorumConfirmedHeartbeatResponseClosure<ReadIndexResponse> heartbeatDone =
+ new QuorumConfirmedHeartbeatResponseClosure<>(
+ response -> {
+ closure.setResponse(response);
+ closure.run(Status.OK());
+ },
+ success -> {
+ respBuilder.success(success);
+ return respBuilder.build();
+ },
+ quorum,
+ peers.size()
+ );
// Send heartbeat requests to followers
for (final PeerId peer : peers) {
if (peer.equals(this.serverId)) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java
index b3084412c08..bc891021073 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftClientService.java
@@ -103,4 +103,16 @@ public interface RaftClientService extends ClientService {
*/
Future<Message> readIndex(final PeerId peerId, final
RpcRequests.ReadIndexRequest request, final int timeoutMs,
final RpcResponseClosure<RpcRequests.ReadIndexResponse> done);
+
+ /**
+ * Send a get-leader-and-term request and handle the response with done.
+ *
+ * @param peerId destination peer ID
+ * @param request request data
+ * @param timeoutMs timeout millis
+ * @param done callback
+ * @return a future result
+ */
+ Future<Message> getLeaderAndTerm(final PeerId peerId, final
CliRequests.GetLeaderRequest request, final int timeoutMs,
+ final RpcResponseClosure<CliRequests.GetLeaderResponse> done);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java
index 7d60d26d7d4..2d3b6f48910 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RaftServerService.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.rpc;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
@@ -78,4 +80,12 @@ public interface RaftServerService {
* @param done callback
*/
void handleReadIndexRequest(ReadIndexRequest request,
RpcResponseClosure<ReadIndexResponse> done);
+
+ /**
+ * Handle a get-leader-and-term request and call the RPC closure with the response.
+ *
+ * @param request the get-leader request data
+ * @param done callback
+ */
+ void handleGetLeaderAndTermRequest(GetLeaderRequest request,
RpcResponseClosure<GetLeaderResponse> done);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 85c9bb289e2..91eee81971a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -113,13 +113,13 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new ReadIndexRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new HeartbeatRequestProcessor(rpcExecutor,
raftMessagesFactory));
+ registerProcessor(new GetLeaderRequestProcessor(rpcExecutor,
raftMessagesFactory));
// raft native cli service
registerProcessor(new AddPeerRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new RemovePeerRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new ResetPeerRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new
ChangePeersAndLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new
ChangePeersAndLearnersAsyncRequestProcessor(rpcExecutor, raftMessagesFactory));
- registerProcessor(new GetLeaderRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new SnapshotRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new TransferLeaderRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new GetPeersRequestProcessor(rpcExecutor,
raftMessagesFactory));
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
index 1090447f814..177e4f939b4 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/GetLeaderRequestProcessor.java
@@ -16,23 +16,21 @@
*/
package org.apache.ignite.raft.jraft.rpc.impl.cli;
-import java.util.ArrayList;
-import java.util.List;
+import static java.util.concurrent.CompletableFuture.runAsync;
import java.util.concurrent.Executor;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.Status;
-import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
-import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
+import org.apache.ignite.raft.jraft.rpc.impl.core.NodeRequestProcessor;
/**
* Process get leader request.
*/
-public class GetLeaderRequestProcessor extends
BaseCliRequestProcessor<GetLeaderRequest> {
+public class GetLeaderRequestProcessor extends
NodeRequestProcessor<GetLeaderRequest> {
public GetLeaderRequestProcessor(Executor executor, RaftMessagesFactory
msgFactory) {
super(executor, msgFactory);
@@ -49,55 +47,27 @@ public class GetLeaderRequestProcessor extends
BaseCliRequestProcessor<GetLeader
}
@Override
- protected Message processRequest0(final CliRequestContext ctx, final
GetLeaderRequest request,
- final IgniteCliRpcRequestClosure done) {
- // ignore
- return null;
- }
+ public Message processRequest0(final RaftServerService service, final
GetLeaderRequest request,
+ final RpcRequestClosure done) {
- @Override
- public Message processRequest(final GetLeaderRequest request, final
RpcRequestClosure done) {
- List<Node> nodes = new ArrayList<>();
- final String groupId = getGroupId(request);
- if (request.peerId() != null) {
- final String peerIdStr = getPeerId(request);
- final PeerId peer = new PeerId();
- if (peer.parse(peerIdStr)) {
- final Status st = new Status();
- nodes.add(getNode(groupId, peer, st,
done.getRpcCtx().getNodeManager()));
- if (!st.isOk()) {
- return RaftRpcFactory.DEFAULT //
- .newResponse(msgFactory(), st);
+ service.handleGetLeaderAndTermRequest(request, new
RpcResponseClosureAdapter<>() {
+ @Override
+ public void run(final Status status) {
+ if (getResponse() != null) {
+ runAsync(() -> done.sendResponse(getResponse()),
executor());
+ }
+ else {
+ runAsync(() -> done.run(status), executor());
}
}
- else {
- return RaftRpcFactory.DEFAULT //
- .newResponse(msgFactory(), RaftError.EINVAL, "Fail to
parse peer id %s", peerIdStr);
- }
- }
- else {
- nodes =
done.getRpcCtx().getNodeManager().getNodesByGroupId(groupId);
- }
- if (nodes == null || nodes.isEmpty()) {
- return RaftRpcFactory.DEFAULT //
- .newResponse(msgFactory(), RaftError.ENOENT, "No nodes in
group %s", groupId);
- }
- for (final Node node : nodes) {
- final PeerId leader = node.getLeaderId();
- if (leader != null && !leader.isEmpty()) {
- return msgFactory().getLeaderResponse()
- .leaderId(leader.toString())
- .currentTerm(node.getCurrentTerm())
- .build();
- }
- }
- return RaftRpcFactory.DEFAULT //
- .newResponse(msgFactory(), RaftError.UNKNOWN, "Unknown leader");
+
+ });
+
+ return null;
}
@Override
public String interest() {
return GetLeaderRequest.class.getName();
}
-
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
index 7048e3b3739..202c0044d73 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
@@ -26,13 +26,15 @@ import java.util.concurrent.TimeoutException;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.Status;
-import
org.apache.ignite.raft.jraft.core.NodeImpl.ReadIndexHeartbeatResponseClosure;
+import
org.apache.ignite.raft.jraft.core.NodeImpl.QuorumConfirmedHeartbeatResponseClosure;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.InvokeTimeoutException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RemotingException;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RpcOptions;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
import org.apache.ignite.raft.jraft.rpc.InvokeContext;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
@@ -161,7 +163,7 @@ public class DefaultRaftClientService extends
AbstractClientService implements R
* @return True if the read index request.
*/
private static boolean
isReadIndexRequest(RpcResponseClosure<AppendEntriesResponse> doneClosure) {
- return doneClosure instanceof ReadIndexHeartbeatResponseClosure;
+ return doneClosure instanceof QuorumConfirmedHeartbeatResponseClosure;
}
@Override
@@ -197,6 +199,12 @@ public class DefaultRaftClientService extends
AbstractClientService implements R
return invokeWithDone(peerId, request, done, timeoutMs);
}
+ @Override
+ public Future<Message> getLeaderAndTerm( PeerId peerId, GetLeaderRequest
request, int timeoutMs,
+ RpcResponseClosure<GetLeaderResponse> done) {
+ return invokeWithDone(peerId, request, done, timeoutMs);
+ }
+
/**
* @param executor The executor to run done closure.
* @param request The request.