This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2c4db4774 RATIS-2433. Cancel transaction in case of failure to append (#1382)
2c4db4774 is described below
commit 2c4db4774b1717eecebed0dc52f9a44b11c2dbd5
Author: Abhishek Pal <[email protected]>
AuthorDate: Thu Apr 9 23:27:44 2026 +0530
RATIS-2433. Cancel transaction in case of failure to append (#1382)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 89 +++++++++++++++-------
.../apache/ratis/server/impl/RetryCacheImpl.java | 10 ---
.../statemachine/impl/TransactionContextImpl.java | 2 -
.../impl/RaftStateMachineExceptionTests.java | 35 +++++++++
4 files changed, 98 insertions(+), 38 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d4c6f164e..a9c80d000 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -768,17 +768,18 @@ class RaftServerImpl implements RaftServer.Division,
} catch (GroupMismatchException e) {
return JavaUtils.completeExceptionally(e);
}
- return checkLeaderState(request, null);
+ return checkLeaderState(request, null, null);
}
/**
* @return null if the server is in leader state.
*/
- private CompletableFuture<RaftClientReply>
checkLeaderState(RaftClientRequest request, CacheEntry entry) {
+ private CompletableFuture<RaftClientReply> checkLeaderState(
+ RaftClientRequest request, CacheEntry entry, TransactionContextImpl
context) {
if (!getInfo().isLeader()) {
NotLeaderException exception = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, exception);
- return RetryCacheImpl.failWithReply(reply, entry);
+ return failWithReply(reply, entry, context);
}
if (!getInfo().isLeaderReady()) {
final CacheEntry cacheEntry =
retryCache.getIfPresent(ClientInvocationId.valueOf(request));
@@ -787,13 +788,13 @@ class RaftServerImpl implements RaftServer.Division,
}
final LeaderNotReadyException lnre = new
LeaderNotReadyException(getMemberId());
final RaftClientReply reply = newExceptionReply(request, lnre);
- return RetryCacheImpl.failWithReply(reply, entry);
+ return failWithReply(reply, entry, context);
}
if (!request.isReadOnly() && isSteppingDown()) {
final LeaderSteppingDownException lsde = new
LeaderSteppingDownException(getMemberId() + " is stepping down");
final RaftClientReply reply = newExceptionReply(request, lsde);
- return RetryCacheImpl.failWithReply(reply, entry);
+ return failWithReply(reply, entry, context);
}
return null;
@@ -819,11 +820,44 @@ class RaftServerImpl implements RaftServer.Division,
getMemberId() + " is not in " + expected + ": current state is " + c),
expected);
}
- private CompletableFuture<RaftClientReply>
getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
- return entry.failWithException(new ResourceUnavailableException(
- getMemberId() + ": Failed to acquire a pending write request for " +
request));
+ private CompletableFuture<RaftClientReply>
getResourceUnavailableReply(String op,
+ RaftClientRequest request, CacheEntry entry, TransactionContextImpl
context) {
+ final ResourceUnavailableException e = new
ResourceUnavailableException(getMemberId()
+ + ": Failed to " + op + " for " + request);
+ cancelTransaction(context, e);
+ return entry.failWithException(e);
+ }
+
+ private CompletableFuture<RaftClientReply> failWithReply(
+ RaftClientReply reply, CacheEntry entry, TransactionContextImpl context)
{
+ if (context != null) {
+ cancelTransaction(context, reply.getException());
+ }
+
+ if (entry == null) {
+ return CompletableFuture.completedFuture(reply);
+ }
+ entry.failWithReply(reply);
+ return entry.getReplyFuture();
+ }
+
+ /** Cancel a transaction and notify the state machine. Set exception if
provided to the transaction context. */
+ private void cancelTransaction(TransactionContextImpl context, Exception
exception) {
+ if (context == null) {
+ return;
}
+ if (exception != null) {
+ context.setException(exception);
+ }
+
+ try {
+ context.cancelTransaction();
+ } catch (IOException ioe) {
+ LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context,
ioe);
+ }
+ }
+
/**
* Handle a normal update request from client.
*/
@@ -836,27 +870,28 @@ class RaftServerImpl implements RaftServer.Division,
final LeaderStateImpl unsyncedLeaderState =
role.getLeaderState().orElse(null);
if (unsyncedLeaderState == null) {
- final RaftClientReply reply = newExceptionReply(request,
generateNotLeaderException());
- return RetryCacheImpl.failWithReply(reply, cacheEntry);
+ final NotLeaderException nle = generateNotLeaderException();
+ final RaftClientReply reply = newExceptionReply(request, nle);
+ return failWithReply(reply, cacheEntry, context);
}
final PendingRequests.Permit unsyncedPermit =
unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
if (unsyncedPermit == null) {
- return getResourceUnavailableReply(request, cacheEntry);
+ return getResourceUnavailableReply("acquire a pending write request",
request, cacheEntry, context);
}
final LeaderStateImpl leaderState;
final PendingRequest pending;
synchronized (this) {
- final CompletableFuture<RaftClientReply> reply =
checkLeaderState(request, cacheEntry);
+ final CompletableFuture<RaftClientReply> reply =
checkLeaderState(request, cacheEntry, context);
if (reply != null) {
return reply;
}
leaderState = role.getLeaderStateNonNull();
- final PendingRequests.Permit permit = leaderState ==
unsyncedLeaderState? unsyncedPermit
+ final PendingRequests.Permit permit = leaderState == unsyncedLeaderState
? unsyncedPermit
: leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
- return getResourceUnavailableReply(request, cacheEntry);
+ return getResourceUnavailableReply("acquire a pending write request",
request, cacheEntry, context);
}
// append the message to its local log
@@ -866,20 +901,18 @@ class RaftServerImpl implements RaftServer.Division,
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend
stage.
// Return the exception in a RaftClientReply.
- RaftClientReply exceptionReply = newExceptionReply(request, e);
- cacheEntry.failWithReply(exceptionReply);
+ final RaftClientReply exceptionReply = newExceptionReply(request, e);
// leader will step down here
if (e.leaderShouldStepDown() && getInfo().isLeader()) {
leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION);
}
- return CompletableFuture.completedFuture(exceptionReply);
+ return failWithReply(exceptionReply, cacheEntry, null);
}
// put the request into the pending queue
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
- return cacheEntry.failWithException(new ResourceUnavailableException(
- getMemberId() + ": Failed to add a pending write request for " +
request));
+ return getResourceUnavailableReply("add a pending write request",
request, cacheEntry, context);
}
}
leaderState.notifySenders();
@@ -1011,19 +1044,23 @@ class RaftServerImpl implements RaftServer.Division,
// return the cached future.
return cacheEntry.getReplyFuture();
}
- // TODO: this client request will not be added to pending requests until
- // later which means that any failure in between will leave partial state
in
- // the state machine. We should call cancelTransaction() for failed
requests
+ // This request will be added to pending requests later in
appendTransaction.
+ // Any failure in between must invoke cancelTransaction.
final TransactionContextImpl context = (TransactionContextImpl)
stateMachine.startTransaction(
filterDataStreamRaftClientRequest(request));
if (context.getException() != null) {
- final StateMachineException e = new StateMachineException(getMemberId(),
context.getException());
+ final Exception exception = context.getException();
+ final StateMachineException e = new StateMachineException(getMemberId(),
exception);
final RaftClientReply exceptionReply = newExceptionReply(request, e);
- cacheEntry.failWithReply(exceptionReply);
- return CompletableFuture.completedFuture(exceptionReply);
+ return failWithReply(exceptionReply, cacheEntry, context);
}
- return appendTransaction(request, context, cacheEntry);
+ try {
+ return appendTransaction(request, context, cacheEntry);
+ } catch (Exception e) {
+ cancelTransaction(context, e);
+ throw e;
+ }
}
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest
request) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index 4da459ae9..96ad62a53 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -257,14 +257,4 @@ class RetryCacheImpl implements RetryCache {
cache.invalidateAll();
statistics.set(null);
}
-
- static CompletableFuture<RaftClientReply> failWithReply(
- RaftClientReply reply, CacheEntry entry) {
- if (entry != null) {
- entry.failWithReply(reply);
- return entry.getReplyFuture();
- } else {
- return CompletableFuture.completedFuture(reply);
- }
- }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
index d92f3a1c8..8497b12f4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
@@ -191,8 +191,6 @@ public class TransactionContextImpl implements
TransactionContext {
@Override
public TransactionContext cancelTransaction() throws IOException {
- // TODO: This is not called from Raft server / log yet. When an
IOException happens, we should
- // call this to let the SM know that Transaction cannot be synced
return stateMachine.cancelTransaction(this);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index 3a58f4e7c..1e46907d1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -40,6 +40,7 @@ import org.slf4j.event.Level;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.fail;
@@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER
extends MiniRaftClu
}
private static volatile boolean failPreAppend = false;
+ private static final AtomicInteger numCancelTransaction = new
AtomicInteger();
protected static class StateMachineWithException extends
SimpleStateMachine4Testing {
@@ -72,6 +74,12 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER
extends MiniRaftClu
return trx;
}
}
+
+ @Override
+ public TransactionContext cancelTransaction(TransactionContext trx) throws
IOException {
+ numCancelTransaction.incrementAndGet();
+ return super.cancelTransaction(trx);
+ }
}
{
@@ -179,4 +187,31 @@ public abstract class
RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
failPreAppend = false;
}
}
+
+ @Test
+ public void testNoCancelTransactionOnPreAppendFailure() throws Exception {
+ runWithNewCluster(3, this::runTestNoCancelTransactionOnPreAppendFailure);
+ }
+
+ private void runTestNoCancelTransactionOnPreAppendFailure(CLUSTER cluster)
throws Exception {
+ final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+ failPreAppend = true;
+ numCancelTransaction.set(0);
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ try {
+ client.io().send(new SimpleMessage("cancel-transaction"));
+ fail("Exception expected");
+ } catch (StateMachineException e) {
+ Assertions.assertTrue(e.getCause().getMessage().contains("Fake
Exception in preAppend"));
+ }
+
+ JavaUtils.attemptRepeatedly(() -> {
+ Assertions.assertEquals(0, numCancelTransaction.get(),
+ () -> "Expected cancelTransaction() not to be called but got " +
numCancelTransaction.get());
+ return null;
+ }, 10, ONE_SECOND, "wait for cancelTransaction", LOG);
+ } finally {
+ failPreAppend = false;
+ }
+ }
}