This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bdfb58  RATIS-498. Notify Follower to Install Snapshot through state machine.  Contributed by Hanisha Koneru
6bdfb58 is described below

commit 6bdfb587763d9e6faeeaf8918b8b6316c4cb7fe0
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Apr 3 21:15:48 2019 +0800

    RATIS-498. Notify Follower to Install Snapshot through state machine.  Contributed by Hanisha Koneru
---
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java |  19 --
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  93 +++++++++-
 ratis-proto/src/main/proto/Raft.proto              |   7 +-
 .../apache/ratis/server/RaftServerConfigKeys.java  |  10 ++
 .../org/apache/ratis/server/impl/FollowerInfo.java |   6 +-
 .../apache/ratis/server/impl/FollowerState.java    |   1 +
 .../org/apache/ratis/server/impl/LogAppender.java  |   7 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 192 +++++++++++++++++++--
 .../apache/ratis/server/impl/ServerProtoUtils.java |  24 +++
 .../org/apache/ratis/server/impl/ServerState.java  |  24 +++
 .../apache/ratis/statemachine/StateMachine.java    |  14 +-
 .../impl/SimpleStateMachineStorage.java            |   6 +
 .../ratis/statemachine/RaftSnapshotBaseTest.java   |   6 +-
 .../statemachine/SimpleStateMachine4Testing.java   |   4 +
 .../ratis/grpc/TestInstallSnapshotWithGrpc.java    | 188 ++++++++++++++++++++
 15 files changed, 550 insertions(+), 51 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 08ca49e..1f18810 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -159,25 +159,6 @@ public interface GrpcConfigKeys {
     }
   }
 
-  interface LogAppender {
-    Logger LOG = LoggerFactory.getLogger(Server.class);
-    static Consumer<String> getDefaultLog() {
-      return LOG::info;
-    }
-
-    String PREFIX = GrpcConfigKeys.PREFIX + ".log.appender";
-
-    String INSTALL_SNAPSHOT_ENABLED_KEY = PREFIX + ".install.snapshot.enabled";
-    boolean INSTALL_SNAPSHOT_ENABLED_DEFAULT = true;
-    static boolean installSnapshotEnabled(RaftProperties properties) {
-      return getBoolean(properties::getBoolean,
-          INSTALL_SNAPSHOT_ENABLED_KEY, INSTALL_SNAPSHOT_ENABLED_DEFAULT, 
getDefaultLog());
-    }
-    static void setInstallSnapshotEnabled(RaftProperties properties, boolean 
shouldInstallSnapshot) {
-      setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY, 
shouldInstallSnapshot);
-    }
-  }
-
   String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
   SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB");
   static SizeInBytes messageSizeMax(RaftProperties properties, 
Consumer<String> logger) {
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index da803c4..007284e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -25,6 +25,7 @@ import org.apache.ratis.server.impl.LeaderState;
 import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
@@ -68,7 +69,7 @@ public class GrpcLogAppender extends LogAppender {
         server.getProxy().getProperties());
     requestTimeoutDuration = 
RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
     pendingRequests = new ConcurrentHashMap<>();
-    installSnapshotEnabled = GrpcConfigKeys.LogAppender.installSnapshotEnabled(
+    installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(
         server.getProxy().getProperties());
   }
 
@@ -99,6 +100,12 @@ public class GrpcLogAppender extends LogAppender {
             installSnapshot(snapshot);
             shouldAppendLog = false;
           }
+        } else {
+          TermIndex installSnapshotNotificationTermIndex = 
shouldNotifyToInstallSnapshot();
+          if (installSnapshotNotificationTermIndex != null) {
+            installSnapshot(installSnapshotNotificationTermIndex);
+            shouldAppendLog = false;
+          }
         }
         if (shouldAppendLog && !shouldWait()) {
           // keep appending log entries or sending heartbeats
@@ -163,7 +170,7 @@ public class GrpcLogAppender extends LogAppender {
         return;
       }
       pendingRequests.put(pending.getServerRequest().getCallId(), pending);
-      updateNextIndex(pending);
+      increaseNextIndex(pending);
       if (appendLogRequestObserver == null) {
         appendLogRequestObserver = getClient().appendEntries(new 
AppendLogResponseHandler());
       }
@@ -193,10 +200,10 @@ public class GrpcLogAppender extends LogAppender {
     }
   }
 
