This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d824c4e88 RATIS-2317. Move acquire PendingRequestPermit out of
synchronized block in RaftServerImpl#appendTransaction. (#1275)
d824c4e88 is described below
commit d824c4e88c4e1804819a3e72465f4730c0891c10
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Wed Jul 9 15:32:36 2025 +0530
RATIS-2317. Move acquire PendingRequestPermit out of synchronized block in
RaftServerImpl#appendTransaction. (#1275)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 50 ++++++++++++++--------
.../apache/ratis/server/impl/RetryCacheImpl.java | 13 +-----
2 files changed, 33 insertions(+), 30 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e5d9bfeca..3c10e103b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -171,7 +171,7 @@ class RaftServerImpl implements RaftServer.Division,
@Override
public boolean isLeaderReady() {
- return isLeader() && getRole().isLeaderReady();
+ return getRole().isLeaderReady();
}
@Override
@@ -752,6 +752,11 @@ class RaftServerImpl implements RaftServer.Division,
}
private CompletableFuture<RaftClientReply>
checkLeaderState(RaftClientRequest request) {
+ try {
+ assertGroup(getMemberId(), request);
+ } catch (GroupMismatchException e) {
+ return JavaUtils.completeExceptionally(e);
+ }
return checkLeaderState(request, null);
}
@@ -759,12 +764,6 @@ class RaftServerImpl implements RaftServer.Division,
* @return null if the server is in leader state.
*/
private CompletableFuture<RaftClientReply>
checkLeaderState(RaftClientRequest request, CacheEntry entry) {
- try {
- assertGroup(getMemberId(), request);
- } catch (GroupMismatchException e) {
- return RetryCacheImpl.failWithException(e, entry);
- }
-
if (!getInfo().isLeader()) {
NotLeaderException exception = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, exception);
@@ -809,6 +808,11 @@ class RaftServerImpl implements RaftServer.Division,
getMemberId() + " is not in " + expected + ": current state is " + c),
expected);
}
+ private CompletableFuture<RaftClientReply>
getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
+ return entry.failWithException(new ResourceUnavailableException(
+ getMemberId() + ": Failed to acquire a pending write request for " +
request));
+ }
+
/**
* Handle a normal update request from client.
*/
@@ -819,6 +823,17 @@ class RaftServerImpl implements RaftServer.Division,
assertLifeCycleState(LifeCycle.States.RUNNING);
+ final LeaderStateImpl unsyncedLeaderState =
role.getLeaderState().orElse(null);
+ if (unsyncedLeaderState == null) {
+ final RaftClientReply reply = newExceptionReply(request,
generateNotLeaderException());
+ return RetryCacheImpl.failWithReply(reply, cacheEntry);
+ }
+ final PendingRequests.Permit unsyncedPermit =
unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
+ if (unsyncedPermit == null) {
+ return getResourceUnavailableReply(request, cacheEntry);
+ }
+
+ final LeaderStateImpl leaderState;
final PendingRequest pending;
synchronized (this) {
final CompletableFuture<RaftClientReply> reply =
checkLeaderState(request, cacheEntry);
@@ -826,16 +841,15 @@ class RaftServerImpl implements RaftServer.Division,
return reply;
}
- // append the message to its local log
- final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
- writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
-
- final PendingRequests.Permit permit =
leaderState.tryAcquirePendingRequest(request.getMessage());
+ leaderState = role.getLeaderStateNonNull();
+ final PendingRequests.Permit permit = leaderState ==
unsyncedLeaderState? unsyncedPermit
+ : leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
- cacheEntry.failWithException(new ResourceUnavailableException(
- getMemberId() + ": Failed to acquire a pending write request for "
+ request));
- return cacheEntry.getReplyFuture();
+ return getResourceUnavailableReply(request, cacheEntry);
}
+
+ // append the message to its local log
+ writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
try {
state.appendLog(context);
} catch (StateMachineException e) {
@@ -853,13 +867,11 @@ class RaftServerImpl implements RaftServer.Division,
// put the request into the pending queue
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
- cacheEntry.failWithException(new ResourceUnavailableException(
+ return cacheEntry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to add a pending write request for " +
request));
- return cacheEntry.getReplyFuture();
}
- leaderState.notifySenders();
}
-
+ leaderState.notifySenders();
return pending.getFuture();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index 50d238b07..4da459ae9 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -84,9 +84,10 @@ class RetryCacheImpl implements RetryCache {
replyFuture.complete(reply);
}
- void failWithException(Throwable t) {
+ CompletableFuture<RaftClientReply> failWithException(Throwable t) {
failed = true;
replyFuture.completeExceptionally(t);
+ return replyFuture;
}
@Override
@@ -266,14 +267,4 @@ class RetryCacheImpl implements RetryCache {
return CompletableFuture.completedFuture(reply);
}
}
-
- static CompletableFuture<RaftClientReply> failWithException(
- Throwable t, CacheEntry entry) {
- if (entry != null) {
- entry.failWithException(t);
- return entry.getReplyFuture();
- } else {
- return JavaUtils.completeExceptionally(t);
- }
- }
}