This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb4de051 RATIS-2318. Add a test to show how to manually restore a snapshot. (#1279)
0bb4de051 is described below

commit 0bb4de0519ec8117420cd4d078fe745666f72ae8
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Sep 8 13:14:36 2025 -0700

    RATIS-2318. Add a test to show how to manually restore a snapshot. (#1279)
---
 .../main/java/org/apache/ratis/util/FileUtils.java |  26 ++++
 .../counter/server/CounterStateMachine.java        |  32 +++--
 .../counter/server/TestManualRestoreSnapshot.java  | 146 +++++++++++++++++++++
 .../org/apache/ratis/server/impl/ServerState.java  |  17 ++-
 .../apache/ratis/server/raftlog/RaftLogBase.java   |   3 +-
 .../ratis/statemachine/impl/BaseStateMachine.java  |   6 +-
 .../impl/SimpleStateMachineStorage.java            |   8 +-
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  10 +-
 8 files changed, 225 insertions(+), 23 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index 315010c62..79c00b5a3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -33,6 +33,7 @@ import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.LinkOption;
+import java.nio.file.NotDirectoryException;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -43,7 +44,10 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 public interface FileUtils {
   Logger LOG = LoggerFactory.getLogger(FileUtils.class);
@@ -382,4 +386,45 @@ public interface FileUtils {
       }
     });
   }
+
+  /**
+   * Same as {@link #listDir(Path, Consumer, BiConsumer)} for a {@link File}.
+   */
+  static void listDir(File dir, Consumer<? super Path> out, BiConsumer<String, Throwable> err) {
+    listDir(dir.toPath(), out, err);
+  }
+
+  /**
+   * List the entries of the given directory, passing each entry to {@code out}.
+   * Instead of being thrown, an {@link IOException} is passed to {@code err}
+   * together with a description message.
+   */
+  static void listDir(Path dir, Consumer<? super Path> out, BiConsumer<String, Throwable> err) {
+    try {
+      listDir(dir, out);
+    } catch (IOException e) {
+      err.accept("Failed to listDir: " + dir, e);
+    }
+  }
+
+  /**
+   * List the entries of the given directory, passing each entry to {@code out}.
+   * The entries are not listed recursively.
+   *
+   * @throws NotDirectoryException if the given path is not a directory;
+   *         symbolic links are not followed, so a link to a directory is rejected.
+   * @throws IOException if the directory cannot be listed.
+   */
+  static void listDir(Path dir, Consumer<? super Path> out) throws IOException {
+    if (!Files.isDirectory(dir, LinkOption.NOFOLLOW_LINKS)) {
+      throw new NotDirectoryException(dir.toString());
+    }
+
+    try (Stream<Path> entries = Files.list(dir)) {
+      entries.forEach(out);
+    } catch (java.io.UncheckedIOException e) {
+      // Files.list wraps errors raised during iteration; unwrap them for the callers.
+      throw e.getCause();
+    }
+  }
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index b5352f0ad..914180feb 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -78,6 +78,11 @@ public class CounterStateMachine extends BaseStateMachine {
     int getCounter() {
       return counter;
     }
+
+    @Override
+    public String toString() {
+      return counter + "@" + applied;
+    }
   }
 
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
@@ -94,11 +99,11 @@ public class CounterStateMachine extends BaseStateMachine {
   }
 
   /** @return the current state. */