-  private void updateNextIndex(AppendEntriesRequestProto request) {
+  private void increaseNextIndex(AppendEntriesRequestProto request) {
     final int count = request.getEntriesCount();
     if (count > 0) {
-      follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1);
+      follower.increaseNextIndex(request.getEntries(count - 1).getIndex() + 1);
     }
   }
 
@@ -210,8 +217,8 @@ public class GrpcLogAppender extends LogAppender {
      * 1. If the reply is success, update the follower's match index and submit
      *    an event to leaderState
      * 2. If the reply is NOT_LEADER, step down
-     * 3. If the reply is INCONSISTENCY, decrease the follower's next index
-     *    based on the response
+     * 3. If the reply is INCONSISTENCY, increase/decrease the follower's next
+     *    index based on the response
      */
     @Override
     public void onNext(AppendEntriesReplyProto reply) {
@@ -325,7 +332,8 @@ public class GrpcLogAppender extends LogAppender {
     }
     Preconditions.assertTrue(request.hasPreviousLog());
     if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
-      clearPendingRequests(reply.getNextIndex());
+      pendingRequests.clear();
+      follower.updateNextIndex(reply.getNextIndex());
     }
   }
 
@@ -375,6 +383,14 @@ public class GrpcLogAppender extends LogAppender {
 
       switch (reply.getResult()) {
         case SUCCESS:
+        case IN_PROGRESS:
+          removePending(reply);
+          break;
+        case ALREADY_INSTALLED:
+          long followerLatestSnapshotIndex = reply.getSnapshotIndex();
+          LOG.info("{}: Latest snapshot index on follower {} is {}.",
+              server.getId(), follower.getPeer(), followerLatestSnapshotIndex);
+          follower.setSnapshotIndex(followerLatestSnapshotIndex);
           removePending(reply);
           break;
         case NOT_LEADER:
@@ -399,12 +415,15 @@ public class GrpcLogAppender extends LogAppender {
 
     @Override
     public void onCompleted() {
-      LOG.info("{} stops sending snapshots to follower {}", server.getId(),
-          follower);
+      LOG.info("Snapshot(s) sent from {} to follower {}", server.getId(), 
follower);
       close();
     }
   }
 
+  /**
+   * Send installSnapshot request to Follower with a snapshot.
+   * @param snapshot the snapshot to be sent to Follower
+   */
   private void installSnapshot(SnapshotInfo snapshot) {
     LOG.info("{}: follower {}'s next index is {}," +
             " log's start index is {}, need to install snapshot",
@@ -451,4 +470,60 @@ public class GrpcLogAppender extends LogAppender {
           server.getId(), snapshot.getTermIndex().getIndex(), 
follower.getPeer());
     }
   }
+
+  /**
+   * Send installSnapshot request to Follower with only a notification that a 
snapshot needs to be installed.
+   * @param firstAvailableLogTermIndex the first available log's index on the 
Leader
+   */
+  private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
+    LOG.info("{}: follower {}'s next index is {}, log's start index is {}, " +
+            "need to notify follower to install snapshot",
+        server.getId(), follower.getPeer(), follower.getNextIndex(),
+        raftLog.getStartIndex());
+
+    final InstallSnapshotResponseHandler responseHandler = new 
InstallSnapshotResponseHandler();
+    StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
+    // prepare and enqueue the notify install snapshot request.
+    InstallSnapshotRequestProto request =
+        createInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
+    try {
+      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+      snapshotRequestObserver.onNext(request);
+      follower.updateLastRpcSendTime();
+      responseHandler.addPending(request);
+      snapshotRequestObserver.onCompleted();
+    } catch (Exception e) {
+      LOG.warn("{} failed to notify follower {} to install snapshot. " +
+          "Exception: {}", this, follower, e);
+      if (snapshotRequestObserver != null) {
+        snapshotRequestObserver.onError(e);
+      }
+      return;
+    }
+
+    synchronized (this) {
+      if (isAppenderRunning() && !responseHandler.isDone()) {
+        try {
+          wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+    }
+  }
+
+  /**
+   * Should the Leader notify the Follower to install the snapshot through
+   * its own State Machine.
+   * @return the first available log's start term index
+   */
+  protected TermIndex shouldNotifyToInstallSnapshot() {
+    if (follower.getNextIndex() < raftLog.getStartIndex()) {
+      // The Leader does not have the logs from the Follower's last log
+      // index onwards. And install snapshot is disabled. So the Follower
+      // should be notified to install the latest snapshot through its
+      // State Machine.
+      return raftLog.getTermIndex(raftLog.getStartIndex());
+    }
+    return null;
+  }
 }
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index b081352..96b91ec 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -124,6 +124,8 @@ message FileChunkProto {
 enum InstallSnapshotResult {
   SUCCESS = 0;
   NOT_LEADER = 1;
+  IN_PROGRESS = 2;
+  ALREADY_INSTALLED = 3;
 }
 
 message RequestVoteRequestProto {
@@ -158,7 +160,8 @@ message AppendEntriesReplyProto {
   enum AppendResult {
     SUCCESS = 0;
     NOT_LEADER = 1; // the requester's term is not large enough
-    INCONSISTENCY = 2; // gap between the local log and the entries
+    INCONSISTENCY = 2; // gap between the local log and the entries or 
snapshot installation in progress or
+                       // overlap between local snapshot and the entries
   }
 
   RaftRpcReplyProto serverReply = 1;
@@ -178,6 +181,7 @@ message InstallSnapshotRequestProto {
   repeated FileChunkProto fileChunks = 7;
   uint64 totalSize = 8;
   bool done = 9; // whether this is the final chunk for the same req.
+  TermIndexProto firstAvailableLogIndex = 11; // first available log index to 
notify Follower to install snapshot
 }
 
 message InstallSnapshotReplyProto {
@@ -185,6 +189,7 @@ message InstallSnapshotReplyProto {
   uint32 requestIndex = 2;
   uint64 term = 3;
   InstallSnapshotResult result = 4;
+  uint64 snapshotIndex = 5;
 }
 
 message ClientMessageEntryProto {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9cbdf2f..b4f7faf 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -267,6 +267,16 @@ public interface RaftServerConfigKeys {
       static void setSnapshotChunkSizeMax(RaftProperties properties, 
SizeInBytes maxChunkSize) {
         setSizeInBytes(properties::set, SNAPSHOT_CHUNK_SIZE_MAX_KEY, 
maxChunkSize);
       }
+
+      String INSTALL_SNAPSHOT_ENABLED_KEY = PREFIX + 
".install.snapshot.enabled";
+      boolean INSTALL_SNAPSHOT_ENABLED_DEFAULT = true;
+      static boolean installSnapshotEnabled(RaftProperties properties) {
+        return getBoolean(properties::getBoolean,
+            INSTALL_SNAPSHOT_ENABLED_KEY, INSTALL_SNAPSHOT_ENABLED_DEFAULT, 
getDefaultLog());
+      }
+      static void setInstallSnapshotEnabled(RaftProperties properties, boolean 
shouldInstallSnapshot) {
+        setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY, 
shouldInstallSnapshot);
+      }
     }
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index cad9620..5206528 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -79,7 +79,7 @@ public class FollowerInfo {
     return nextIndex.get();
   }
 
-  public void updateNextIndex(long newNextIndex) {
+  public void increaseNextIndex(long newNextIndex) {
     nextIndex.updateIncreasingly(newNextIndex, debugIndexChange);
   }
 
@@ -87,6 +87,10 @@ public class FollowerInfo {
     nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, 
newNextIndex), infoIndexChange);
   }
 
+  public void updateNextIndex(long newNextIndex) {
+    nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : 
old, infoIndexChange);
+  }
+
   public void setSnapshotIndex(long snapshotIndex) {
     matchIndex.setUnconditionally(snapshotIndex, infoIndexChange);
     nextIndex.setUnconditionally(snapshotIndex + 1, infoIndexChange);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index f5d55dd..1d2afda 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -34,6 +34,7 @@ class FollowerState extends Daemon {
     APPEND_COMPLETE(AtomicInteger::decrementAndGet),
     INSTALL_SNAPSHOT_START(AtomicInteger::incrementAndGet),
     INSTALL_SNAPSHOT_COMPLETE(AtomicInteger::decrementAndGet),
+    INSTALL_SNAPSHOT_NOTIFICATION(AtomicInteger::get),
     REQUEST_VOTE(AtomicInteger::get);
 
     private final ToIntFunction<AtomicInteger> updateFunction;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 17a4073..136b4d9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -316,6 +316,11 @@ public class LogAppender {
     }
   }
 
+  protected InstallSnapshotRequestProto 
createInstallSnapshotNotificationRequest(
+      TermIndex firstLogStartTermIndex) {
+    return server.createInstallSnapshotRequest(getFollowerId(), 
firstLogStartTermIndex);
+  }
+
   private FileChunkProto readFileChunk(FileInfo fileInfo,
       FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
       throws IOException {
@@ -425,7 +430,7 @@ public class LogAppender {
 
           if (nextIndex > oldNextIndex) {
             follower.updateMatchIndex(nextIndex - 1);
-            follower.updateNextIndex(nextIndex);
+            follower.increaseNextIndex(nextIndex);
             submitEventOnSuccessAppend();
           }
           break;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fb57b6c..6788e96 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -70,6 +71,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   private final int minTimeoutMs;
   private final int maxTimeoutMs;
   private final int rpcSlownessTimeoutMs;
+  private final boolean installSnapshotEnabled;
 
   private final LifeCycle lifeCycle;
   private final ServerState state;
@@ -82,6 +84,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
   private final RaftServerJmxAdapter jmxAdapter;
 
+  private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
+
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy 
proxy) throws IOException {
     final RaftPeerId id = proxy.getId();
     LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
@@ -94,12 +98,14 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     minTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
     maxTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
     rpcSlownessTimeoutMs = 
RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
+    installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
         "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
     this.proxy = proxy;
 
     this.state = new ServerState(id, group, properties, this, stateMachine);
     this.retryCache = initRetryCache(properties);
+    this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
 
     this.jmxAdapter = new RaftServerJmxAdapter();
   }
@@ -872,8 +878,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     final List<CompletableFuture<Long>> futures;
 
     final long currentTerm;
-    final long nextIndex = state.getLog().getNextIndex();
     final long followerCommit = state.getLog().getLastCommittedIndex();
+    final long nextIndex = state.getNextIndex();
     final Optional<FollowerState> followerState;
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
@@ -899,22 +905,20 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       }
       followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
 
-      // We need to check if "previous" is in the local peer. Note that it is
-      // possible that "previous" is covered by the latest snapshot: e.g.,
-      // it's possible there's no log entries outside of the latest snapshot.
-      // However, it is not possible that "previous" index is smaller than the
-      // last index included in snapshot. This is because indices <= snapshot's
-      // last index should have been committed.
-      if (previous != null && !containPrevious(previous)) {
-        final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), groupId, currentTerm, followerCommit, 
Math.min(nextIndex, previous.getIndex()),
-            INCONSISTENCY, callId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
-              getId(), previous, ServerProtoUtils.toString(reply));
-        }
+      // Check that the append entries are not inconsistent. There are 3
+      // scenarios which can result in inconsistency:
+      //      1. There is a snapshot installation in progress
+      //      2. There is an overlap between the snapshot index and the entries
+      //      3. There is a gap between the local log and the entries
+      // In any of these scenarios, we should return an INCONSISTENCY reply
+      // back to leader so that the leader can update this follower's next
+      // index.
+
+      AppendEntriesReplyProto inconsistencyReply = 
checkInconsistentAppendEntries(
+          leaderId, currentTerm, followerCommit, previous, nextIndex, callId, 
entries);
+      if (inconsistencyReply != null) {
         followerState.ifPresent(fs -> 
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
-        return CompletableFuture.completedFuture(reply);
+        return CompletableFuture.completedFuture(inconsistencyReply);
       }
 
       state.updateConfiguration(entries);
@@ -942,6 +946,64 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     });
   }
 
+  private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId 
leaderId, long currentTerm,
+      long followerCommit, TermIndex previous, long nextIndex, long callId, 
LogEntryProto... entries) {
+    long replyNextIndex = -1;
+
+    // Check if a snapshot installation through state machine is in progress.
+    if (inProgressInstallSnapshotRequest.get() != null) {
+      replyNextIndex = Math.min(nextIndex, previous.getIndex());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: Cannot append entries as snapshot installation is in " +
+            "progress. Follower next index: {}", getId(), replyNextIndex);
+      }
+    }
+
+    // If a snapshot installation has happened, the new snapshot might
+    // include the log entry indices sent as part of the
+    // AppendEntriesRequestProto. Check that the first log entry proto is
+    // greater than the last index included in the latest snapshot. If not,
+    // the leader should be informed about the new snapshot index so that
+    // it can send log entries only from the next log index
+    long snapshotIndex = state.getSnapshotIndex();
+    if (snapshotIndex > 0 && entries != null && entries.length > 0
+        && entries[0].getIndex() <= snapshotIndex) {
+      replyNextIndex = snapshotIndex + 1;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: Cannot append entries as latest snapshot already has " +
+            "the append entries. Snapshot index: {}, first append entry " +
+            "index: {}.", getId(), snapshotIndex, entries[0].getIndex());
+      }
+    }
+
+    // We need to check if "previous" is in the local peer. Note that it is
+    // possible that "previous" is covered by the latest snapshot: e.g.,
+    // it's possible there's no log entries outside of the latest snapshot.
+    // However, it is not possible that "previous" index is smaller than the
+    // last index included in snapshot. This is because indices <= snapshot's
+    // last index should have been committed.
+    if (previous != null && !containPrevious(previous)) {
+      replyNextIndex = Math.min(nextIndex, previous.getIndex());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: Cannot append entries as there is a gap between " +
+            "local log and append entries. Previous is not present. " +
+            "Previous: {}, follower next index: {}", getId(), previous, 
replyNextIndex);
+      }
+    }
+
+    if (replyNextIndex != -1) {
+      final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
+          leaderId, getId(), groupId, currentTerm, followerCommit, 
replyNextIndex,
+          INCONSISTENCY, callId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: inconsistency entries. Reply:{}", getId(), 
ServerProtoUtils.toString(reply));
+      }
+      return reply;
+    }
+
+    return null;
+  }
+
   private boolean containPrevious(TermIndex previous) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}",
