This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8620e09 RATIS-514. Check if leader and follower have same configuration for installSnapshot. Contributed by Hanisha Koneru
8620e09 is described below
commit 8620e09b952b88a68a6912d26e53aa308a3c3b8c
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Apr 10 16:51:48 2019 +0800
RATIS-514. Check if leader and follower have same configuration for installSnapshot. Contributed by Hanisha Koneru
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 6 +++-
ratis-proto/src/main/proto/Raft.proto | 41 +++++++++++++++-------
.../org/apache/ratis/server/impl/LogAppender.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 32 +++++++++++------
.../apache/ratis/server/impl/ServerProtoUtils.java | 39 ++++++++++++++------
.../org/apache/ratis/server/impl/ServerState.java | 4 +--
.../ratis/server/storage/SnapshotManager.java | 8 +++--
7 files changed, 91 insertions(+), 41 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 007284e..7d9e018 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -347,7 +347,7 @@ public class GrpcLogAppender extends LogAppender {
}
synchronized void addPending(InstallSnapshotRequestProto request) {
- pending.offer(request.getRequestIndex());
+ pending.offer(request.getSnapshotChunk().getRequestIndex());
}
synchronized void removePending(InstallSnapshotReplyProto reply) {
@@ -396,6 +396,10 @@ public class GrpcLogAppender extends LogAppender {
case NOT_LEADER:
checkResponseTerm(reply.getTerm());
break;
+ case CONF_MISMATCH:
+ LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to
{} but follower {} has it set to {}",
+ server.getId(),
RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
+ server.getId(), installSnapshotEnabled, getFollowerId(),
!installSnapshotEnabled);
case UNRECOGNIZED:
break;
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 96b91ec..2d24a50 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -126,6 +126,7 @@ enum InstallSnapshotResult {
NOT_LEADER = 1;
IN_PROGRESS = 2;
ALREADY_INSTALLED = 3;
+ CONF_MISMATCH = 4;
}
message RequestVoteRequestProto {
@@ -172,24 +173,38 @@ message AppendEntriesReplyProto {
}
message InstallSnapshotRequestProto {
+ message SnapshotChunkProto {
+ string requestId = 1; // an identifier for chunked-requests.
+ uint32 requestIndex = 2; // the index for this request chunk. Starts from
0.
+ RaftConfigurationProto raftConfiguration = 3;
+ TermIndexProto termIndex = 4;
+ repeated FileChunkProto fileChunks = 5;
+ uint64 totalSize = 6;
+ bool done = 7; // whether this is the final chunk for the same req.
+ }
+
+ message NotificationProto {
+ TermIndexProto firstAvailableTermIndex = 1; // first available log index
to notify Follower to install snapshot.
+ }
+
RaftRpcRequestProto serverRequest = 1;
- string requestId = 2; // an identifier for chunked-requests.
- uint32 requestIndex = 3; // the index for this request chunk. Starts from 0.
- RaftConfigurationProto raftConfiguration = 4;
- uint64 leaderTerm = 5;
- TermIndexProto termIndex = 6;
- repeated FileChunkProto fileChunks = 7;
- uint64 totalSize = 8;
- bool done = 9; // whether this is the final chunk for the same req.
- TermIndexProto firstAvailableLogIndex = 11; // first available log index to
notify Follower to install snapshot
+ uint64 leaderTerm = 2;
+
+ oneof InstallSnapshotRequestBody {
+ SnapshotChunkProto snapshotChunk = 3;
+ NotificationProto notification = 4;
+ }
}
message InstallSnapshotReplyProto {
RaftRpcReplyProto serverReply = 1;
- uint32 requestIndex = 2;
- uint64 term = 3;
- InstallSnapshotResult result = 4;
- uint64 snapshotIndex = 5;
+ uint64 term = 2;
+ InstallSnapshotResult result = 3;
+
+ oneof InstallSnapshotReplyBody {
+ uint32 requestIndex = 4; // index of the snapshot chunk request.
+ uint64 snapshotIndex = 5; // index of snapshot installed after
notification.
+ }
}
message ClientMessageEntryProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 136b4d9..82c942d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -130,7 +130,7 @@ public class LogAppender {
return follower;
}
- RaftPeerId getFollowerId() {
+ protected RaftPeerId getFollowerId() {
return getFollower().getPeer().getId();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6788e96..b187108 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1032,19 +1032,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// Check if install snapshot from Leader is enabled
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install
the snapshot.
- return checkAndInstallSnapshot(request, leaderId);
+ if (request.hasSnapshotChunk()) {
+ return checkAndInstallSnapshot(request, leaderId);
+ }
} else {
// Leader has only sent a notification to install snapshot. Inform State
Machine to install snapshot.
- return notifyStateMachineToInstallSnapshot(request, leaderId);
+ if (request.hasNotification()) {
+ return notifyStateMachineToInstallSnapshot(request, leaderId);
+ }
}
+ // There is a mismatch between configurations on leader and follower.
+ final InstallSnapshotReplyProto reply = ServerProtoUtils
+ .toInstallSnapshotReplyProto(leaderId, getId(), groupId,
+ InstallSnapshotResult.CONF_MISMATCH);
+ LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but
follower {} has it set to {}",
+ getId(),
RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
+ leaderId, request.hasSnapshotChunk(), getId(), installSnapshotEnabled);
+ return reply;
}
private InstallSnapshotReplyProto checkAndInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws
IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
- final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
- request.getTermIndex());
+ InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
request.getSnapshotChunk();
+ final TermIndex lastTermIndex =
ServerProtoUtils.toTermIndex(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastTermIndex.getIndex();
final Optional<FollowerState> followerState;
synchronized (this) {
@@ -1053,7 +1065,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils
.toInstallSnapshotReplyProto(leaderId, getId(), groupId,
currentTerm,
- request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+ snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.NOT_LEADER);
LOG.debug("{}: do not recognize leader for installing snapshot." +
" Reply: {}", getId(), reply);
return reply;
@@ -1076,19 +1088,19 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// update the committed index
// re-load the state machine if this is the last chunk
- if (request.getDone()) {
+ if (snapshotChunkRequest.getDone()) {
state.reloadStateMachine(lastIncludedIndex, leaderTerm);
}
} finally {
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
}
}
- if (request.getDone()) {
+ if (snapshotChunkRequest.getDone()) {
LOG.info("{}:{} successfully install the whole snapshot-{}", getId(),
getGroupId(),
lastIncludedIndex);
}
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
groupId,
- currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
+ currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS);
}
private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
@@ -1096,7 +1108,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = ServerProtoUtils.toTermIndex(
- request.getFirstAvailableLogIndex());
+ request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
synchronized (this) {
@@ -1105,7 +1117,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils
.toInstallSnapshotReplyProto(leaderId, getId(), groupId,
currentTerm,
- request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+ InstallSnapshotResult.NOT_LEADER, -1);
LOG.debug("{}: do not recognize leader for installing snapshot." +
" Reply: {}", getId(), reply);
return reply;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index bb5a1fa..adc593e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -276,52 +276,69 @@ public interface ServerProtoUtils {
static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
- long term, int requestIndex, InstallSnapshotResult result) {
+ long currentTerm, int requestIndex, InstallSnapshotResult result) {
final RaftRpcReplyProto.Builder rb =
toRaftRpcReplyProtoBuilder(requestorId,
replyId, groupId, result == InstallSnapshotResult.SUCCESS);
final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
- .newBuilder().setServerReply(rb).setTerm(term).setResult(result)
+ .newBuilder().setServerReply(rb).setTerm(currentTerm).setResult(result)
.setRequestIndex(requestIndex);
return builder.build();
}
static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
- long term, InstallSnapshotResult result, long installedSnapshotIndex) {
+ long currentTerm, InstallSnapshotResult result, long
installedSnapshotIndex) {
final RaftRpcReplyProto.Builder rb =
toRaftRpcReplyProtoBuilder(requestorId,
replyId, groupId, result == InstallSnapshotResult.SUCCESS);
final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
- .newBuilder().setServerReply(rb).setTerm(term).setResult(result);
+
.newBuilder().setServerReply(rb).setTerm(currentTerm).setResult(result);
if (installedSnapshotIndex > 0) {
builder.setSnapshotIndex(installedSnapshotIndex);
}
return builder.build();
}
+ static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
+ RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+ InstallSnapshotResult result) {
+ final RaftRpcReplyProto.Builder rb =
toRaftRpcReplyProtoBuilder(requestorId,
+ replyId, groupId, result == InstallSnapshotResult.SUCCESS);
+ final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
+ .newBuilder().setServerReply(rb).setResult(result);
+ return builder.build();
+ }
+
static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, String
requestId, int requestIndex,
long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
long totalSize, boolean done) {
+ final InstallSnapshotRequestProto.SnapshotChunkProto.Builder
snapshotChunkProto =
+ InstallSnapshotRequestProto.SnapshotChunkProto.newBuilder()
+ .setRequestId(requestId)
+ .setRequestIndex(requestIndex)
+ .setTermIndex(toTermIndexProto(lastTermIndex))
+ .addAllFileChunks(chunks)
+ .setTotalSize(totalSize)
+ .setDone(done);
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId,
groupId))
- .setRequestId(requestId)
- .setRequestIndex(requestIndex)
// .setRaftConfiguration() TODO: save and pass RaftConfiguration
.setLeaderTerm(term)
- .setTermIndex(toTermIndexProto(lastTermIndex))
- .addAllFileChunks(chunks)
- .setTotalSize(totalSize)
- .setDone(done).build();
+ .setSnapshotChunk(snapshotChunkProto)
+ .build();
}
static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
long leaderTerm, TermIndex firstAvailable) {
+ final InstallSnapshotRequestProto.NotificationProto.Builder
notificationProto =
+ InstallSnapshotRequestProto.NotificationProto.newBuilder()
+ .setFirstAvailableTermIndex(toTermIndexProto(firstAvailable));
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId,
groupId))
// .setRaftConfiguration() TODO: save and pass RaftConfiguration
.setLeaderTerm(leaderTerm)
- .setFirstAvailableLogIndex(toTermIndexProto(firstAvailable))
+ .setNotification(notificationProto)
.build();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 8fb55f2..8f2cbed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -407,9 +407,9 @@ public class ServerState implements Closeable {
StateMachine sm = server.getStateMachine();
sm.pause(); // pause the SM to prepare for install snapshot
snapshotManager.installSnapshot(sm, request);
- log.syncWithSnapshot(request.getTermIndex().getIndex());
+ log.syncWithSnapshot(request.getSnapshotChunk().getTermIndex().getIndex());
this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
- request.getTermIndex());
+ request.getSnapshotChunk().getTermIndex());
}
void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 77a9963..f0afc2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -54,7 +54,9 @@ public class SnapshotManager {
public void installSnapshot(StateMachine stateMachine,
InstallSnapshotRequestProto request) throws IOException {
- final long lastIncludedIndex = request.getTermIndex().getIndex();
+ final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
+ request.getSnapshotChunk();
+ final long lastIncludedIndex =
snapshotChunkRequest.getTermIndex().getIndex();
final RaftStorageDirectory dir = storage.getStorageDir();
File tmpDir = dir.getNewTempDir();
@@ -66,7 +68,7 @@ public class SnapshotManager {
// TODO: Make sure that subsequent requests for the same installSnapshot
are coming in order,
// and are not lost when whole request cycle is done. Check requestId and
requestIndex here
- for (FileChunkProto chunk : request.getFileChunksList()) {
+ for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
throw new IOException("There exists snapshot file "
@@ -124,7 +126,7 @@ public class SnapshotManager {
}
}
- if (request.getDone()) {
+ if (snapshotChunkRequest.getDone()) {
LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
tmpDir, dir.getStateMachineDir());
dir.getStateMachineDir().delete();