Repository: incubator-ratis Updated Branches: refs/heads/master 06fd2e441 -> 2373a4a08
RATIS-70. Separate term/index/offset and log entry content in LogSegment. Contributed by Jing Zhao Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2373a4a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2373a4a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2373a4a0 Branch: refs/heads/master Commit: 2373a4a08e5805af3291acae64c2efccd9f528d4 Parents: 06fd2e4 Author: Jing Zhao <[email protected]> Authored: Mon Apr 17 10:43:29 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Mon Apr 17 10:43:29 2017 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/TestRestartRaftPeer.java | 2 +- .../org/apache/ratis/grpc/TestRaftStream.java | 14 +- .../ratis/server/impl/LeaderElection.java | 5 +- .../apache/ratis/server/impl/LeaderState.java | 12 +- .../apache/ratis/server/impl/LogAppender.java | 3 +- .../ratis/server/impl/RaftServerImpl.java | 30 +++- .../apache/ratis/server/impl/ServerState.java | 26 +++- .../ratis/server/impl/StateMachineUpdater.java | 32 +---- .../apache/ratis/server/protocol/TermIndex.java | 2 + .../apache/ratis/server/storage/LogSegment.java | 112 +++++++++------ .../ratis/server/storage/MemoryRaftLog.java | 37 +++-- .../apache/ratis/server/storage/RaftLog.java | 47 +++--- .../ratis/server/storage/RaftLogCache.java | 144 +++++++++++-------- .../ratis/server/storage/RaftLogWorker.java | 10 +- .../ratis/server/storage/SegmentedRaftLog.java | 53 ++++--- .../ratis/statemachine/BaseStateMachine.java | 4 +- .../apache/ratis/statemachine/StateMachine.java | 4 +- .../java/org/apache/ratis/RaftBasicTests.java | 5 +- .../java/org/apache/ratis/RaftTestUtil.java | 21 +-- .../impl/RaftReconfigurationBaseTest.java | 7 +- .../ratis/server/storage/TestRaftLogCache.java | 3 +- .../server/storage/TestRaftLogSegment.java | 13 +- .../server/storage/TestSegmentedRaftLog.java | 62 ++++---- 23 files changed, 390 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java index 346d7c0..dc38e82 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java @@ -111,6 +111,6 @@ public class TestRestartRaftPeer { // make sure the restarted peer's log segments is correct cluster.restartServer(followerId, false); Assert.assertTrue(cluster.getServer(followerId).getState().getLog() - .getLastEntry().getIndex() >= 20); + .getLastEntryTermIndex().getIndex() >= 20); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index 16bc221..83ca5ec 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -20,6 +20,7 @@ package org.apache.ratis.grpc; import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.grpc.client.AppendStreamer; @@ -27,7 +28,6 @@ import org.apache.ratis.grpc.client.RaftOutputStream; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -108,9 +108,9 @@ public class TestRaftStream { long committedIndex = raftLog.getLastCommittedIndex(); Assert.assertEquals(expectedCommittedIndex, committedIndex); // check the log content - LogEntryProto[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1); - for (LogEntryProto entry : entries) { - byte[] logData = entry.getSmLogEntry().getData().toByteArray(); + TermIndex[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1); + for (TermIndex entry : entries) { + byte[] logData = raftLog.get(entry.getIndex()).getSmLogEntry().getData().toByteArray(); byte[] expected = s.get(); Assert.assertEquals("log entry: " + entry, expected.length, logData.length); @@ -240,11 +240,11 @@ public class TestRaftStream { // 0.5 + 1 + 2.5 + 4 = 8 Assert.assertEquals(8, leader.getState().getLastAppliedIndex()); Assert.assertEquals(8, log.getLastCommittedIndex()); - LogEntryProto[] entries = log.getEntries(1, 9); + TermIndex[] entries = log.getEntries(1, 9); byte[] actual = new byte[ByteValue.BUFFERSIZE * 8]; totalSize = 0; - for (LogEntryProto e : entries) { - byte[] eValue = e.getSmLogEntry().getData().toByteArray(); + for (TermIndex e : entries) { + byte[] eValue = log.get(e.getIndex()).getSmLogEntry().getData().toByteArray(); Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length); System.arraycopy(eValue, 0, actual, totalSize, eValue.length); totalSize += eValue.length; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index d026db6..14a3780 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -45,7 +45,7 @@ class LeaderElection extends Daemon { List<Exception> exceptions, long newTerm) { LOG.info(server.getId() + ": Election " + result + "; received " + responses.size() + " response(s) " - + responses.stream().map(r -> ProtoUtils.toString(r)).collect(Collectors.toList()) + + responses.stream().map(ProtoUtils::toString).collect(Collectors.toList()) + " and " + exceptions.size() + " exception(s); " + server.getState()); int i = 0; for(Exception e : exceptions) { @@ -127,8 +127,7 @@ class LeaderElection extends Daemon { LOG.info(state.getSelfId() + ": begin an election in Term " + electionTerm); - TermIndex lastEntry = ServerProtoUtils.toTermIndex( - state.getLog().getLastEntry()); + TermIndex lastEntry = state.getLog().getLastEntryTermIndex(); if (lastEntry == null) { // lastEntry may need to be derived from snapshot SnapshotInfo snapshot = state.getLatestSnapshot(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 2d1ab52..96bd28b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; @@ -401,7 +402,7 @@ public class LeaderState { long majorityInNewConf = computeLastCommitted(voterLists.get(0), conf.containsInConf(selfId)); final long oldLastCommitted = raftLog.getLastCommittedIndex(); - final LogEntryProto[] entriesToCommit; + final TermIndex[] entriesToCommit; if (!conf.isTransitional()) { // copy the entries that may get committed out of the raftlog, to prevent // the possible race that the log gets purged after the statemachine does @@ -420,18 +421,17 @@ public class LeaderState { checkAndUpdateConfiguration(entriesToCommit); } - private boolean committedConf(LogEntryProto[] entries) { + private boolean committedConf(TermIndex[] entries) { final long currentCommitted = raftLog.getLastCommittedIndex(); - for (LogEntryProto entry : entries) { - if (entry.getIndex() <= currentCommitted && - ProtoUtils.isConfigurationLogEntry(entry)) { + for (TermIndex entry : entries) { + if (entry.getIndex() <= currentCommitted && raftLog.isConfigEntry(entry)) { return true; } } return false; } - private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) { + private void checkAndUpdateConfiguration(TermIndex[] entriesToCheck) { final RaftConfiguration conf = server.getRaftConf(); if (committedConf(entriesToCheck)) { if (conf.isTransitional()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 5417e92..ff12f4e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -137,8 +137,7 @@ public class LogAppender extends Daemon { } private TermIndex getPrevious() { - TermIndex previous = ServerProtoUtils.toTermIndex( - raftLog.get(follower.getNextIndex() - 1)); + TermIndex previous = raftLog.getTermIndex(follower.getNextIndex() - 1); if (previous == null) { // if previous is null, nextIndex must be equal to the log start // index (otherwise we will install snapshot). http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0a6a68d..3c704b0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,9 +17,12 @@ */ package org.apache.ratis.server.impl; +import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; import static org.apache.ratis.util.LifeCycle.State.CLOSED; import static org.apache.ratis.util.LifeCycle.State.CLOSING; import static org.apache.ratis.util.LifeCycle.State.RUNNING; @@ -873,7 +876,7 @@ public class RaftServerImpl implements RaftServer { * @param stateMachineFuture the future returned by the state machine * from which we will get transaction result later */ - void replyPendingRequest(LogEntryProto logEntry, + private void replyPendingRequest(LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) { // update the retry cache final ClientId clientId = new ClientId(logEntry.getClientId().toByteArray()); @@ -903,13 +906,36 @@ public class RaftServerImpl implements RaftServer { }); } - TransactionContext getTransactionContext(long index) { + private TransactionContext getTransactionContext(long index) { if (leaderState != null) { // is leader and is running return leaderState.getTransactionContext(index); } return null; } + public void applyLogToStateMachine(LogEntryProto next) { + if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { + // the reply should have already been set. only need to record + // the new conf in the state machine. + stateMachine.setRaftConfiguration(toRaftConfiguration(next.getIndex(), + next.getConfigurationEntry())); + } else if (next.getLogEntryBodyCase() == SMLOGENTRY) { + // check whether there is a TransactionContext because we are the leader. + TransactionContext trx = getTransactionContext(next.getIndex()); + if (trx == null) { + trx = new TransactionContext(stateMachine, next); + } + + // Let the StateMachine inject logic for committed transactions in sequential order. + trx = stateMachine.applyTransactionSerial(trx); + + // TODO: This step can be parallelized + CompletableFuture<Message> stateMachineFuture = + stateMachine.applyTransaction(trx); + replyPendingRequest(next, stateMachineFuture); + } + } + @Override public RaftProperties getProperties() { return this.properties; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index e9d8ef8..da1aa3c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -34,6 +34,9 @@ import org.apache.ratis.util.ProtoUtils; import java.io.Closeable; import java.io.IOException; +import java.util.function.Consumer; + +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; /** * Common states of a raft peer. Protected by RaftServer's lock. @@ -85,7 +88,16 @@ public class ServerState implements Closeable { long lastApplied = initStatemachine(stateMachine, prop); leaderId = null; - log = initLog(id, prop, server, lastApplied); + // we cannot apply log entries to the state machine in this step, since we + // do not know whether the local log entries have been committed. + log = initLog(id, prop, lastApplied, entry -> { + if (entry.getLogEntryBodyCase() == CONFIGURATIONENTRY) { + configurationManager.addConfiguration(entry.getIndex(), + ServerProtoUtils.toRaftConfiguration(entry.getIndex(), + entry.getConfigurationEntry())); + } + }); + RaftLog.Metadata metadata = log.loadMetadata(); currentTerm = metadata.getTerm(); votedFor = metadata.getVotedFor(); @@ -122,7 +134,8 @@ public class ServerState implements Closeable { * know whether they have been committed. */ private RaftLog initLog(RaftPeerId id, RaftProperties prop, - RaftServerImpl server, long lastIndexInSnapshot) throws IOException { + long lastIndexInSnapshot, Consumer<LogEntryProto> logConsumer) + throws IOException { final RaftLog log; if (RaftServerConfigKeys.Log.useMemory(prop)) { log = new MemoryRaftLog(id); @@ -130,7 +143,7 @@ public class ServerState implements Closeable { log = new SegmentedRaftLog(id, server, this.storage, lastIndexInSnapshot, prop); } - log.open(configurationManager, lastIndexInSnapshot); + log.open(lastIndexInSnapshot, logConsumer); return log; } @@ -236,16 +249,15 @@ public class ServerState implements Closeable { } boolean isLogUpToDate(TermIndex candidateLastEntry) { - LogEntryProto lastEntry = log.getLastEntry(); + TermIndex local = log.getLastEntryTermIndex(); // need to take into account snapshot SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot(); - if (lastEntry == null && snapshot == null) { + if (local == null && snapshot == null) { return true; } else if (candidateLastEntry == null) { return false; } - TermIndex local = ServerProtoUtils.toTermIndex(lastEntry); - if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) { + if (local == null || (snapshot != null && snapshot.getIndex() > local.getIndex())) { local = snapshot.getTermIndex(); } return local.compareTo(candidateLastEntry) <= 0; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 95af956..b9ed6a5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -17,21 +17,13 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; -import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.Message; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.LifeCycle; @@ -39,6 +31,8 @@ import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** * This class tracks the log entries that have been committed in a quorum and * applies them to the state machine. We let a separate thread do this work @@ -145,27 +139,7 @@ class StateMachineUpdater implements Runnable { while (lastAppliedIndex < committedIndex) { final LogEntryProto next = raftLog.get(lastAppliedIndex + 1); if (next != null) { - if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { - // the reply should have already been set. only need to record - // the new conf in the state machine. - stateMachine.setRaftConfiguration( - ServerProtoUtils.toRaftConfiguration(next.getIndex(), - next.getConfigurationEntry())); - } else if (next.getLogEntryBodyCase() == SMLOGENTRY) { - // check whether there is a TransactionContext because we are the leader. - TransactionContext trx = server.getTransactionContext(next.getIndex()); - if (trx == null) { - trx = new TransactionContext(stateMachine, next); - } - - // Let the StateMachine inject logic for committed transactions in sequential order. - trx = stateMachine.applyTransactionSerial(trx); - - // TODO: This step can be parallelized - CompletableFuture<Message> stateMachineFuture = - stateMachine.applyTransaction(trx); - server.replyPendingRequest(next, stateMachineFuture); - } + server.applyLogToStateMachine(next); lastAppliedIndex++; } else { LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java index 477b70c..a16110f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java @@ -21,6 +21,8 @@ import org.apache.ratis.server.impl.ServerImplUtils; /** The term and the log index defined in the Raft consensus algorithm. */ public interface TermIndex extends Comparable<TermIndex> { + TermIndex[] EMPTY_TERMINDEX_ARRAY = {}; + /** @return the term. */ long getTerm(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java index 46f9f4f..3856585 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -17,19 +17,23 @@ */ package org.apache.ratis.server.storage; -import org.apache.ratis.server.impl.ConfigurationManager; -import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; - -import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; /** * In-memory cache for a log segment file. All the updates will be first written @@ -38,44 +42,45 @@ import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBod * This class will be protected by the RaftServer's lock. */ class LogSegment implements Comparable<Long> { + static long getEntrySize(LogEntryProto entry) { + final int serialized = entry.getSerializedSize(); + return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4; + } + + @VisibleForTesting static class LogRecord { /** starting offset in the file */ - final long offset; - final LogEntryProto entry; + private final long offset; + private final TermIndex termIndex; LogRecord(long offset, LogEntryProto entry) { this.offset = offset; - this.entry = entry; + termIndex = TermIndex.newTermIndex(entry.getTerm(), entry.getIndex()); } - } - static class SegmentFileInfo { - final long startIndex; // start index of the - final long endIndex; // original end index - final boolean isOpen; - final long targetLength; // position for truncation - final long newEndIndex; // new end index after the truncation - - SegmentFileInfo(long start, long end, boolean isOpen, long targetLength, - long newEndIndex) { - this.startIndex = start; - this.endIndex = end; - this.isOpen = isOpen; - this.targetLength = targetLength; - this.newEndIndex = newEndIndex; + TermIndex getTermIndex() { + return termIndex; } - } - static long getEntrySize(LogEntryProto entry) { - final int serialized = entry.getSerializedSize(); - return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4; + long getOffset() { + return offset; + } } private boolean isOpen; - private final List<LogRecord> records = new ArrayList<>(); private long totalSize; private final long startIndex; private long endIndex; + /** + * the list of records is more like the index of a segment + */ + private final List<LogRecord> records = new ArrayList<>(); + /** + * the entryCache caches the content of log entries. + * TODO: currently we cache all the log entries. will fix it soon. + */ + private final Map<TermIndex, LogEntryProto> entryCache = new HashMap<>(); + private final Set<TermIndex> configEntries = new HashSet<>(); private LogSegment(boolean isOpen, long start, long end) { this.isOpen = isOpen; @@ -95,7 +100,7 @@ class LogSegment implements Comparable<Long> { } static LogSegment loadSegment(File file, long start, long end, boolean isOpen, - ConfigurationManager confManager) throws IOException { + Consumer<LogEntryProto> logConsumer) throws IOException { final LogSegment segment; try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) { segment = isOpen ? LogSegment.newOpenSegment(start) : @@ -108,11 +113,9 @@ class LogSegment implements Comparable<Long> { "gap between entry %s and entry %s", prev, next); } segment.append(next); - if (confManager != null && - next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { - confManager.addConfiguration(next.getIndex(), - ServerProtoUtils.toRaftConfiguration(next.getIndex(), - next.getConfigurationEntry())); + + if (logConsumer != null) { + logConsumer.accept(next); } prev = next; } @@ -123,7 +126,7 @@ class LogSegment implements Comparable<Long> { FileUtils.truncateFile(file, segment.getTotalSize()); } - Preconditions.assertTrue(start == segment.records.get(0).entry.getIndex()); + Preconditions.assertTrue(start == segment.records.get(0).getTermIndex().getIndex()); if (!isOpen) { Preconditions.assertTrue(segment.getEndIndex() == end); } @@ -143,7 +146,7 @@ class LogSegment implements Comparable<Long> { } int numOfEntries() { - return (int) (endIndex - startIndex + 1); + return Math.toIntExact(endIndex - startIndex + 1); } void appendToOpenSegment(LogEntryProto... entries) { @@ -167,29 +170,52 @@ class LogSegment implements Comparable<Long> { final LogRecord currentLast = getLastRecord(); if (currentLast != null) { Preconditions.assertTrue( - entry.getIndex() == currentLast.entry.getIndex() + 1, + entry.getIndex() == currentLast.getTermIndex().getIndex() + 1, "gap between entries %s and %s", entry.getIndex(), - currentLast.entry.getIndex()); + currentLast.getTermIndex().getIndex()); } final LogRecord record = new LogRecord(totalSize, entry); records.add(record); + entryCache.put(record.getTermIndex(), entry); + if (ProtoUtils.isConfigurationLogEntry(entry)) { + configEntries.add(record.getTermIndex()); + } totalSize += getEntrySize(entry); endIndex = entry.getIndex(); } } + LogEntryProto getLogEntry(long index) { + LogRecord record = getLogRecord(index); + return record == null ? null : entryCache.get(record.getTermIndex()); + } + + TermIndex getTermIndex(long index) { + LogRecord record = getLogRecord(index); + return record == null ? null : record.getTermIndex(); + } + LogRecord getLogRecord(long index) { if (index >= startIndex && index <= endIndex) { - return records.get((int) (index - startIndex)); + return records.get(Math.toIntExact(index - startIndex)); } return null; } - LogRecord getLastRecord() { + private LogRecord getLastRecord() { return records.isEmpty() ? null : records.get(records.size() - 1); } + TermIndex getLastTermIndex() { + LogRecord last = getLastRecord(); + return last == null ? null : last.getTermIndex(); + } + + boolean isConfigEntry(TermIndex ti) { + return configEntries.contains(ti); + } + long getTotalSize() { return totalSize; } @@ -199,9 +225,11 @@ class LogSegment implements Comparable<Long> { */ void truncate(long fromIndex) { Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex); - LogRecord record = records.get((int) (fromIndex - startIndex)); + LogRecord record = records.get(Math.toIntExact(fromIndex - startIndex)); for (long index = endIndex; index >= fromIndex; index--) { - records.remove((int)(index - startIndex)); + LogRecord removed = records.remove(Math.toIntExact(index - startIndex)); + entryCache.remove(removed.getTermIndex()); + configEntries.remove(removed.getTermIndex()); } totalSize = record.offset; isOpen = false; @@ -227,6 +255,8 @@ class LogSegment implements Comparable<Long> { void clear() { records.clear(); + entryCache.clear(); + configEntries.clear(); endIndex = startIndex - 1; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index a49db9a..df71d08 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -24,10 +24,12 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; /** * A simple RaftLog implementation in memory. Used only for testing. @@ -49,15 +51,30 @@ public class MemoryRaftLog extends RaftLog { } @Override - public LogEntryProto[] getEntries(long startIndex, long endIndex) { + public TermIndex getTermIndex(long index) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - final int i = (int) startIndex; + final int i = (int) index; + return i >= 0 && i < entries.size() ? + ServerProtoUtils.toTermIndex(entries.get(i)) : null; + } + } + + @Override + public TermIndex[] getEntries(long startIndex, long endIndex) { + checkLogState(); + try(AutoCloseableLock readLock = readLock()) { + final int from = (int) startIndex; if (startIndex >= entries.size()) { return null; } - final int toIndex = (int) Math.min(entries.size(), endIndex); - return entries.subList(i, toIndex).toArray(EMPTY_LOGENTRY_ARRAY); + final int to = (int) Math.min(entries.size(), endIndex); + TermIndex[] ti = new TermIndex[to - from]; + for (int i = 0; i < ti.length; i++) { + ti[i] = TermIndex.newTermIndex(entries.get(i).getTerm(), + entries.get(i).getIndex()); + } + return ti; } } @@ -74,11 +91,11 @@ public class MemoryRaftLog extends RaftLog { } @Override - public LogEntryProto getLastEntry() { + public TermIndex getLastEntryTermIndex() { checkLogState(); try(AutoCloseableLock readLock = readLock()) { final int size = entries.size(); - return size == 0 ? null : entries.get(size - 1); + return size == 0 ? null : ServerProtoUtils.toTermIndex(entries.get(size - 1)); } } @@ -146,8 +163,7 @@ public class MemoryRaftLog extends RaftLog { @Override public String toString() { - return "last=" + ServerProtoUtils.toString(getLastEntry()) - + ", committed=" + return "last=" + getLastEntryTermIndex() + ", committed=" + ServerProtoUtils.toString(get(getLastCommittedIndex())); } @@ -180,4 +196,9 @@ public class MemoryRaftLog extends RaftLog { public void syncWithSnapshot(long lastSnapshotIndex) { // do nothing } + + @Override + public boolean isConfigEntry(TermIndex ti) { + return ProtoUtils.isConfigurationLogEntry(get(ti.getIndex())); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index e72249a..5002f83 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -17,15 +17,9 @@ */ package org.apache.ratis.server.storage; -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; -import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.ServerProtoUtils; @@ -38,6 +32,13 @@ import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + /** * Base class of RaftLog. Currently we provide two types of RaftLog * implementation: @@ -48,7 +49,6 @@ import org.slf4j.LoggerFactory; */ public abstract class RaftLog implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class); - public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0]; public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync"; /** @@ -85,7 +85,7 @@ public abstract class RaftLog implements Closeable { if (lastCommitted.get() < majorityIndex) { // Only update last committed index for current term. See §5.4.2 in // paper for details. - final LogEntryProto entry = get(majorityIndex); + final TermIndex entry = getTermIndex(majorityIndex); if (entry != null && entry.getTerm() == currentTerm) { LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex); lastCommitted.set(majorityIndex); @@ -103,8 +103,7 @@ public abstract class RaftLog implements Closeable { if (ti == null) { return false; } - LogEntryProto entry = get(ti.getIndex()); - TermIndex local = ServerProtoUtils.toTermIndex(entry); + TermIndex local = getTermIndex(ti.getIndex()); return ti.equals(local); } @@ -112,7 +111,7 @@ public abstract class RaftLog implements Closeable { * @return the index of the next log entry to append. */ public long getNextIndex() { - final LogEntryProto last = getLastEntry(); + final TermIndex last = getLastEntryTermIndex(); if (last == null) { // if the log is empty, the last committed index should be consistent with // the last index included in the latest snapshot. @@ -166,7 +165,7 @@ public abstract class RaftLog implements Closeable { } } - public void open(ConfigurationManager confManager, long lastIndexInSnapshot) + public void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException { isOpen = true; } @@ -183,17 +182,26 @@ public abstract class RaftLog implements Closeable { public abstract LogEntryProto get(long index); /** + * Get the TermIndex information of the given index. + * + * @param index The given index. + * @return The TermIndex of the log entry associated with the given index. + * Null if there is no log entry with the index. + */ + public abstract TermIndex getTermIndex(long index); + + /** * @param startIndex the starting log index (inclusive) * @param endIndex the ending log index (exclusive) - * @return all log entries within the given index range. Null if startIndex - * is greater than the smallest available index. + * @return TermIndex of all log entries within the given index range. Null if + * startIndex is greater than the smallest available index. */ - public abstract LogEntryProto[] getEntries(long startIndex, long endIndex); + public abstract TermIndex[] getEntries(long startIndex, long endIndex); /** - * @return the last log entry. + * @return the last log entry's term and index. */ - public abstract LogEntryProto getLastEntry(); + public abstract TermIndex getLastEntryTermIndex(); /** * Truncate the log entries till the given index. The log with the given index @@ -250,9 +258,12 @@ public abstract class RaftLog implements Closeable { public abstract void syncWithSnapshot(long lastSnapshotIndex); + public abstract boolean isConfigEntry(TermIndex ti); + @Override public String toString() { - return ServerProtoUtils.toString(getLastEntry()); + TermIndex last = getLastEntryTermIndex(); + return last == null ? "null" : Collections.singletonList(last).toString(); } public static class Metadata { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 0a21846..0142bee 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -26,8 +26,7 @@ import java.util.List; import java.util.NoSuchElementException; import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.storage.LogSegment.LogRecord; -import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.Preconditions; @@ -38,6 +37,35 @@ import org.apache.ratis.util.Preconditions; * requires external lock protection. */ class RaftLogCache { + static class SegmentFileInfo { + final long startIndex; // start index of the segment + final long endIndex; // original end index + final boolean isOpen; + final long targetLength; // position for truncation + final long newEndIndex; // new end index after the truncation + + SegmentFileInfo(long start, long end, boolean isOpen, long targetLength, + long newEndIndex) { + this.startIndex = start; + this.endIndex = end; + this.isOpen = isOpen; + this.targetLength = targetLength; + this.newEndIndex = newEndIndex; + } + } + + static class TruncationSegments { + final SegmentFileInfo toTruncate; // name of the file to be truncated + final SegmentFileInfo[] toDelete; // names of the files to be deleted + + TruncationSegments(SegmentFileInfo toTruncate, + List<SegmentFileInfo> toDelete) { + this.toDelete = toDelete == null ? null : + toDelete.toArray(new SegmentFileInfo[toDelete.size()]); + this.toTruncate = toTruncate; + } + } + private LogSegment openSegment; private final List<LogSegment> closedSegments; @@ -74,25 +102,50 @@ class RaftLogCache { } } - LogEntryProto getEntry(long index) { + LogSegment getOpenSegment() { + return openSegment; + } + + /** + * finalize the current open segment, and start a new open segment + */ + void rollOpenSegment(boolean createNewOpen) { + Preconditions.assertTrue(openSegment != null + && openSegment.numOfEntries() > 0); + final long nextIndex = openSegment.getEndIndex() + 1; + openSegment.close(); + closedSegments.add(openSegment); + if (createNewOpen) { + openSegment = LogSegment.newOpenSegment(nextIndex); + } else { + openSegment = null; + } + } + + private LogSegment getSegment(long index) { if (openSegment != null && index >= openSegment.getStartIndex()) { - final LogRecord record = openSegment.getLogRecord(index); - return record == null ? null : record.entry; + return openSegment; } else { int segmentIndex = Collections.binarySearch(closedSegments, index); - if (segmentIndex < 0) { - return null; - } else { - return closedSegments.get(segmentIndex).getLogRecord(index).entry; - } + return segmentIndex < 0 ? null : closedSegments.get(segmentIndex); } } + LogEntryProto getEntry(long index) { + LogSegment segment = getSegment(index); + return segment == null ? null : segment.getLogEntry(index); + } + + TermIndex getTermIndex(long index) { + LogSegment segment = getSegment(index); + return segment == null ? null : segment.getTermIndex(index); + } + /** * @param startIndex inclusive * @param endIndex exclusive */ - LogEntryProto[] getEntries(final long startIndex, final long endIndex) { + TermIndex[] getEntries(final long startIndex, final long endIndex) { if (startIndex < 0 || startIndex < getStartIndex()) { throw new IndexOutOfBoundsException("startIndex = " + startIndex + ", log cache starts from index " + getStartIndex()); @@ -103,10 +156,10 @@ class RaftLogCache { } final long realEnd = Math.min(getEndIndex() + 1, endIndex); if (startIndex >= realEnd) { - return RaftLog.EMPTY_LOGENTRY_ARRAY; + return TermIndex.EMPTY_TERMINDEX_ARRAY; } - LogEntryProto[] entries = new LogEntryProto[(int) (realEnd - startIndex)]; + TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)]; int segmentIndex = Collections.binarySearch(closedSegments, startIndex); if (segmentIndex < 0) { getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length); @@ -114,30 +167,37 @@ class RaftLogCache { long index = startIndex; for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) { LogSegment s = closedSegments.get(i); - int numberFromSegment = (int) Math.min(realEnd - index, - s.getEndIndex() - index + 1); - getEntriesFromSegment(s, index, entries, (int) (index - startIndex), - numberFromSegment); + int numberFromSegment = Math.toIntExact( + Math.min(realEnd - index, s.getEndIndex() - index + 1)); + getEntriesFromSegment(s, index, entries, + Math.toIntExact(index - startIndex), numberFromSegment); index += numberFromSegment; } if (index < realEnd) { getEntriesFromSegment(openSegment, index, entries, - (int) (index - startIndex), (int) (realEnd - index)); + Math.toIntExact(index - startIndex), + Math.toIntExact(realEnd - index)); } } return entries; } private void getEntriesFromSegment(LogSegment segment, long startIndex, - LogEntryProto[] entries, int offset, int size) { + TermIndex[] entries, int offset, int size) { long endIndex = segment.getEndIndex(); endIndex = Math.min(endIndex, startIndex + size - 1); int index = offset; for (long i = startIndex; i <= endIndex; i++) { - entries[index++] = segment.getLogRecord(i).entry; + LogSegment.LogRecord r = segment.getLogRecord(i); + entries[index++] = r == null ? null : r.getTermIndex(); } } + boolean isConfigEntry(TermIndex ti) { + LogSegment segment = getSegment(ti.getIndex()); + return segment != null && segment.isConfigEntry(ti); + } + long getStartIndex() { if (closedSegments.isEmpty()) { return openSegment != null ? openSegment.getStartIndex() : @@ -154,15 +214,11 @@ class RaftLogCache { closedSegments.get(closedSegments.size() - 1).getEndIndex()); } - LogEntryProto getLastEntry() { + TermIndex getLastTermIndex() { return (openSegment != null && openSegment.numOfEntries() > 0) ? - openSegment.getLastRecord().entry : + openSegment.getLastTermIndex() : (closedSegments.isEmpty() ? null : - closedSegments.get(closedSegments.size() - 1).getLastRecord().entry); - } - - LogSegment getOpenSegment() { - return openSegment; + closedSegments.get(closedSegments.size() - 1).getLastTermIndex()); } void appendEntry(LogEntryProto entry) { @@ -172,22 +228,6 @@ class RaftLogCache { openSegment.appendToOpenSegment(entry); } - /** - * finalize the current open segment, and start a new open segment - */ - void rollOpenSegment(boolean createNewOpen) { - Preconditions.assertTrue(openSegment != null - && openSegment.numOfEntries() > 0); - final long nextIndex = openSegment.getEndIndex() + 1; - openSegment.close(); - closedSegments.add(openSegment); - if (createNewOpen) { - openSegment = LogSegment.newOpenSegment(nextIndex); - } else { - openSegment = null; - } - } - private SegmentFileInfo deleteOpenSegment() { final long oldEnd = openSegment.getEndIndex(); openSegment.clear(); @@ -246,18 +286,6 @@ class RaftLogCache { return null; } - static class TruncationSegments { - final SegmentFileInfo toTruncate; // name of the file to be truncated - final SegmentFileInfo[] toDelete; // names of the files to be deleted - - TruncationSegments(SegmentFileInfo toTruncate, - List<SegmentFileInfo> toDelete) { - this.toDelete = toDelete == null ? null : - toDelete.toArray(new SegmentFileInfo[toDelete.size()]); - this.toTruncate = toTruncate; - } - } - Iterator<LogEntryProto> iterator(long startIndex) { return new EntryIterator(startIndex); } @@ -294,9 +322,9 @@ class RaftLogCache { @Override public LogEntryProto next() { - LogRecord record; + LogEntryProto entry; if (currentSegment == null || - (record = currentSegment.getLogRecord(nextIndex)) == null) { + (entry = currentSegment.getLogEntry(nextIndex)) == null) { throw new NoSuchElementException(); } if (++nextIndex > currentSegment.getEndIndex()) { @@ -306,7 +334,7 @@ class RaftLogCache { openSegment : closedSegments.get(segmentIndex); } } - return record.entry; + return entry; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 5add8ae..ee21d8b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -28,7 +28,7 @@ import org.apache.ratis.io.nativeio.NativeIO; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo; +import org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; import org.apache.ratis.server.storage.SegmentedRaftLog.Task; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; @@ -200,13 +200,13 @@ class RaftLogWorker implements Runnable { * * Thus all the tasks are created and added sequentially. */ - Task startLogSegment(long startIndex) { - return addIOTask(new StartLogSegment(startIndex)); + void startLogSegment(long startIndex) { + addIOTask(new StartLogSegment(startIndex)); } - Task rollLogSegment(LogSegment segmentToClose) { + void rollLogSegment(LogSegment segmentToClose) { addIOTask(new FinalizeLogSegment(segmentToClose)); - return addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1)); + addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1)); } Task writeLogEntry(LogEntryProto entry) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 2fd4dd2..c956f84 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -20,9 +20,9 @@ package org.apache.ratis.server.storage; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; @@ -34,6 +34,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; +import java.util.function.Consumer; /** * The RaftLog implementation that writes log entries into segmented files in @@ -110,17 +111,18 @@ public class SegmentedRaftLog extends RaftLog { } @Override - public void open(ConfigurationManager confManager, long lastIndexInSnapshot) + public void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException { - loadLogSegments(confManager, lastIndexInSnapshot); + loadLogSegments(lastIndexInSnapshot, consumer); File openSegmentFile = null; - if (cache.getOpenSegment() != null) { + LogSegment openSegment = cache.getOpenSegment(); + if (openSegment != null) { openSegmentFile = storage.getStorageDir() - .getOpenLogFile(cache.getOpenSegment().getStartIndex()); + .getOpenLogFile(openSegment.getStartIndex()); } fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot), openSegmentFile); - super.open(confManager, lastIndexInSnapshot); + super.open(lastIndexInSnapshot, consumer); } @Override @@ -128,12 +130,14 @@ public class SegmentedRaftLog extends RaftLog { return cache.getStartIndex(); } - private void loadLogSegments(ConfigurationManager confManager, - long lastIndexInSnapshot) throws IOException { + private void loadLogSegments(long lastIndexInSnapshot, + Consumer<LogEntryProto> logConsumer) throws IOException { try(AutoCloseableLock writeLock = writeLock()) { List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles(); for (LogPathAndIndex pi : paths) { - LogSegment logSegment = parseLogSegment(pi, confManager); + boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX; + LogSegment logSegment = LogSegment.loadSegment(pi.path.toFile(), + pi.startIndex, pi.endIndex, isOpen, logConsumer); cache.addSegment(logSegment); } @@ -150,13 +154,6 @@ public class SegmentedRaftLog extends RaftLog { } } - private LogSegment parseLogSegment(LogPathAndIndex pi, - ConfigurationManager confManager) throws IOException { - final boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX; - return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex, - isOpen, confManager); - } - @Override public LogEntryProto get(long index) { checkLogState(); @@ -166,7 +163,15 @@ public class SegmentedRaftLog extends RaftLog { } @Override - public LogEntryProto[] getEntries(long startIndex, long endIndex) { + public TermIndex getTermIndex(long index) { + checkLogState(); + try(AutoCloseableLock readLock = readLock()) { + return cache.getTermIndex(index); + } + } + + @Override + public TermIndex[] getEntries(long startIndex, long endIndex) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { return cache.getEntries(startIndex, endIndex); @@ -174,10 +179,10 @@ public class SegmentedRaftLog extends RaftLog { } @Override - public LogEntryProto getLastEntry() { + public TermIndex getLastEntryTermIndex() { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - return cache.getLastEntry(); + return cache.getLastTermIndex(); } } @@ -209,10 +214,9 @@ public class SegmentedRaftLog extends RaftLog { cache.rollOpenSegment(true); fileLogWorker.rollLogSegment(currentOpenSegment); } else if (currentOpenSegment.numOfEntries() > 0 && - currentOpenSegment.getLastRecord().entry.getTerm() != entry.getTerm()) { + currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) { // the term changes - final long currentTerm = currentOpenSegment.getLastRecord().entry - .getTerm(); + final long currentTerm = currentOpenSegment.getLastTermIndex().getTerm(); Preconditions.assertTrue(currentTerm < entry.getTerm(), "open segment's term %s is larger than the new entry's term %s", currentTerm, entry.getTerm()); @@ -313,6 +317,11 @@ public class SegmentedRaftLog extends RaftLog { } @Override + public boolean isConfigEntry(TermIndex ti) { + return cache.isConfigEntry(ti); + } + + @Override public void close() throws IOException { super.close(); fileLogWorker.close(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java index 2a845c1..9a0e999 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java @@ -86,12 +86,12 @@ public class BaseStateMachine implements StateMachine { } @Override - public TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException { + public TransactionContext applyTransactionSerial(TransactionContext trx) { return trx; } @Override - public CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException { + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { // return the same message contained in the entry Message msg = () -> trx.getLogEntry().get().getSmLogEntry().getData(); return CompletableFuture.completedFuture(msg); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 2649dcb..4dcac72 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -150,7 +150,7 @@ public interface StateMachine extends Closeable { * of the raft peers * @return The Transaction context. */ - TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException; + TransactionContext applyTransactionSerial(TransactionContext trx); /** * Apply a committed log entry to the state machine. This method can be called concurrently with @@ -160,7 +160,7 @@ public interface StateMachine extends Closeable { * of the raft peers */ // TODO: We do not need to return CompletableFuture - CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException; + CompletableFuture<Message> applyTransaction(TransactionContext trx); /** * Notify the state machine that the raft peer is no longer leader. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 9e3897d..1e41944 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -98,8 +98,9 @@ public abstract class RaftBasicTests { LOG.info(cluster.printAllLogs()); cluster.getServers().stream().filter(RaftServerImpl::isAlive) - .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE)) - .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages)); + .map(s -> s.getState().getLog()) + .forEach(log -> RaftTestUtil.assertLogEntries(log, + log.getEntries(1, Long.MAX_VALUE), 1, term, messages)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 202c618..76258e5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -23,6 +23,8 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.DelayLocalExecutionInjection; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; @@ -45,6 +47,7 @@ import java.util.function.IntSupplier; import static org.apache.ratis.util.ProtoUtils.toByteString; public class RaftTestUtil { + public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0]; static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class); public static RaftServerImpl waitForLeader(MiniRaftCluster cluster) @@ -91,14 +94,16 @@ public class RaftTestUtil { return leader != null ? leader.getId().toString() : null; } - public static boolean logEntriesContains(LogEntryProto[] entries, + public static boolean logEntriesContains(RaftLog log, SimpleMessage... expectedMessages) { int idxEntries = 0; int idxExpected = 0; - while (idxEntries < entries.length + TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE); + while (idxEntries < termIndices.length && idxExpected < expectedMessages.length) { if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(), - entries[idxEntries].getSmLogEntry().getData().toByteArray())) { + log.get(termIndices[idxEntries].getIndex()).getSmLogEntry() + .getData().toByteArray())) { ++idxExpected; } ++idxEntries; @@ -111,8 +116,8 @@ public class RaftTestUtil { final int size = servers.size(); final long count = servers.stream() .filter(RaftServerImpl::isAlive) - .map(s -> s.getState().getLog().getEntries(0, Long.MAX_VALUE)) - .filter(e -> logEntriesContains(e, expectedMessages)) + .map(s -> s.getState().getLog()) + .filter(log -> logEntriesContains(log, expectedMessages)) .count(); if (2*count <= size) { throw new AssertionError("Not in majority: size=" + size @@ -120,11 +125,11 @@ public class RaftTestUtil { } } - public static void assertLogEntries(LogEntryProto[] entries, long startIndex, - long expertedTerm, SimpleMessage... expectedMessages) { + public static void assertLogEntries(RaftLog log, TermIndex[] entries, + long startIndex, long expertedTerm, SimpleMessage... expectedMessages) { Assert.assertEquals(expectedMessages.length, entries.length); for(int i = 0; i < entries.length; i++) { - final LogEntryProto e = entries[i]; + final LogEntryProto e = log.get(entries[i].getIndex()); Assert.assertEquals(expertedTerm, e.getTerm()); Assert.assertEquals(startIndex + i, e.getIndex()); Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 6884e40..524405a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -50,6 +50,7 @@ import org.apache.ratis.protocol.ReconfigurationInProgressException; import org.apache.ratis.protocol.ReconfigurationTimeoutException; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.simulation.RequestHandler; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.util.LogUtils; @@ -558,8 +559,9 @@ public abstract class RaftReconfigurationBaseTest { Thread.sleep(500); } Assert.assertEquals(1, log.getLatestFlushedIndex()); + TermIndex last = log.getLastEntryTermIndex(); Assert.assertEquals(CONFIGURATIONENTRY, - log.getLastEntry().getLogEntryBodyCase()); + log.get(last.getIndex()).getLogEntryBodyCase()); // unblock the old leader BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString()); @@ -575,8 +577,9 @@ public abstract class RaftReconfigurationBaseTest { boolean newState = false; for (int i = 0; i < 10 && !newState; i++) { Thread.sleep(500); + TermIndex lastTermIndex = log.getLastEntryTermIndex(); newState = log.getLastCommittedIndex() == 1 && - log.getLastEntry().getLogEntryBodyCase() != CONFIGURATIONENTRY; + log.get(lastTermIndex.getIndex()).getLogEntryBodyCase() != CONFIGURATIONENTRY; } Assert.assertTrue(newState); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java index 388c269..d9f7c10 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -21,6 +21,7 @@ import java.util.Iterator; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.ProtoUtils; @@ -74,7 +75,7 @@ public class TestRaftLogCache { } private void checkCacheEntries(long offset, int size, long end) { - LogEntryProto[] entries = cache.getEntries(offset, offset + size); + TermIndex[] entries = cache.getEntries(offset, offset + size); long realEnd = offset + size > end + 1 ? end + 1 : offset + size; Assert.assertEquals(realEnd - offset, entries.length); for (long i = offset; i < realEnd; i++) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index 89f4c08..4e90d75 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -95,11 +95,12 @@ public class TestRaftLogSegment { long offset = SegmentedRaftLog.HEADER_BYTES.length; for (long i = start; i <= end; i++) { LogSegment.LogRecord record = segment.getLogRecord(i); - Assert.assertEquals(i, record.entry.getIndex()); - Assert.assertEquals(term, record.entry.getTerm()); - Assert.assertEquals(offset, record.offset); + LogEntryProto entry = segment.getLogEntry(i); + Assert.assertEquals(i, entry.getIndex()); + Assert.assertEquals(term, entry.getTerm()); + Assert.assertEquals(offset, record.getOffset()); - offset += getEntrySize(record.entry); + offset += getEntrySize(entry); } } @@ -192,13 +193,13 @@ public class TestRaftLogSegment { } // truncate an open segment (remove 1080~1099) - long newSize = segment.getLogRecord(start + 80).offset; + long newSize = segment.getLogRecord(start + 80).getOffset(); segment.truncate(start + 80); Assert.assertEquals(80, segment.numOfEntries()); checkLogSegment(segment, start, start + 79, false, newSize, term); // truncate a closed segment (remove 1050~1079) - newSize = segment.getLogRecord(start + 50).offset; + newSize = segment.getLogRecord(start + 50).getOffset(); segment.truncate(start + 50); Assert.assertEquals(50, segment.numOfEntries()); checkLogSegment(segment, start, start + 49, false, newSize, term); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index ccf7690..afd7d29 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -18,21 +18,19 @@ package org.apache.ratis.server.storage; import org.apache.log4j.Level; -import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.SizeInBytes; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,6 +43,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.function.Supplier; +import java.util.stream.Collectors; public class TestSegmentedRaftLog { static { @@ -72,8 +71,6 @@ public class TestSegmentedRaftLog { private File storageDir; private RaftProperties properties; private RaftStorage storage; - private final ConfigurationManager cm = RaftServerTestUtil.newConfigurationManager( - MiniRaftCluster.initConfiguration(3)); @Before public void setup() throws Exception { @@ -123,6 +120,10 @@ public class TestSegmentedRaftLog { return list; } + private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) { + return raftLog.get(raftLog.getLastEntryTermIndex().getIndex()); + } + @Test public void testLoadLogSegments() throws Exception { // first generate log files @@ -132,15 +133,20 @@ public class TestSegmentedRaftLog { // create RaftLog object and load log file try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // check if log entries are loaded correctly for (LogEntryProto e : entries) { LogEntryProto entry = raftLog.get(e.getIndex()); Assert.assertEquals(e, entry); } - Assert.assertArrayEquals(entries, raftLog.getEntries(0, 500)); - Assert.assertEquals(entries[entries.length - 1], raftLog.getLastEntry()); + TermIndex[] termIndices = raftLog.getEntries(0, 500); + LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) + .map(ti -> raftLog.get(ti.getIndex())) + .collect(Collectors.toList()) + .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY); + Assert.assertArrayEquals(entries, entriesFromLog); + Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog)); } } @@ -169,7 +175,7 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog entries.forEach(raftLog::appendEntry); raftLog.logSync(); @@ -177,7 +183,7 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // check if the raft log is correct checkEntries(raftLog, entries, 0, entries.size()); } @@ -198,7 +204,7 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog entries.forEach(raftLog::appendEntry); raftLog.logSync(); @@ -206,7 +212,7 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // check if the raft log is correct checkEntries(raftLog, entries, 0, entries.size()); Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments()); @@ -221,7 +227,7 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog entries.forEach(raftLog::appendEntry); raftLog.logSync(); @@ -236,7 +242,7 @@ public class TestSegmentedRaftLog { throws Exception { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // truncate the log raftLog.truncate(fromIndex); raftLog.logSync(); @@ -246,13 +252,13 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // check if the raft log is correct if (fromIndex > 0) { Assert.assertEquals(entries.get((int) (fromIndex - 1)), - raftLog.getLastEntry()); + getLastEntry(raftLog)); } else { - Assert.assertNull(raftLog.getLastEntry()); + Assert.assertNull(raftLog.getLastEntryTermIndex()); } checkEntries(raftLog, entries, 0, (int) fromIndex); } @@ -265,11 +271,15 @@ public class TestSegmentedRaftLog { LogEntryProto entry = raftLog.get(expected.get(i).getIndex()); Assert.assertEquals(expected.get(i), entry); } - LogEntryProto[] entriesFromLog = raftLog.getEntries( + TermIndex[] termIndices = raftLog.getEntries( expected.get(offset).getIndex(), expected.get(offset + size - 1).getIndex() + 1); + LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) + .map(ti -> raftLog.get(ti.getIndex())) + .collect(Collectors.toList()) + .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY); LogEntryProto[] expectedArray = expected.subList(offset, offset + size) - .toArray(SegmentedRaftLog.EMPTY_LOGENTRY_ARRAY); + .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY); Assert.assertArrayEquals(expectedArray, entriesFromLog); } } @@ -285,7 +295,7 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); // append entries to the raftlog entries.forEach(raftLog::appendEntry); raftLog.logSync(); @@ -301,14 +311,14 @@ public class TestSegmentedRaftLog { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])); raftLog.logSync(); checkEntries(raftLog, entries, 0, 650); checkEntries(raftLog, newEntries, 100, 100); Assert.assertEquals(newEntries.get(newEntries.size() - 1), - raftLog.getLastEntry()); + getLastEntry(raftLog)); Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), raftLog.getLatestFlushedIndex()); } @@ -316,11 +326,11 @@ public class TestSegmentedRaftLog { // load the raftlog again and check try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); checkEntries(raftLog, entries, 0, 650); checkEntries(raftLog, newEntries, 100, 100); Assert.assertEquals(newEntries.get(newEntries.size() - 1), - raftLog.getLastEntry()); + getLastEntry(raftLog)); Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), raftLog.getLatestFlushedIndex());