@@ -967,6 +1029,18 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     assertLifeCycleState(STARTING, RUNNING);
     assertGroup(leaderId, leaderGroupId);
 
+    // Check if install snapshot from Leader is enabled
+    if (installSnapshotEnabled) {
+      // Leader has sent InstallSnapshot request with SnapshotInfo. Install 
the snapshot.
+      return checkAndInstallSnapshot(request, leaderId);
+    } else {
+      // Leader has only sent a notification to install snapshot. Inform State 
Machine to install snapshot.
+      return notifyStateMachineToInstallSnapshot(request, leaderId);
+    }
+  }
+
+  private InstallSnapshotReplyProto checkAndInstallSnapshot(
+      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws 
IOException {
     final long currentTerm;
     final long leaderTerm = request.getLeaderTerm();
     final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
@@ -1017,6 +1091,84 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
   }
 
+  private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
+      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws 
IOException {
+    final long currentTerm;
+    final long leaderTerm = request.getLeaderTerm();
+    final TermIndex firstAvailableLogTermIndex = ServerProtoUtils.toTermIndex(
+        request.getFirstAvailableLogIndex());
+    final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
+
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final InstallSnapshotReplyProto reply = ServerProtoUtils
+            .toInstallSnapshotReplyProto(leaderId, getId(), groupId, 
currentTerm,
+                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+        LOG.debug("{}: do not recognize leader for installing snapshot." +
+            " Reply: {}", getId(), reply);
+        return reply;
+      }
+      changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+      state.setLeader(leaderId, "installSnapshot");
+
+      
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
+
+      if (inProgressInstallSnapshotRequest.compareAndSet(null, 
firstAvailableLogTermIndex)) {
+
+        // Check if snapshot index is already at par or ahead of the first
+        // available log index of the Leader.
+        long snapshotIndex = state.getSnapshotIndex();
+        if (snapshotIndex + 1 >= firstAvailableLogIndex) {
+          // State Machine has already installed the snapshot. Return the
+          // latest snapshot index to the Leader.
+
+          
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+          final InstallSnapshotReplyProto reply = 
ServerProtoUtils.toInstallSnapshotReplyProto(
+              leaderId, getId(), groupId, currentTerm,
+              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+          LOG.info("{}: StateMachine latest installed snapshot index: {}. 
Reply: {}",
+              getId(), snapshotIndex, reply);
+
+          return reply;
+        }
+
+        // This is the first installSnapshot notify request for this term and
+        // index. Notify the state machine to install the snapshot.
+        LOG.debug("{}: notifying state machine to install snapshot. Next log " 
+
+                "index is {} but the leader's first available log index is 
{}.",
+            getId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
+
+        
stateMachine.notifyInstallSnapshotFromLeader(firstAvailableLogTermIndex)
+            .whenComplete((reply, exception) -> {
+              if (exception != null) {
+                LOG.error(getId() + ": State Machine failed to install 
snapshot", exception);
+                
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+                return;
+              }
+
+              if (reply != null) {
+                stateMachine.pause();
+                state.reloadStateMachine(reply.getIndex(), leaderTerm);
+                state.updateInstalledSnapshotIndex(reply);
+              }
+              
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+              return;
+            });
+
+        LOG.info("{}: StateMachine notified to install snapshot, Request: {}");
+        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), 
groupId,
+            currentTerm, InstallSnapshotResult.SUCCESS, -1);
+      }
+
+      LOG.debug("{}: StateMachine snapshot installation is in progress. " +
+              "InProgress Request: {}", getId(), 
inProgressInstallSnapshotRequest.get());
+      return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), 
groupId,
+          currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
+    }
+  }
+
   synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
       RaftPeerId targetId, String requestId, int requestIndex,
       SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