-  private synchronized CounterState getState() {
+  synchronized CounterState getState() {
     return new CounterState(getLastAppliedTermIndex(), counter.get());
   }
 
-  private synchronized void updateState(TermIndex applied, int counterValue) {
+  synchronized void updateState(TermIndex applied, int counterValue) {
     updateLastAppliedTermIndex(applied);
     counter.set(counterValue);
   }
@@ -141,36 +146,50 @@ public class CounterStateMachine extends BaseStateMachine {
     load(storage.loadLatestSnapshot());
   }
 
+  @Override
+  public SimpleStateMachineStorage getStateMachineStorage() {
+    return storage;
+  }
+
   /**
    * Store the current state as a snapshot file in the {@link #storage}.
    *
    * @return the index of the snapshot
    */
   @Override
-  public long takeSnapshot() {
+  public long takeSnapshot() throws IOException {
     //get the current state
     final CounterState state = getState();
     final long index = state.getApplied().getIndex();
 
     //create a file with a proper name to store the snapshot
     final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
+    try {
+      saveSnapshot(state, snapshotFile);
+    } catch (Exception e) {
+      // catch all exceptions, not only IOException, so that any failure is reported as an IOException
+      throw new IOException("Failed to save snapshot (" + state + ") to file " + snapshotFile, e);
+    }
 
+    //return the index of the stored snapshot (which is the last applied one)
+    return index;
+  }
+
+  /**
+   * Write the counter value of the given state to the given snapshot file,
+   * compute and save the MD5 of the file, and then update the latest snapshot in {@link #storage}.
+   */
+  void saveSnapshot(CounterState state, File snapshotFile) throws IOException {
     //write the counter value into the snapshot file
     try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
         Files.newOutputStream(snapshotFile.toPath())))) {
       out.writeInt(state.getCounter());
-    } catch (IOException ioe) {
-      LOG.warn("Failed to write snapshot file \"" + snapshotFile
-          + "\", last applied index=" + state.getApplied());
     }
 
     // update storage
     final MD5Hash md5 = MD5FileUtil.computeAndSaveMd5ForFile(snapshotFile);
     final FileInfo info = new FileInfo(snapshotFile.toPath(), md5);
     storage.updateLatestSnapshot(new SingleFileSnapshotInfo(info, state.getApplied()));
-
-    //return the index of the stored snapshot (which is the last applied one)
-    return index;
   }
 
   /**
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java
new file mode 100644
index 000000000..0480a5364
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.counter.server;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test manually restoring a snapshot.
+ * Due to hardware failures or software bugs,
+ * the state of a state machine can become corrupted.
+ * In such case, we may manually copy a snapshot from the leader
+ * and then install it to the corrupted state machine.
+ */
+public class TestManualRestoreSnapshot extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
+  public static final int NUM_SERVERS = 3;
+
+  {
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, CounterStateMachine.class, StateMachine.class);
+  }
+
+  @Test
+  public void testManualRestoreSnapshot() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::run);
+  }
+
+  void run(MiniRaftCluster cluster) throws Exception {
+    final RaftGroup group = cluster.getGroup();
+
+    // send some messages
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    LOG.info("Leader: {}", leader);
+    sendMessages(cluster, 5);
+
+    // kill a follower
+    final RaftServer.Division toBeKilled = cluster.getFollowers().get(0);
+    LOG.info("Follower to be killed: {}", toBeKilled.getId());
+    final SimpleStateMachineStorage smStorage = ((CounterStateMachine) toBeKilled.getStateMachine())
+        .getStateMachineStorage();
+    final File raftLogCurrentDir = toBeKilled.getRaftStorage().getStorageDir().getCurrentDir();
+    cluster.killServer(toBeKilled.getId());
+
+    // send more messages
+    sendMessages(cluster, 3);
+
+    // get a snapshot from the leader
+    final CounterStateMachine leaderStateMachine = (CounterStateMachine) leader.getStateMachine();
+    final CounterStateMachine.CounterState snapshot = leaderStateMachine.getState();
+    LOG.info("{}: Leader {}", leader.getId(), snapshot);
+
+    // remove raft log from the killed follower
+    FileUtils.listDir(raftLogCurrentDir, s -> LOG.info("{}", s), LOG::error);
+    final String[] logFiles = raftLogCurrentDir.list((dir, name) -> name.startsWith("log"));
+    assertNotNull(logFiles);
+    for (String logFile : logFiles) {
+      FileUtils.deleteFile(new File(raftLogCurrentDir, logFile));
+    }
+
+    // remove the killed follower
+    final RaftPeerId followerId = toBeKilled.getId();
+    cluster.removeServer(followerId);
+
+    // save the leader snapshot to the killed follower
+    final TermIndex applied = snapshot.getApplied();
+    final File snapshotFile = smStorage.getSnapshotFile(applied.getTerm(), applied.getIndex());
+    final RaftServer toSaveSnapshot = cluster.putNewServer(followerId, group, false);
+    ((CounterStateMachine) toSaveSnapshot.getDivision(group.getGroupId()).getStateMachine())
+        .saveSnapshot(snapshot, snapshotFile);
+
+    // start follower and verify last applied
+    LOG.info("Restarting {}", followerId);
+    final RaftServer.Division restartedFollower = cluster.restartServer(followerId, group, false);
+    final StateMachine stateMachine = restartedFollower.getStateMachine();
+    final SnapshotInfo info = stateMachine.getLatestSnapshot();
+    LOG.info("{} restarted snapshot info {} from {}", followerId, info, stateMachine);
+
+    JavaUtils.attemptUntilTrue(() -> {
+      LOG.info("{}", cluster.printServers());
+      final TermIndex leaderLastApplied = leaderStateMachine.getLastAppliedTermIndex();
+      LOG.info("Leader   {} last applied {}", leader.getId(), leaderLastApplied);
+      final TermIndex followerLastApplied = stateMachine.getLastAppliedTermIndex();
+      LOG.info("Follower {} last applied {}", followerId, followerLastApplied);
+      return followerLastApplied.equals(leaderLastApplied);
+    }, 10, TimeDuration.ONE_SECOND, "followerLastApplied", LOG);
+
+    sendMessages(cluster, 7);
+  }
+
+  static void sendMessages(MiniRaftCluster cluster, int numMessages) throws Exception {
+    final List<Message> messages = getUpdateRequests(numMessages);
+    try(final RaftClient client = cluster.createClient()) {
+      for (Message message : messages) {
+        final RaftClientReply reply = client.io().send(message);
+        assertTrue(reply.isSuccess());
+      }
+    }
+  }
+
+  static List<Message> getUpdateRequests(int numMessages) {
+    final List<Message> messages = new ArrayList<>();
+    for(int i = 0; i < numMessages; i++) {
+      messages.add(CounterCommand.INCREMENT.getMessage());
+    }
+    return messages;
+  }
+
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 725c499d1..dc05c63e3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -123,12 +123,7 @@ class ServerState {
     // On start the leader is null, start the clock now
     this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime());
     this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
