Repository: incubator-ratis Updated Branches: refs/heads/master 940a169ba -> 9774c5cb1
RATIS-354. Remove LeaderNoOp and rename SMLogEntryProto. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/9774c5cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/9774c5cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/9774c5cb Branch: refs/heads/master Commit: 9774c5cb10dc14b280c8f5bafd852c0581ca0845 Parents: 940a169 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Thu Oct 18 09:25:43 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Thu Oct 18 09:25:43 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/ProtoUtils.java | 27 +++++++------ .../arithmetic/ArithmeticStateMachine.java | 19 ++++++--- .../filestore/FileStoreStateMachine.java | 35 +++++++++-------- .../examples/arithmetic/TestArithmetic.java | 1 - .../org/apache/ratis/grpc/TestRaftStream.java | 4 +- .../ratis/logservice/api/LogStateMachine.java | 4 +- ratis-proto/src/main/proto/Raft.proto | 13 ++----- .../ratis/server/impl/RaftServerImpl.java | 35 +++++++++++------ .../ratis/server/impl/ServerProtoUtils.java | 37 ++++++++++++------ .../apache/ratis/server/impl/ServerState.java | 2 +- .../apache/ratis/server/storage/LogSegment.java | 15 ++++--- .../apache/ratis/server/storage/RaftLog.java | 4 +- .../ratis/statemachine/TransactionContext.java | 14 +++---- .../statemachine/impl/BaseStateMachine.java | 8 +--- .../impl/TransactionContextImpl.java | 41 +++++++++++--------- .../java/org/apache/ratis/RaftAsyncTests.java | 2 +- .../java/org/apache/ratis/RaftTestUtil.java | 22 +++++------ .../ratis/server/impl/RetryCacheTestUtil.java | 4 +- .../server/storage/TestRaftLogSegment.java | 6 +-- .../statemachine/RaftSnapshotBaseTest.java | 2 +- .../SimpleStateMachine4Testing.java | 11 +++--- .../ratis/statemachine/TestStateMachine.java | 7 +--- 22 files changed, 170 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 2b72397..dff4721 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -152,43 +152,42 @@ public interface ProtoUtils { } static LogEntryProto toLogEntryProto( - SMLogEntryProto operation, long term, long index, + StateMachineLogEntryProto operation, long term, long index, ClientId clientId, long callId) { return LogEntryProto.newBuilder().setTerm(term).setIndex(index) - .setSmLogEntry(operation) + .setStateMachineLogEntry(operation) .setClientId(clientId.toByteString()).setCallId(callId) .build(); } static boolean shouldReadStateMachineData(LogEntryProto entry) { - if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) { + if (!entry.hasStateMachineLogEntry()) { return false; } - final SMLogEntryProto smLog = entry.getSmLogEntry(); + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); return smLog.getStateMachineDataAttached() && smLog.getStateMachineData().isEmpty(); } /** - * If the given entry is {@link LogEntryBodyCase#SMLOGENTRY} and it has state machine data, + * If the given entry is {@link LogEntryBodyCase#STATEMACHINELOGENTRY} and it has state machine data, * build a new entry without the state machine data. * * @return a new entry without the state machine data if the given has state machine data; * otherwise, return the given entry. */ static LogEntryProto removeStateMachineData(LogEntryProto entry) { - if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) { + if (!entry.hasStateMachineLogEntry()) { return entry; } - final SMLogEntryProto smLog = entry.getSmLogEntry(); + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); if (smLog.getStateMachineData().isEmpty()) { return entry; } // build a new LogEntryProto without state machine data // and mark that it has been removed return LogEntryProto.newBuilder(entry) - .setSmLogEntry - (SMLogEntryProto.newBuilder() - .setData(smLog.getData()) + .setStateMachineLogEntry(StateMachineLogEntryProto.newBuilder() + .setLogData(smLog.getLogData()) .setStateMachineDataAttached(true) .setSerializedProtobufSize(entry.getSerializedSize())) .build(); @@ -201,17 +200,17 @@ public interface ProtoUtils { * @return LogEntryProto with stateMachineData added */ static LogEntryProto addStateMachineData(ByteString stateMachineData, LogEntryProto entry) { - final SMLogEntryProto smLogEntryProto = SMLogEntryProto.newBuilder(entry.getSmLogEntry()) + final StateMachineLogEntryProto smLogEntryProto = StateMachineLogEntryProto.newBuilder(entry.getStateMachineLogEntry()) .setStateMachineData(stateMachineData) .build(); - return LogEntryProto.newBuilder(entry).setSmLogEntry(smLogEntryProto).build(); + return LogEntryProto.newBuilder(entry).setStateMachineLogEntry(smLogEntryProto).build(); } static long getSerializedSize(LogEntryProto entry) { - if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) { + if (!entry.hasStateMachineLogEntry()) { return entry.getSerializedSize(); } - final SMLogEntryProto smLog = entry.getSmLogEntry(); + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); if (!smLog.getStateMachineDataAttached()) { // if state machine data was never set, return the proto serialized size return entry.getSerializedSize(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java index f0cb359..4e8dd18 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java @@ -18,14 +18,14 @@ package org.apache.ratis.examples.arithmetic; import org.apache.ratis.examples.arithmetic.expression.Expression; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.proto.RaftProtos; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; @@ -33,7 +33,14 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.util.AutoCloseableLock; -import java.io.*; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -152,7 +159,7 @@ public class ArithmeticStateMachine extends BaseStateMachine { @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { final LogEntryProto entry = trx.getLogEntry(); - final AssignmentMessage assignment = new AssignmentMessage(entry.getSmLogEntry().getData()); + final AssignmentMessage assignment = new AssignmentMessage(entry.getStateMachineLogEntry().getLogData()); final long index = entry.getIndex(); final Double result; @@ -163,8 +170,8 @@ public class ArithmeticStateMachine extends BaseStateMachine { final Expression r = Expression.Utils.double2Expression(result); final CompletableFuture<Message> f = CompletableFuture.completedFuture(Expression.Utils.toMessage(r)); - final RaftProtos.RaftPeerRole role = trx.getServerRole(); - if (role == RaftProtos.RaftPeerRole.LEADER) { + final RaftPeerRole role = trx.getServerRole(); + if (role == RaftPeerRole.LEADER) { LOG.info("{}:{}-{}: {} = {}", role, getId(), index, assignment, r); } else { LOG.debug("{}:{}-{}: {} = {}", role, getId(), index, assignment, r); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java index bf58c6e..b04437b 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java @@ -19,21 +19,27 @@ package org.apache.ratis.examples.filestore; import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.ExamplesProtos.DeleteReplyProto; +import org.apache.ratis.proto.ExamplesProtos.DeleteRequestProto; +import org.apache.ratis.proto.ExamplesProtos.FileStoreRequestProto; +import org.apache.ratis.proto.ExamplesProtos.ReadRequestProto; +import org.apache.ratis.proto.ExamplesProtos.WriteRequestHeaderProto; +import org.apache.ratis.proto.ExamplesProtos.WriteRequestProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; -import org.apache.ratis.proto.ExamplesProtos.*; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.TransactionContextImpl; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.util.FileUtils; import java.io.File; @@ -89,17 +95,14 @@ public class FileStoreStateMachine extends BaseStateMachine { public TransactionContext startTransaction(RaftClientRequest request) throws IOException { final ByteString content = request.getMessage().getContent(); final FileStoreRequestProto proto = FileStoreRequestProto.parseFrom(content); - final SMLogEntryProto log; + final StateMachineLogEntryProto log; if (proto.getRequestCase() == FileStoreRequestProto.RequestCase.WRITE) { final WriteRequestProto write = proto.getWrite(); final FileStoreRequestProto newProto = FileStoreRequestProto.newBuilder() .setWriteHeader(write.getHeader()).build(); - log = SMLogEntryProto.newBuilder() - .setData(newProto.toByteString()) - .setStateMachineData(write.getData()) - .build(); + log = ServerProtoUtils.toStateMachineLogEntryProto(newProto.toByteString(), write.getData()); } else { - log = SMLogEntryProto.newBuilder().setData(content).build(); + log = ServerProtoUtils.toStateMachineLogEntryProto(content, null); } return new TransactionContextImpl(this, request, log); @@ -107,8 +110,8 @@ public class FileStoreStateMachine extends BaseStateMachine { @Override public CompletableFuture<Integer> writeStateMachineData(LogEntryProto entry) { - final SMLogEntryProto smLog = entry.getSmLogEntry(); - final ByteString data = smLog.getData(); + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); + final ByteString data = smLog.getLogData(); final FileStoreRequestProto proto; try { proto = FileStoreRequestProto.parseFrom(data); @@ -134,13 +137,13 @@ public class FileStoreStateMachine extends BaseStateMachine { final long index = entry.getIndex(); updateLastAppliedTermIndex(entry.getTerm(), index); - final SMLogEntryProto smLog = entry.getSmLogEntry(); + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); final FileStoreRequestProto request; try { - request = FileStoreRequestProto.parseFrom(smLog.getData()); + request = FileStoreRequestProto.parseFrom(smLog.getLogData()); } catch (InvalidProtocolBufferException e) { return FileStoreCommon.completeExceptionally(index, - "Failed to parse SmLogEntry", e); + "Failed to parse logData in" + smLog, e); } switch(request.getRequestCase()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java index bd88000..acd1935 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java @@ -30,7 +30,6 @@ import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.Preconditions; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index 631bc1a..b3f6a41 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -111,7 +111,7 @@ public class TestRaftStream extends BaseTest { TermIndex[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1); for (TermIndex entry : entries) { RaftProtos.LogEntryProto log = raftLog.get(entry.getIndex()); - byte[] logData = log.getSmLogEntry().getData().toByteArray(); + byte[] logData = log.getStateMachineLogEntry().getLogData().toByteArray(); byte[] expected = s.get(); LOG.info("log " + entry + " " + log.getLogEntryBodyCase() + " " + StringUtils.bytes2HexString(logData)); Assert.assertEquals(expected.length, logData.length); @@ -245,7 +245,7 @@ public class TestRaftStream extends BaseTest { byte[] actual = new byte[ByteValue.BUFFERSIZE * 8]; totalSize = 0; for (TermIndex e : entries) { - byte[] eValue = log.get(e.getIndex()).getSmLogEntry().getData().toByteArray(); + byte[] eValue = log.get(e.getIndex()).getStateMachineLogEntry().getLogData().toByteArray(); Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length); System.arraycopy(eValue, 0, actual, totalSize, eValue.length); totalSize += eValue.length; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java index 84689be..4bfa294 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java @@ -183,12 +183,12 @@ public class LogStateMachine extends BaseStateMachine { try { final LogEntryProto entry = trx.getLogEntry(); LogServiceRequestProto logServiceRequestProto = - LogServiceRequestProto.parseFrom(entry.getSmLogEntry().getData()); + LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData()); CompletableFuture<Message> f = null; switch (logServiceRequestProto.getRequestCase()) { case LOGMESSAGE: org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage logMessage2 = logServiceRequestProto.getLogMessage(); - final LogMessage logMessage = LogMessage.parseFrom((entry.getSmLogEntry().getData())); + final LogMessage logMessage = LogMessage.parseFrom((entry.getStateMachineLogEntry().getLogData())); final long index = entry.getIndex(); Long val = null; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-proto/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index d9a4f08..6462573 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -40,28 +40,23 @@ message RaftConfigurationProto { repeated RaftPeerProto oldPeers = 2; // the peers in the old conf } -message SMLogEntryProto { +message StateMachineLogEntryProto { // TODO: This is not super efficient if the SM itself uses PB to serialize its own data for a - // log entry. Data will be copied twice. We should directly support having any Message from SM - bytes data = 1; + /** RaftLog entry data */ + bytes logData = 1; bytes stateMachineData = 2; // State machine specific data which is not written to log. bool stateMachineDataAttached = 3; // set this flag when state machine data is attached. uint64 serializedProtobufSize = 4; // size of the serialized LogEntryProto along with stateMachineData } -message LeaderNoOp { - // empty -} - message LogEntryProto { uint64 term = 1; uint64 index = 2; oneof LogEntryBody { - SMLogEntryProto smLogEntry = 3; + StateMachineLogEntryProto stateMachineLogEntry = 3; RaftConfigurationProto configurationEntry = 4; - LeaderNoOp noOp = 5; } // clientId and callId are used to rebuild the retry cache. They're not http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index a1c1192..c1e3303 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerMXBean; @@ -28,28 +29,40 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftStorageDirectory; -import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.TransactionContextImpl; +import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.management.ObjectName; import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; -import static org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; -import static org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; -import static org.apache.ratis.util.LifeCycle.State.*; +import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; +import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; +import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; +import static org.apache.ratis.util.LifeCycle.State.CLOSED; +import static org.apache.ratis.util.LifeCycle.State.CLOSING; +import static org.apache.ratis.util.LifeCycle.State.NEW; +import static org.apache.ratis.util.LifeCycle.State.RUNNING; +import static org.apache.ratis.util.LifeCycle.State.STARTING; public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronousProtocol, RaftClientProtocol, RaftClientAsynchronousProtocol { @@ -1074,11 +1087,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) { final StateMachine stateMachine = getStateMachine(); - if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { + if (next.hasConfigurationEntry()) { // the reply should have already been set. only need to record // the new conf in the state machine. stateMachine.setRaftConfiguration(ServerProtoUtils.toRaftConfiguration(next)); - } else if (next.getLogEntryBodyCase() == SMLOGENTRY) { + } else if (next.hasStateMachineLogEntry()) { // check whether there is a TransactionContext because we are the leader. TransactionContext trx = role.getLeaderState() .map(leader -> leader.getTransactionContext(next.getIndex())).orElseGet( @@ -1102,7 +1115,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } public void failClientRequest(LogEntryProto logEntry) { - if (logEntry.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.SMLOGENTRY) { + if (logEntry.hasStateMachineLogEntry()) { final ClientId clientId = ClientId.valueOf(logEntry.getClientId()); final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, logEntry.getCallId()); if (cacheEntry != null) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index c75b6ed..6fbce46 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -17,25 +17,28 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.proto.RaftProtos.*; +import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.protocol.*; -import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.proto.RaftProtos.*; -import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.*; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ProtoUtils; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; /** Server proto utilities for internal use. */ -public class ServerProtoUtils { +public interface ServerProtoUtils { public static TermIndex toTermIndex(TermIndexProto p) { return p == null? null: TermIndex.newTermIndex(p.getTerm(), p.getIndex()); } @@ -80,7 +83,7 @@ public class ServerProtoUtils { + ",followerCommit:" + reply.getFollowerCommit(); } - private static String toString(RaftRpcReplyProto reply) { + static String toString(RaftRpcReplyProto reply) { return reply.getRequestorId().toStringUtf8() + "->" + reply.getReplyId().toStringUtf8() + "," + reply.getSuccess(); } @@ -114,6 +117,16 @@ public class ServerProtoUtils { .build(); } + static StateMachineLogEntryProto toStateMachineLogEntryProto( + ByteString logData, ByteString stateMachineData) { + final StateMachineLogEntryProto.Builder b = StateMachineLogEntryProto.newBuilder() + .setLogData(logData); + if (stateMachineData != null) { + b.setStateMachineData(stateMachineData); + } + return b.build(); + } + static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, boolean success) { return ClientProtoUtils.toRaftRpcReplyProtoBuilder( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 8fcc6b7..6d0477e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -348,7 +348,7 @@ public class ServerState implements Closeable { configurationManager.addConfiguration(logIndex, conf); server.getServerRpc().addPeers(conf.getPeers()); LOG.info("{}: set configuration {} at {}", getSelfId(), conf, logIndex); - LOG.debug("{}: {}", getSelfId(), configurationManager); + LOG.trace("{}: {}", getSelfId(), configurationManager); } void updateConfiguration(LogEntryProto[] entries) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java index abf92e5..968a1ce 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -17,19 +17,25 @@ */ package org.apache.ratis.server.storage; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -42,10 +48,7 @@ import java.util.function.Consumer; */ class LogSegment implements Comparable<Long> { static long getEntrySize(LogEntryProto entry) { - final int serialized = - entry.getSerializedSize() - - (entry.getSmLogEntry().getStateMachineDataAttached() ? CodedOutputStream - .computeBytesSizeNoTag(entry.getSmLogEntry().getStateMachineData()) : 0); + final int serialized = ProtoUtils.removeStateMachineData(entry).getSerializedSize(); return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index c1b05f6..c666624 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.storage; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; @@ -25,7 +26,6 @@ import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.AutoCloseableLock; @@ -154,7 +154,7 @@ public abstract class RaftLog implements Closeable { // build the log entry after calling the StateMachine final LogEntryProto e = ProtoUtils.toLogEntryProto( - operation.getSMLogEntry(), term, nextIndex, clientId, callId); + operation.getStateMachineLogEntry(), term, nextIndex, clientId, callId); int entrySize = e.getSerializedSize(); if (entrySize > maxBufferSize) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java index a112772..0a53c9d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java @@ -17,11 +17,11 @@ */ package org.apache.ratis.statemachine; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; +import org.apache.ratis.protocol.RaftClientRequest; import java.io.IOException; import java.util.Collection; @@ -46,7 +46,7 @@ import java.util.Collection; */ public interface TransactionContext { /** @return the role of the server when this context is created. */ - RaftProtos.RaftPeerRole getServerRole(); + RaftPeerRole getServerRole(); /** * Returns the original request from the {@link RaftClientRequest} @@ -58,7 +58,7 @@ public interface TransactionContext { * Returns the data from the {@link StateMachine} * @return the data from the {@link StateMachine} */ - SMLogEntryProto getSMLogEntry(); + StateMachineLogEntryProto getStateMachineLogEntry(); /** * Returns the exception from the {@link StateMachine} or the log @@ -82,7 +82,7 @@ public interface TransactionContext { /** * Set the {@link LogEntryProto} the current {@link TransactionContext} specific to. The log - * entry's body case must be {@link LogEntryBodyCase#SMLOGENTRY}. The current + * entry's body case must be {@link LogEntryBodyCase#STATEMACHINELOGENTRY}. The current * {@link TransactionContext} log entry must be null, otherwise, a exception will be thrown * @param logEntry target {@link LogEntryProto} * @return the current {@link TransactionContext} itself @@ -94,7 +94,7 @@ public interface TransactionContext { * @param smLogEntryProto data from {@link StateMachine} * @return the current {@link TransactionContext} itself */ - TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto); + TransactionContext setStateMachineLogEntryProto(StateMachineLogEntryProto smLogEntryProto); /** * Returns the committed log entry http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java index f43fdf8..9792252 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java @@ -27,7 +27,6 @@ import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.StateMachineStorage; @@ -108,7 +107,7 @@ public class BaseStateMachine implements StateMachine { public CompletableFuture<Message> applyTransaction(TransactionContext trx) { // return the same message contained in the entry return CompletableFuture.completedFuture( - Message.valueOf(trx.getLogEntry().getSmLogEntry().getData())); + Message.valueOf(trx.getLogEntry().getStateMachineLogEntry().getLogData())); } @Override @@ -185,10 +184,7 @@ public class BaseStateMachine implements StateMachine { @Override public TransactionContext startTransaction(RaftClientRequest request) throws IOException { - return new TransactionContextImpl(this, request, - SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build()); + return new TransactionContextImpl(this, request, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index 5216cff..fd18d46 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -17,23 +17,25 @@ */ package org.apache.ratis.statemachine.impl; -import java.io.IOException; -import java.util.Objects; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; +import java.io.IOException; +import java.util.Objects; + /** * Implementation of {@link TransactionContext} */ public class TransactionContextImpl implements TransactionContext { /** The role of the server when this object is created. */ - private final RaftProtos.RaftPeerRole serverRole; + private final RaftPeerRole serverRole; /** The {@link StateMachine} that originated the transaction. */ private final StateMachine stateMachine; @@ -44,7 +46,7 @@ public class TransactionContextImpl implements TransactionContext { private Exception exception; /** Data from the {@link StateMachine} */ - private SMLogEntryProto smLogEntryProto; + private StateMachineLogEntryProto smLogEntryProto; /** * Context specific to the state machine. @@ -64,7 +66,7 @@ public class TransactionContextImpl implements TransactionContext { /** Committed LogEntry. */ private LogEntryProto logEntry; - private TransactionContextImpl(RaftProtos.RaftPeerRole serverRole, StateMachine stateMachine) { + private TransactionContextImpl(RaftPeerRole serverRole, StateMachine stateMachine) { this.serverRole = serverRole; this.stateMachine = stateMachine; } @@ -72,7 +74,7 @@ public class TransactionContextImpl implements TransactionContext { /** The same as this(stateMachine, clientRequest, smLogEntryProto, null). */ public TransactionContextImpl( StateMachine stateMachine, RaftClientRequest clientRequest, - SMLogEntryProto smLogEntryProto) { + StateMachineLogEntryProto smLogEntryProto) { this(stateMachine, clientRequest, smLogEntryProto, null); } @@ -84,10 +86,11 @@ public class TransactionContextImpl implements TransactionContext { */ public TransactionContextImpl( StateMachine stateMachine, RaftClientRequest clientRequest, - SMLogEntryProto smLogEntryProto, Object stateMachineContext) { - this(RaftProtos.RaftPeerRole.LEADER, stateMachine); + StateMachineLogEntryProto smLogEntryProto, Object stateMachineContext) { + this(RaftPeerRole.LEADER, stateMachine); this.clientRequest = clientRequest; - this.smLogEntryProto = smLogEntryProto; + this.smLogEntryProto = smLogEntryProto != null? smLogEntryProto + : ServerProtoUtils.toStateMachineLogEntryProto(clientRequest.getMessage().getContent(), null); this.stateMachineContext = stateMachineContext; } @@ -96,14 +99,14 @@ public class TransactionContextImpl implements TransactionContext { * Used by followers for applying committed entries to the state machine. * @param logEntry the log entry to be applied */ - public TransactionContextImpl(RaftProtos.RaftPeerRole serverRole, StateMachine stateMachine, LogEntryProto logEntry) { + public TransactionContextImpl(RaftPeerRole serverRole, StateMachine stateMachine, LogEntryProto logEntry) { this(serverRole, stateMachine); setLogEntry(logEntry); - this.smLogEntryProto = logEntry.getSmLogEntry(); + this.smLogEntryProto = logEntry.getStateMachineLogEntry(); } @Override - public RaftProtos.RaftPeerRole getServerRole() { + public RaftPeerRole getServerRole() { return serverRole; } @@ -113,7 +116,7 @@ public class TransactionContextImpl implements TransactionContext { } @Override - public SMLogEntryProto getSMLogEntry() { + public StateMachineLogEntryProto getStateMachineLogEntry() { return smLogEntryProto; } @@ -136,16 +139,16 @@ public class TransactionContextImpl implements TransactionContext { @Override public TransactionContext setLogEntry(LogEntryProto logEntry) { Objects.requireNonNull(logEntry, "logEntry == null"); - Preconditions.assertTrue(logEntry.getLogEntryBodyCase() == LogEntryBodyCase.SMLOGENTRY, + Preconditions.assertTrue(logEntry.getLogEntryBodyCase() == LogEntryBodyCase.STATEMACHINELOGENTRY, () -> "LogEntryBodyCase = " + logEntry.getLogEntryBodyCase() - + " != " + LogEntryBodyCase.SMLOGENTRY + ", logEntry=" + logEntry); + + " != " + LogEntryBodyCase.STATEMACHINELOGENTRY + ", logEntry=" + logEntry); Preconditions.assertTrue(this.logEntry == null, "this.logEntry != null"); this.logEntry = logEntry; return this; } @Override - public TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto) { + public TransactionContext setStateMachineLogEntryProto(StateMachineLogEntryProto smLogEntryProto) { this.smLogEntryProto = smLogEntryProto; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 7e7c559..f79eb6b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -235,7 +235,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba .thenCombine(staleReadFuture.thenApply(r -> getMessageContent(r)), (expected, computed) -> { try { LOG.info("query " + query + " returns " - + LogEntryProto.parseFrom(expected).getSmLogEntry().getData().toStringUtf8()); + + LogEntryProto.parseFrom(expected).getStateMachineLogEntry().getLogData().toStringUtf8()); } catch (InvalidProtocolBufferException e) { throw new CompletionException(e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index fa529b6..c789610 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -18,6 +18,8 @@ package org.apache.ratis; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; @@ -26,13 +28,10 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; @@ -121,7 +120,7 @@ public interface RaftTestUtil { && idxExpected < expectedMessages.length) { try { if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(), - log.get(termIndices[idxEntries].getIndex()).getSmLogEntry().getData().toByteArray())) { + log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) { ++idxExpected; } } catch (IOException e) { @@ -141,7 +140,7 @@ public interface RaftTestUtil { try { e = log.get(termIndices[i].getIndex()); if (Arrays.equals(expectedMessages[j].getContent().toByteArray(), - e.getSmLogEntry().getData().toByteArray())) { + e.getStateMachineLogEntry().getLogData().toByteArray())) { Assert.assertTrue(predicate.test(e)); } } catch (IOException exception) { @@ -185,8 +184,8 @@ public interface RaftTestUtil { throw new AssertionError("Failed to get log at " + ti, exception); } - if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.SMLOGENTRY) { - LOG.info(ServerProtoUtils.toString(e) + ", " + e.getSmLogEntry().toString().trim().replace("\n", ", ")); + if (e.hasStateMachineLogEntry()) { + LOG.info(ServerProtoUtils.toString(e) + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", ")); entries.add(e); } else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY) { LOG.info("Found " + LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY + " at " + ti @@ -216,7 +215,7 @@ public interface RaftTestUtil { Assert.assertTrue(e.getIndex() > logIndex); logIndex = e.getIndex(); Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(), - e.getSmLogEntry().getData().toByteArray()); + e.getStateMachineLogEntry().getLogData().toByteArray()); } } @@ -271,12 +270,11 @@ public interface RaftTestUtil { class SimpleOperation { private final String op; - private final SMLogEntryProto smLogEntryProto; + private final StateMachineLogEntryProto smLogEntryProto; public SimpleOperation(String op) { this.op = Objects.requireNonNull(op); - this.smLogEntryProto = SMLogEntryProto.newBuilder() - .setData(ProtoUtils.toByteString(op)).build(); + this.smLogEntryProto = ServerProtoUtils.toStateMachineLogEntryProto(ProtoUtils.toByteString(op), null); } @Override @@ -296,7 +294,7 @@ public interface RaftTestUtil { return op.hashCode(); } - public SMLogEntryProto getLogEntryContent() { + public StateMachineLogEntryProto getLogEntryContent() { return smLogEntryProto; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java index b31c7e8..039a2d8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java @@ -30,7 +30,7 @@ public class RetryCacheTestUtil { } public static void createEntry(RetryCache cache, RaftProtos.LogEntryProto logEntry){ - if(logEntry.getLogEntryBodyCase() == RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY){ + if(logEntry.hasStateMachineLogEntry()) { ClientId clientId = ClientId.valueOf(logEntry.getClientId()); long callId = logEntry.getCallId(); cache.getOrCreateEntry(clientId, callId); @@ -39,7 +39,7 @@ public class RetryCacheTestUtil { public static void assertFailure(RetryCache cache, RaftProtos.LogEntryProto logEntry, boolean isFailed) { - if(logEntry.getLogEntryBodyCase() == RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY){ + if(logEntry.hasStateMachineLogEntry()) { ClientId clientId = ClientId.valueOf(logEntry.getClientId()); long callId = logEntry.getCallId(); Assert.assertEquals(isFailed, cache.get(clientId, callId).isFailed()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index 8d696b1..0f2a669 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -25,7 +25,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.SizeInBytes; @@ -179,7 +179,7 @@ public class TestRaftLogSegment extends BaseTest { public void testAppendWithGap() throws Exception { LogSegment segment = LogSegment.newOpenSegment(null, 1000); SimpleOperation op = new SimpleOperation("m"); - final SMLogEntryProto m = op.getLogEntryContent(); + final StateMachineLogEntryProto m = op.getLogEntryContent(); try { LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001, clientId, callId); segment.appendToOpenSegment(entry); @@ -284,7 +284,7 @@ public class TestRaftLogSegment extends BaseTest { INVALID_LOG_INDEX, true)) { LogEntryProto entry = in.nextEntry(); Assert.assertArrayEquals(content, - entry.getSmLogEntry().getData().toByteArray()); + entry.getStateMachineLogEntry().getLogData().toByteArray()); Assert.assertNull(in.nextEntry()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 584812e..7a326a3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -74,7 +74,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { Assert.assertEquals(i+1, entries[i].getIndex()); Assert.assertArrayEquals( new SimpleMessage("m" + i).getContent().toByteArray(), - entries[i].getSmLogEntry().getData().toByteArray()); + entries[i].getStateMachineLogEntry().getLogData().toByteArray()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index b59988a..325683d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -29,6 +29,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogInputStream; import org.apache.ratis.server.storage.LogOutputStream; @@ -36,7 +37,6 @@ import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; @@ -278,12 +278,13 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } } + static final ByteString STATE_MACHINE_DATA = ByteString.copyFromUtf8("StateMachine Data"); + @Override public TransactionContext startTransaction(RaftClientRequest request) throws IOException { blocking.await(Blocking.Type.START_TRANSACTION); - return new TransactionContextImpl(this, request, SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .setStateMachineData(ByteString.copyFromUtf8("StateMachine Data")).build()); + return new TransactionContextImpl(this, request, + ServerProtoUtils.toStateMachineLogEntryProto(request.getMessage().getContent(), STATE_MACHINE_DATA)); } @Override @@ -295,7 +296,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { @Override public CompletableFuture<ByteString> readStateMachineData(LogEntryProto entry) { blocking.await(Blocking.Type.READ_STATE_MACHINE_DATA); - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(STATE_MACHINE_DATA); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9774c5cb/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index 3f18c41..4bb3cb6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -34,7 +34,6 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.impl.TransactionContextImpl; import org.apache.ratis.util.LogUtils; import org.junit.*; @@ -80,16 +79,14 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim // only leader will get this call isLeader.set(true); // send the next transaction id as the "context" from SM - return new TransactionContextImpl(this, request, SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build(), transactions.incrementAndGet()); + return new TransactionContextImpl(this, request, null, transactions.incrementAndGet()); } @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { try { assertNotNull(trx.getLogEntry()); - assertNotNull(trx.getSMLogEntry()); + assertNotNull(trx.getStateMachineLogEntry()); Object context = trx.getStateMachineContext(); if (isLeader.get()) { assertNotNull(trx.getClientRequest());