@@ -1028,6 +1180,14 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         chunks, totalSize.getAsLong(), done);
   }
 
+  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
+      RaftPeerId targetId, TermIndex firstAvailableLogTermIndex) {
+
+    assert (firstAvailableLogTermIndex.getIndex() > 0);
+    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(),
+        targetId, groupId, state.getCurrentTerm(), firstAvailableLogTermIndex);
+  }
+
   synchronized RequestVoteRequestProto createRequestVoteRequest(
       RaftPeerId targetId, long term, TermIndex lastEntry) {
     return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId,
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 62c21e9..bb5a1fa 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -285,6 +285,19 @@ public interface ServerProtoUtils {
     return builder.build();
   }
 
+  static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+      long term, InstallSnapshotResult result, long installedSnapshotIndex) {
+    final RaftRpcReplyProto.Builder rb = 
toRaftRpcReplyProtoBuilder(requestorId,
+        replyId, groupId, result == InstallSnapshotResult.SUCCESS);
+    final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
+        .newBuilder().setServerReply(rb).setTerm(term).setResult(result);
+    if (installedSnapshotIndex > 0) {
+      builder.setSnapshotIndex(installedSnapshotIndex);
+    }
+    return builder.build();
+  }
+
   static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, String 
requestId, int requestIndex,
       long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
