This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb4de051 RATIS-2318. Add a test to show how to manually restore a snapshot. (#1279)
0bb4de051 is described below
commit 0bb4de0519ec8117420cd4d078fe745666f72ae8
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Sep 8 13:14:36 2025 -0700
RATIS-2318. Add a test to show how to manually restore a snapshot. (#1279)
---
.../main/java/org/apache/ratis/util/FileUtils.java | 26 ++++
.../counter/server/CounterStateMachine.java | 32 +++--
.../counter/server/TestManualRestoreSnapshot.java | 146 +++++++++++++++++++++
.../org/apache/ratis/server/impl/ServerState.java | 17 ++-
.../apache/ratis/server/raftlog/RaftLogBase.java | 3 +-
.../ratis/statemachine/impl/BaseStateMachine.java | 6 +-
.../impl/SimpleStateMachineStorage.java | 8 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 10 +-
8 files changed, 225 insertions(+), 23 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index 315010c62..79c00b5a3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -33,6 +33,7 @@ import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
+import java.nio.file.NotDirectoryException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -43,7 +44,10 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Supplier;
+import java.util.stream.Stream;
public interface FileUtils {
Logger LOG = LoggerFactory.getLogger(FileUtils.class);
@@ -382,4 +386,26 @@ public interface FileUtils {
}
});
}
+
+ static void listDir(File dir, Consumer<Object> out, BiConsumer<String, Throwable> err) {
+ listDir(dir.toPath(), out, err);
+ }
+
+ static void listDir(Path dir, Consumer<Object> out, BiConsumer<String, Throwable> err) {
+ try {
+ listDir(dir, out);
+ } catch (IOException e) {
+ err.accept("Failed to listDir: " + dir, e);
+ }
+ }
+
+ static void listDir(Path dir, Consumer<Object> out) throws IOException {
+ if (!Files.isDirectory(dir, LinkOption.NOFOLLOW_LINKS)) {
+ throw new NotDirectoryException( "Failed to listDir: " + dir + " is not a directory.");
+ }
+
+ try(Stream<Path> s = Files.list(dir)) {
+ s.forEach(out);
+ }
+ }
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index b5352f0ad..914180feb 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -78,6 +78,11 @@ public class CounterStateMachine extends BaseStateMachine {
int getCounter() {
return counter;
}
+
+ @Override
+ public String toString() {
+ return counter + "@" + applied;
+ }
}
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
@@ -94,11 +99,11 @@ public class CounterStateMachine extends BaseStateMachine {
}
/** @return the current state. */
- private synchronized CounterState getState() {
+ synchronized CounterState getState() {
return new CounterState(getLastAppliedTermIndex(), counter.get());
}
- private synchronized void updateState(TermIndex applied, int counterValue) {
+ synchronized void updateState(TermIndex applied, int counterValue) {
updateLastAppliedTermIndex(applied);
counter.set(counterValue);
}
@@ -141,36 +146,45 @@ public class CounterStateMachine extends BaseStateMachine {
load(storage.loadLatestSnapshot());
}
+ @Override
+ public SimpleStateMachineStorage getStateMachineStorage() {
+ return storage;
+ }
+
/**
* Store the current state as a snapshot file in the {@link #storage}.
*
* @return the index of the snapshot
*/
@Override
- public long takeSnapshot() {
+ public long takeSnapshot() throws IOException {
//get the current state
final CounterState state = getState();
final long index = state.getApplied().getIndex();
//create a file with a proper name to store the snapshot
final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
+ try {
+ saveSnapshot(state, snapshotFile);
+ } catch (Exception e) {
+ throw new IOException("Failed to save snapshot (" + state + ") to file " + snapshotFile, e);
+ }
+ //return the index of the stored snapshot (which is the last applied one)
+ return index;
+ }
+
+ void saveSnapshot(CounterState state, File snapshotFile) throws IOException {
//write the counter value into the snapshot file
try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
Files.newOutputStream(snapshotFile.toPath())))) {
out.writeInt(state.getCounter());
- } catch (IOException ioe) {
- LOG.warn("Failed to write snapshot file \"" + snapshotFile
- + "\", last applied index=" + state.getApplied());
}
// update storage
final MD5Hash md5 = MD5FileUtil.computeAndSaveMd5ForFile(snapshotFile);
final FileInfo info = new FileInfo(snapshotFile.toPath(), md5);
storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, state.getApplied()));
-
- //return the index of the stored snapshot (which is the last applied one)
- return index;
}
/**
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java
new file mode 100644
index 000000000..0480a5364
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.counter.server;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test manually restoring a snapshot.
+ * Due to hardware failures or software bugs,
+ * the state of a state machine can become corrupted.
+ * In such case, we may manually copy a snapshot from the leader
+ * and then install it to the corrupted state machine.
+ */
+public class TestManualRestoreSnapshot extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
+ public static final int NUM_SERVERS = 3;
+
+ {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, CounterStateMachine.class, StateMachine.class);
+ }
+
+ @Test
+ public void testManualRestoreSnapshot() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::run);
+ }
+
+ void run(MiniRaftCluster cluster) throws Exception {
+ final RaftGroup group = cluster.getGroup();
+
+ // send some messages
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ LOG.info("Leader: {}", leader);
+ sendMessages(cluster, 5);
+
+ // kill a follower
+ final RaftServer.Division toBeKilled = cluster.getFollowers().get(0);
+ LOG.info("Follower to be killed: {}", toBeKilled.getId());
+ final SimpleStateMachineStorage smStorage = ((CounterStateMachine) toBeKilled.getStateMachine())
+ .getStateMachineStorage();
+ final File raftLogCurrentDir = toBeKilled.getRaftStorage().getStorageDir().getCurrentDir();
+ cluster.killServer(toBeKilled.getId());
+
+ // send more messages
+ sendMessages(cluster, 3);
+
+ // get a snapshot from the leader
+ final CounterStateMachine leaderStateMachine = (CounterStateMachine) leader.getStateMachine();
+ final CounterStateMachine.CounterState snapshot = leaderStateMachine.getState();
+ LOG.info("{}: Leader {}", leader.getId(), snapshot);
+
+ // remove raft log from the killed follower
+ FileUtils.listDir(raftLogCurrentDir, s -> LOG.info("{}", s), LOG::error);
+ final String[] logFiles = raftLogCurrentDir.list((dir, name) -> name.startsWith("log"));
+ assertNotNull(logFiles);
+ for (String logFile : logFiles) {
+ FileUtils.deleteFile(new File(raftLogCurrentDir, logFile));
+ }
+
+ // remove the killed follower
+ final RaftPeerId followerId = toBeKilled.getId();
+ cluster.removeServer(followerId);
+
+ // save the leader snapshot to the killed follower
+ final TermIndex applied = snapshot.getApplied();
+ final File snapshotFile = smStorage.getSnapshotFile(applied.getTerm(), applied.getIndex());
+ final RaftServer toSaveSnapshot = cluster.putNewServer(followerId, group, false);
+ ((CounterStateMachine) toSaveSnapshot.getDivision(group.getGroupId()).getStateMachine())
+ .saveSnapshot(snapshot, snapshotFile);
+
+ // start follower and verify last applied
+ LOG.info("Restarting {}", followerId);
+ final RaftServer.Division restartedFollower = cluster.restartServer(followerId, group, false);
+ final StateMachine stateMachine = restartedFollower.getStateMachine();
+ final SnapshotInfo info = stateMachine.getLatestSnapshot();
+ LOG.info("{} restarted snapshot info {} from {}", followerId, info, stateMachine);
+
+ JavaUtils.attemptUntilTrue(() -> {
+ System.out.println(cluster.printServers());
+ final TermIndex leaderLastApplied = leaderStateMachine.getLastAppliedTermIndex();
+ LOG.info("Leader {} last applied {}", leader.getId(), leaderLastApplied);
+ final TermIndex followerLastApplied = stateMachine.getLastAppliedTermIndex();
+ LOG.info("Follower {} last applied {}", followerId, followerLastApplied);
+ return followerLastApplied.equals(leaderLastApplied);
+ }, 10, TimeDuration.ONE_SECOND, "followerLastApplied", LOG);
+
+ sendMessages(cluster, 7);
+ }
+
+ static void sendMessages(MiniRaftCluster cluster, int numMessages) throws Exception {
+ final List<Message> messages = getUpdateRequests(numMessages);
+ try(final RaftClient client = cluster.createClient()) {
+ for (Message message : messages) {
+ final RaftClientReply reply = client.io().send(message);
+ assertTrue(reply.isSuccess());
+ }
+ }
+ }
+
+ static List<Message> getUpdateRequests(int numMessages) {
+ final List<Message> messages = new ArrayList<>();
+ for(int i = 0; i < numMessages; i++) {
+ messages.add(CounterCommand.INCREMENT.getMessage());
+ }
+ return messages;
+ }
+
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 725c499d1..dc05c63e3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -123,12 +123,7 @@ class ServerState {
// On start the leader is null, start the clock now
this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime());
this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
-
- final LongSupplier getSnapshotIndexFromStateMachine = () -> Optional.ofNullable(stateMachine.getLatestSnapshot())
- .map(SnapshotInfo::getIndex)
- .filter(i -> i >= 0)
- .orElse(RaftLog.INVALID_LOG_INDEX);
- this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
+ this.log = JavaUtils.memoize(() -> initRaftLog(() -> getSnapshotIndexFromStateMachine(stateMachine), prop));
this.readRequests = new ReadRequests(prop, stateMachine);
this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
stateMachine, server, this, getLog().getSnapshotIndex(), prop,
@@ -154,6 +149,16 @@ class ServerState {
return memberId;
}
+ private long getSnapshotIndexFromStateMachine(StateMachine stateMachine) {
+ final SnapshotInfo latest = stateMachine.getLatestSnapshot();
+ LOG.info("{}: getLatestSnapshot({}) returns {}", getMemberId(), stateMachine, latest);
+ if (latest == null) {
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+ final long index = latest.getIndex();
+ return index >= 0 ? index : RaftLog.INVALID_LOG_INDEX;
+ }
+
void writeRaftConfiguration(LogEntryProto conf) {
getStorage().writeRaftConfiguration(conf);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 710dede48..8c2b66f96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -86,7 +86,8 @@ public abstract class RaftLogBase implements RaftLog {
RaftProperties properties) {
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
this.memberId = memberId;
- long index = getSnapshotIndexFromStateMachine.getAsLong();
+ final long index = getSnapshotIndexFromStateMachine.getAsLong();
+ LOG.info("{}: snapshotIndexFromStateMachine = {}", name, index);
this.commitIndex = new RaftLogIndex("commitIndex", index);
this.snapshotIndex = new RaftLogIndex("snapshotIndex", index);
this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index bb7e9856b..3f18ee538 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -42,6 +42,7 @@ import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -49,6 +50,9 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
StateMachine.EventApi, StateMachine.LeaderEventApi, StateMachine.FollowerEventApi {
+ private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
+
+ private final int id = ID_GENERATOR.incrementAndGet();
private final CompletableFuture<RaftServer> server = new CompletableFuture<>();
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftGroupId groupId;
@@ -226,7 +230,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
@Override
public String toString() {
- return JavaUtils.getClassSimpleName(getClass()) + ":"
+ return JavaUtils.getClassSimpleName(getClass()) + "-" + id + ":"
+ (!server.isDone()? "uninitialized": getId() + ":" + groupId);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index 7e8afbaa8..2552c33c2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -226,8 +226,12 @@ public class SimpleStateMachineStorage implements StateMachineStorage {
return null;
}
try {
- return updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
- } catch (IOException ignored) {
+ final SingleFileSnapshotInfo latest = updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
+ LOG.info("Latest snapshot is {} in {}", latest, dir);
+ return latest;
+ } catch (IOException e) {
+ LOG.warn("Failed to updateLatestSnapshot from {}", dir, e);
+ FileUtils.listDir(dir, s -> LOG.warn(" {}", s), LOG::error);
return null;
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index f4785ce90..b18440c83 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -351,9 +351,11 @@ public abstract class MiniRaftCluster implements Closeable {
: JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: {}.", printServers()), 10, 10, TimeUnit.SECONDS));
}
- /**
- * start a stopped server again.
- */
+ public void removeServer(RaftPeerId serverId) {
+ servers.remove(serverId);
+ }
+
+ /** Restart the server with the given id. */
public RaftServer.Division restartServer(RaftPeerId serverId, boolean format) throws IOException {
return restartServer(serverId, group, format);
}
@@ -361,7 +363,7 @@ public abstract class MiniRaftCluster implements Closeable {
public RaftServer.Division restartServer(RaftPeerId serverId, RaftGroup raftGroup, boolean format)
throws IOException {
killServer(serverId);
- servers.remove(serverId);
+ removeServer(serverId);
final RaftServer proxy = putNewServer(serverId, raftGroup, format);
proxy.start();