This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new df2a3027e RATIS-2499. Allow the LogAppender restart when LogAppenderDaemon exception (#1425)
df2a3027e is described below
commit df2a3027e39ab23a98d1500099b5412fe180f2a2
Author: XiChen <[email protected]>
AuthorDate: Wed Apr 8 19:49:24 2026 +0800
RATIS-2499. Allow the LogAppender restart when LogAppenderDaemon exception (#1425)
---
.../ratis/server/leader/LogAppenderBase.java | 14 +++--
.../ratis/server/leader/LogAppenderDaemon.java | 4 ++
.../java/org/apache/ratis/LogAppenderTests.java | 1 +
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 63 ++++++++++++++++++++++
4 files changed, 79 insertions(+), 3 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 4f558e0c7..f65ac1863 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -125,7 +125,11 @@ public abstract class LogAppenderBase implements LogAppender {
@Override
public boolean isRunning() {
return daemon.isWorking()
- && server.getInfo().isAlive()
+ && isLeaderAlive();
+ }
+
+ private boolean isLeaderAlive() {
+ return server.getInfo().isAlive()
&& server.getInfo().isLeader()
&& getRaftLog().isOpened();
}
@@ -136,8 +140,12 @@ public abstract class LogAppenderBase implements LogAppender {
}
void restart() {
- if (!isRunning()) {
- LOG.warn("{} is not running: skipping restart", this);
+ if (daemon.isClosingOrClosed()) {
+ LOG.warn("{}: daemon is closing or closed, skipping restart", this);
+ return;
+ }
+ if (!isLeaderAlive()) {
+ LOG.warn("{}: leader is not ready, skipping restart", this);
return;
}
getLeaderState().restart(this);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index 5de3f3b4d..c779d007b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -58,6 +58,10 @@ class LogAppenderDaemon {
return
!LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(lifeCycle.getCurrentState());
}
+ public boolean isClosingOrClosed() {
+ return
LifeCycle.States.CLOSING_OR_CLOSED.contains(lifeCycle.getCurrentState());
+ }
+
public void tryToStart() {
if (lifeCycle.compareAndTransition(NEW, STARTING)) {
daemon.start();
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index c9b19a72a..8a8731daf 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -171,6 +171,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
assertTrue(t.getTimer().getCount() > 0L);
}
}
+ cluster.shutdown();
}
void runTest(CLUSTER cluster) throws Exception {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 107cd7ba9..318ed5e6b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -18,7 +18,10 @@
package org.apache.ratis.grpc;
import org.apache.ratis.LogAppenderTests;
+import org.apache.ratis.grpc.server.GrpcServicesImpl;
import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
@@ -29,11 +32,14 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
@@ -42,7 +48,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -148,4 +157,58 @@ public class TestLogAppenderWithGrpc
Assertions.assertTrue(newleaderMetrics.getRegistry().counter(counter).getCount()
>= 1L);
}
}
+
+ @Test
+ public void testLogAppenderAutoRestartOnException() throws Exception {
+ runWithNewCluster(3, this::runTestAutoRestartOnException);
+ }
+
+ private void runTestAutoRestartOnException(MiniRaftClusterWithGrpc cluster)
throws Exception {
+ final RaftServer.Division leader = waitForLeader(cluster);
+ final RaftPeerId leaderId = leader.getId();
+
+ try (RaftClient client = cluster.createClient(leaderId)) {
+ for (int i = 0; i < 5; i++) {
+ Assertions.assertTrue(client.io().send(new
RaftTestUtil.SimpleMessage("init-" + i)).isSuccess());
+ }
+ }
+
+ final Set<LogAppender> before =
RaftServerTestUtil.getLogAppenders(leader).collect(Collectors.toSet());
+ Assertions.assertEquals(2, before.size());
+
+ // Inject a one-time IllegalStateException into the leader's AppendEntries send path.
+ // This causes the LogAppenderDaemon to enter EXCEPTION state and call restart().
+ final RaftGroupId groupId = cluster.getGroupId();
+ final AtomicInteger failCount = new AtomicInteger(0);
+ try {
+ CodeInjectionForTesting.put(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST,
(localId, remoteId, args) -> {
+ if (leaderId.equals(localId)
+ && args.length > 0 && args[0] instanceof
RaftProtos.AppendEntriesRequestProto) {
+ final RaftProtos.AppendEntriesRequestProto proto =
(RaftProtos.AppendEntriesRequestProto) args[0];
+ if
(RaftGroupId.valueOf(proto.getServerRequest().getRaftGroupId().getId()).equals(groupId)
+ && failCount.getAndIncrement() < 1) {
+ throw new IllegalStateException("Injected failure for restart
test");
+ }
+ }
+ return false;
+ });
+
+ JavaUtils.attempt(() -> {
+ final Set<LogAppender> current =
RaftServerTestUtil.getLogAppenders(leader)
+ .collect(Collectors.toSet());
+ Assertions.assertEquals(2, current.size());
+ Assertions.assertTrue(current.stream().anyMatch(a ->
!before.contains(a)),
+ "Expected at least one new LogAppender instance after daemon
exception restart");
+ }, 30, ONE_SECOND, "LogAppender auto-restart after exception", LOG);
+ } finally {
+
CodeInjectionForTesting.remove(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST);
+ }
+
+ try (RaftClient client = cluster.createClient(leaderId)) {
+ for (int i = 0; i < 5; i++) {
+ Assertions.assertTrue(
+ client.io().send(new
RaftTestUtil.SimpleMessage("after-restart-" + i)).isSuccess());
+ }
+ }
+ }
}