@@ -301,6 +314,17 @@ public interface ServerProtoUtils {
         .setDone(done).build();
   }
 
+  /**
+   * Build an install-snapshot request that carries no file chunks: only the
+   * server request header, the leader term and the first available log
+   * term-index are set.
+   *
+   * @param leaderTerm the value set as the request's leader term
+   * @param firstAvailable the term-index set as the first available log index
+   * @return the built request
+   */
+  static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+      long leaderTerm, TermIndex firstAvailable) {
+    return InstallSnapshotRequestProto.newBuilder()
+        .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, 
groupId))
+        // .setRaftConfiguration()  TODO: save and pass RaftConfiguration
+        .setLeaderTerm(leaderTerm)
+        .setFirstAvailableLogIndex(toTermIndexProto(firstAvailable))
+        .build();
+  }
+
   static AppendEntriesReplyProto toAppendEntriesReplyProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long 
term,
       long followerCommit, long nextIndex, AppendResult result, long callId) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0a343b2..8fb55f2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -412,6 +412,11 @@ public class ServerState implements Closeable {
         request.getTermIndex());
   }
 
+  /**
+   * Record that a snapshot ending at the given term-index has been installed:
+   * the log is synced with the snapshot's index and the term-index is kept as
+   * the latest installed snapshot.
+   *
+   * @param lastTermIndexInSnapshot the last term-index covered by the snapshot
+   */
+  void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
+    log.syncWithSnapshot(lastTermIndexInSnapshot.getIndex());
+    this.latestInstalledSnapshot = lastTermIndexInSnapshot;
+  }
+
   SnapshotInfo getLatestSnapshot() {
     return 
server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
   }