-
-    final LongSupplier getSnapshotIndexFromStateMachine = () -> Optional.ofNullable(stateMachine.getLatestSnapshot())
-        .map(SnapshotInfo::getIndex)
-        .filter(i -> i >= 0)
-        .orElse(RaftLog.INVALID_LOG_INDEX);
-    this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
+    this.log = JavaUtils.memoize(() -> initRaftLog(() -> getSnapshotIndexFromStateMachine(stateMachine), prop));
     this.readRequests = new ReadRequests(prop, stateMachine);
     this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
         stateMachine, server, this, getLog().getSnapshotIndex(), prop,
@@ -154,6 +149,20 @@ class ServerState {
     return memberId;
   }
 
+  /**
+   * @return the index of the latest snapshot of the given state machine;
+   *         or {@link RaftLog#INVALID_LOG_INDEX} if there is no snapshot or its index is negative.
+   */
+  private long getSnapshotIndexFromStateMachine(StateMachine stateMachine) {
+    final SnapshotInfo latest = stateMachine.getLatestSnapshot();
+    LOG.info("{}: getLatestSnapshot({}) returns {}", getMemberId(), stateMachine, latest);
+    if (latest == null) {
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+    final long index = latest.getIndex();
+    return index >= 0 ? index : RaftLog.INVALID_LOG_INDEX;
+  }
+
   void writeRaftConfiguration(LogEntryProto conf) {
     getStorage().writeRaftConfiguration(conf);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 710dede48..8c2b66f96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -86,7 +86,8 @@ public abstract class RaftLogBase implements RaftLog {
                     RaftProperties properties) {
     this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
     this.memberId = memberId;
-    long index = getSnapshotIndexFromStateMachine.getAsLong();
+    final long index = getSnapshotIndexFromStateMachine.getAsLong();
+    LOG.info("{}: snapshotIndexFromStateMachine = {}", name, index);
     this.commitIndex = new RaftLogIndex("commitIndex", index);
     this.snapshotIndex = new RaftLogIndex("snapshotIndex", index);
     this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index bb7e9856b..3f18ee538 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -42,6 +42,7 @@ import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -49,6 +50,10 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
     StateMachine.EventApi, StateMachine.LeaderEventApi, StateMachine.FollowerEventApi {
+  private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
+
+  /** An id generated for each instance, included in {@link #toString()} to tell the instances apart. */
+  private final int id = ID_GENERATOR.incrementAndGet();
   private final CompletableFuture<RaftServer> server = new CompletableFuture<>();
   @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
   private volatile RaftGroupId groupId;
@@ -226,7 +231,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
 
   @Override
   public String toString() {
-    return JavaUtils.getClassSimpleName(getClass()) + ":"
+    return JavaUtils.getClassSimpleName(getClass()) + "-" + id + ":"
         + (!server.isDone()? "uninitialized": getId() + ":" + groupId);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index 7e8afbaa8..2552c33c2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -226,8 +226,13 @@ public class SimpleStateMachineStorage implements StateMachineStorage {
       return null;
     }
     try {
-      return updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
-    } catch (IOException ignored) {
+      final SingleFileSnapshotInfo latest = updateLatestSnapshot(findLatestSnapshot(dir.toPath()));
+      LOG.info("Latest snapshot is {} in {}", latest, dir);
+      return latest;
+    } catch (IOException e) {
+      LOG.warn("Failed to updateLatestSnapshot from {}", dir, e);
+      // also log the directory entries to help diagnose the failure
+      FileUtils.listDir(dir, s -> LOG.warn("  {}", s), LOG::error);
       return null;
     }
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index f4785ce90..b18440c83 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -351,9 +351,15 @@ public abstract class MiniRaftCluster implements Closeable {
         : JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: {}.", printServers()), 10, 10, TimeUnit.SECONDS));
   }
 
-  /**
-   * start a stopped server again.
-   */
+  /**
+   * Remove the server with the given id from this cluster without stopping it.
+   * Callers should kill the server first, as {@link #restartServer(RaftPeerId, RaftGroup, boolean)} does.
+   */
+  public void removeServer(RaftPeerId serverId) {
+    servers.remove(serverId);
+  }
+
+  /** Restart the server with the given id. */
   public RaftServer.Division restartServer(RaftPeerId serverId, boolean format) throws IOException {
     return restartServer(serverId, group, format);
   }
@@ -361,7 +367,7 @@ public abstract class MiniRaftCluster implements Closeable {
   public RaftServer.Division restartServer(RaftPeerId serverId, RaftGroup raftGroup, boolean format)
       throws IOException {
     killServer(serverId);
-    servers.remove(serverId);
+    removeServer(serverId);
 
     final RaftServer proxy = putNewServer(serverId, raftGroup, format);
     proxy.start();

Reply via email to