@@ -420,6 +425,25 @@ public class ServerState implements Closeable {
     return latestInstalledSnapshot;
   }
 
+  /**
+   * The last index included in either the latestSnapshot or the
+   * latestInstalledSnapshot
+   * @return the last snapshot index, or 0 when neither snapshot exists
+   */
+  public long getSnapshotIndex() {
+    // Read each source exactly once so that the null check and the
+    // getIndex() call operate on the same object.
+    final SnapshotInfo snapshot = getLatestSnapshot();
+    final long latestSnapshotIndex = snapshot != null ? snapshot.getIndex() : 0;
+    final TermIndex installed = latestInstalledSnapshot;
+    final long latestInstalledSnapshotIndex =
+        installed != null ? installed.getIndex() : 0;
+    return Math.max(latestSnapshotIndex, latestInstalledSnapshotIndex);
+  }
+
+  /**
+   * @return the larger of the log's next index and the index immediately
+   *         after {@link #getSnapshotIndex()}
+   */
+  public long getNextIndex() {
+    final long logNextIndex = log.getNextIndex();
+    final long snapshotNextIndex = getSnapshotIndex() + 1;
+    return Math.max(logNextIndex, snapshotNextIndex);
+  }
+
   public long getLastAppliedIndex() {
     return stateMachineUpdater.getLastAppliedIndex();
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 2c51afb..ab7897c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -23,7 +23,6 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
@@ -254,4 +253,17 @@ public interface StateMachine extends Closeable {
   default CompletableFuture<Void> truncateStateMachineData(long index) {
     return CompletableFuture.completedFuture(null);
   }
+
+  /**
+   * Notify the Follower's state machine that the leader has purged entries
+   * from its log and hence to catch up, the Follower state machine would have
+   * to install the latest snapshot.
+   * <p>
+   * The default implementation installs nothing and returns a future that is
+   * already completed with {@code null}.
+   * @param firstTermIndexInLog TermIndex of the first append entry available
+   *                           in the Leader's log.
+   * @return After the snapshot installation is complete, return the last
+   * included term index in the snapshot.
+   */
+  default CompletableFuture<TermIndex> 
notifyInstallSnapshotFromLeader(TermIndex firstTermIndexInLog) {
+    return CompletableFuture.completedFuture(null);
+  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index 78605b4..18501c7 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -22,6 +22,7 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachineStorage;
+import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.AtomicFileOutputStream;
 import org.apache.ratis.util.MD5FileUtil;
 import org.slf4j.Logger;
@@ -130,4 +131,9 @@ public class SimpleStateMachineStorage implements 
StateMachineStorage {
   public SingleFileSnapshotInfo getLatestSnapshot() {
     return currentSnapshot;
   }
+
+  /**
+   * @return the {@code smDir} field of this storage
+   *         (presumably the state machine directory; confirm against init)
+   */
+  @VisibleForTesting
+  public File getSmDir() {
+    return smDir;
+  }
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 8a8401c..795c7b2 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -60,7 +60,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
   static final Logger LOG = 
LoggerFactory.getLogger(RaftSnapshotBaseTest.class);
   private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
 
-  static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, 
long endIndex) {
+  public static List<File> getSnapshotFiles(MiniRaftCluster cluster, long 
startIndex, long endIndex) {
     final RaftServerImpl leader = cluster.getLeader();
     final SimpleStateMachineStorage storage = 
SimpleStateMachine4Testing.get(leader).getStateMachineStorage();
     final long term = leader.getState().getCurrentTerm();
@@ -70,7 +70,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
   }
 
 
-  static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
+  public static void assertLeaderContent(MiniRaftCluster cluster) throws 
Exception {
     final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
     final RaftLog leaderLog = leader.getState().getLog();
     final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
@@ -147,7 +147,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest 
{
     }
   }
 
-  static boolean exists(File f) {
+  public static boolean exists(File f) {
     if (f.exists()) {
       LOG.info("File exists: " + f);
       return true;
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 6306ce2..6a8c532 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -375,4 +375,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     LOG.info("{}: notifyExtendedNoLeader {}, {}", this, group, roleInfoProto);
     leaderElectionTimeoutInfo = roleInfoProto;
   }
+
+  /** @return the directory reported by {@code storage.getSmDir()}. */
+  protected File getSMdir() {
+    return storage.getSmDir();
+  }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
new file mode 100644
index 0000000..e512262
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.ratis.BaseTest.ONE_SECOND;
+
+public class TestInstallSnapshotWithGrpc {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  static final Logger LOG = 
LoggerFactory.getLogger(TestInstallSnapshotWithGrpc.class);
+  private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
+  private static SingleFileSnapshotInfo leaderSnapshotInfo;
+
+  private MiniRaftCluster cluster;
+
+  /** @return the gRPC mini-cluster factory used to create the test cluster. */
+  private MiniRaftCluster.Factory<?> getFactory() {
+    return MiniRaftClusterWithGrpc.FACTORY;
+  }
+
+  /**
+   * Start a one-server cluster that uses {@link StateMachineForGRpcTest} as
+   * its state machine, has the log appender's install-snapshot option turned
+   * off, and auto-triggers snapshots at {@code SNAPSHOT_TRIGGER_THRESHOLD}.
+   */
+  @Before
+  public void setup() throws IOException {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        TestInstallSnapshotWithGrpc.StateMachineForGRpcTest.class, 
StateMachine.class);
+    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(prop, false);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+        prop, SNAPSHOT_TRIGGER_THRESHOLD);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+    this.cluster = getFactory().newCluster(1, prop);
+    cluster.start();
+  }
+
+  /** Shut the cluster down after each test, if one was created. */
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * State machine whose install-snapshot notification handler copies the
+   * leader's snapshot file (taken from the static {@code leaderSnapshotInfo})
+   * into this state machine's own directory and then reports the leader
+   * snapshot's term-index.
+   */
+  private static class StateMachineForGRpcTest extends
+      SimpleStateMachine4Testing {
+    @Override
+    public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(TermIndex termIndex) {
+      try {
+        Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
+        File followerSnapshotFilePath = new File(getSMdir(),
+            leaderSnapshotFile.getFileName().toString());
+        Files.copy(leaderSnapshotFile, followerSnapshotFilePath.toPath());
+      } catch (IOException e) {
+        // Report through the test logger rather than raw stderr so the
+        // failure appears alongside the rest of the test output.
+        TestInstallSnapshotWithGrpc.LOG.error(
+            "Failed to copy the leader snapshot into the follower directory", e);
+      }
+      return CompletableFuture.completedFuture(leaderSnapshotInfo.getTermIndex());
+    }
+  }
+
+  /**
+   * Basic test for install snapshot notification: start a one node cluster
+   * (disable install snapshot option) and let it generate a snapshot. Then
+   * delete the log and restart the node, and add more nodes as followers.
+   * The new follower nodes should get an install snapshot notification.
+   */
+  @Test
+  public void testInstallSnapshotNotification() throws Exception {
+    final List<RaftStorageDirectory.LogPathAndIndex> logs;
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+          RaftClientReply
+              reply = client.send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // wait for the snapshot to be done
+      RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
+          .getStorage().getStorageDir();
+
+      final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+      final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+          nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+      JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+          10, ONE_SECOND, "snapshotFile.exist", LOG);
+      logs = storageDirectory.getLogSegmentFiles();
+    } finally {
+      cluster.shutdown();
+    }
+
+    // delete the log segments from the leader
+    for (RaftStorageDirectory.LogPathAndIndex path : logs) {
+      FileUtils.delete(path.getPath());
+    }
+
+    // restart the peer
+    LOG.info("Restarting the cluster");
+    cluster.restart(false);
+    try {
+      RaftSnapshotBaseTest.assertLeaderContent(cluster);
+
+      // generate some more traffic
+      try(final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
+        Assert.assertTrue(client.send(new RaftTestUtil.SimpleMessage("m" + i)).isSuccess());
+      }
+
+      // Captured before the new peers join: the test state machine reads this
+      // static field when it is notified to install a snapshot.
+      leaderSnapshotInfo = (SingleFileSnapshotInfo) cluster.getLeader().getStateMachine().getLatestSnapshot();
+
+      // add two more peers
+      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+          new String[]{"s3", "s4"}, true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+
+      RaftServerTestUtil
+          .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+      // Check the installed snapshot index on each Follower matches with the
+      // leader snapshot.
+      for (RaftServerImpl follower : cluster.getFollowers()) {
+        Assert.assertEquals(leaderSnapshotInfo.getIndex(),
+            follower.getState().getLatestInstalledSnapshot().getIndex());
+      }
+
+      // restart the peer and check if it can correctly handle conf change
+      cluster.restartServer(cluster.getLeader().getId(), false);
+      RaftSnapshotBaseTest.assertLeaderContent(cluster);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

Reply via email to