Repository: incubator-ratis Updated Branches: refs/heads/master 8ac50a721 -> 392fa5141
RATIS-19. Include clientId and callId in log entries. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/392fa514 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/392fa514 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/392fa514 Branch: refs/heads/master Commit: 392fa51415047b6e265a73d4ef175f1a468535b7 Parents: 8ac50a7 Author: Jing Zhao <[email protected]> Authored: Tue Feb 28 15:13:42 2017 -0800 Committer: Jing Zhao <[email protected]> Committed: Tue Feb 28 15:13:42 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/ProtoUtils.java | 5 ++++- ratis-proto-shaded/src/main/proto/Raft.proto | 5 +++++ .../ratis/server/impl/RaftServerImpl.java | 3 ++- .../apache/ratis/server/impl/ServerState.java | 6 ++++-- .../apache/ratis/server/storage/RaftLog.java | 8 ++++--- .../ratis/server/storage/TestRaftLogCache.java | 10 ++++++--- .../server/storage/TestRaftLogReadWrite.java | 20 ++++++++++++------ .../server/storage/TestRaftLogSegment.java | 22 ++++++++++++-------- .../server/storage/TestSegmentedRaftLog.java | 7 +++++-- 9 files changed, 59 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 8d9c25e..2613342 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.shaded.com.google.protobuf.ByteString; @@ -113,9 +114,11 @@ public class ProtoUtils { } public static LogEntryProto toLogEntryProto( - SMLogEntryProto operation, long term, long index) { + SMLogEntryProto operation, long term, long index, + ClientId clientId, long callId) { return LogEntryProto.newBuilder().setTerm(term).setIndex(index) .setSmLogEntry(operation) + .setClientId(toByteString(clientId.toBytes())).setCallId(callId) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 182a905..14901f6 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -50,6 +50,11 @@ message LogEntryProto { RaftConfigurationProto configurationEntry = 4; LeaderNoOp noOp = 5; } + + // clientId and callId are used to rebuild the retry cache. They're not + // necessary for configuration change since re-conf is idempotent. + bytes clientId = 6; + uint64 callId = 7; } message TermIndexProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 470141c..d729914 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -380,7 +380,8 @@ public class RaftServerImpl implements RaftServer { // append the message to its local log final long entryIndex; try { - entryIndex = state.applyLog(entry); + entryIndex = state.applyLog(entry, request.getClientId(), + request.getSeqNum()); } catch (IOException e) { throw new RaftException(e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index dd2d784..4b7efbd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -24,6 +24,7 @@ import java.io.Closeable; import java.io.IOException; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.MemoryRaftLog; @@ -204,8 +205,9 @@ public class ServerState implements Closeable { return log; } - long applyLog(TransactionContext operation) throws IOException { - return log.append(currentTerm, operation); + long applyLog(TransactionContext operation, ClientId clientId, long callId) + throws IOException { + return log.append(currentTerm, operation, clientId, callId); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index f0c7b60..32422d3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftConfiguration; @@ -125,7 +126,8 @@ public abstract class RaftLog implements Closeable { * Used by the leader. * @return the index of the new log entry. */ - public long append(long term, TransactionContext operation) throws IOException { + public long append(long term, TransactionContext operation, + ClientId clientId, long callId) throws IOException { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { final long nextIndex = getNextIndex(); @@ -136,7 +138,7 @@ public abstract class RaftLog implements Closeable { // build the log entry after calling the StateMachine final LogEntryProto e = ProtoUtils.toLogEntryProto( - operation.getSMLogEntry().get(), term, nextIndex); + operation.getSMLogEntry().get(), term, nextIndex, clientId, callId); appendEntry(e); operation.setLogEntry(e); @@ -207,7 +209,7 @@ public abstract class RaftLog implements Closeable { * If an existing entry conflicts with a new one (same index but different * terms), delete the existing entry and all entries that follow it (§5.3). * - * This method, {@link #append(long, TransactionContext)}, + * This method, {@link #append(long, TransactionContext, ClientId, long)}, * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)}, * do not guarantee the changes are persisted. * Need to call {@link #logSync()} to persist the changes. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java index f5a18ac..388c269 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.storage; import java.util.Iterator; import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.ProtoUtils; @@ -28,6 +29,9 @@ import org.junit.Before; import org.junit.Test; public class TestRaftLogCache { + private static final ClientId clientId = ClientId.createId(); + private static final long callId = 0; + private RaftLogCache cache; @Before @@ -40,7 +44,7 @@ public class TestRaftLogCache { for (long i = start; i <= end; i++) { SimpleOperation m = new SimpleOperation("m" + i); LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, i); + 0, i, clientId, callId); s.appendToOpenSegment(entry); } if (!isOpen) { @@ -130,7 +134,7 @@ public class TestRaftLogCache { final SimpleOperation m = new SimpleOperation("m"); try { LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, 0); + 0, 0, clientId, callId); cache.appendEntry(entry); Assert.fail("the open segment is null"); } catch (IllegalStateException ignored) { @@ -140,7 +144,7 @@ public class TestRaftLogCache { cache.addSegment(openSegment); for (long index = 101; index < 200; index++) { LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, index); + 0, index, clientId, callId); cache.appendEntry(entry); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java index bcdb958..f72d007 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -34,6 +34,7 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ChecksumException; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; @@ -53,9 +54,11 @@ import org.slf4j.LoggerFactory; public class TestRaftLogReadWrite { private static final Logger LOG = LoggerFactory.getLogger(TestRaftLogReadWrite.class); + private static final ClientId clientId = ClientId.createId(); + private static final long callId = 0; + private File storageDir; private RaftProperties properties; - private int segmentMaxSize; @Before public void setup() throws Exception { @@ -90,7 +93,8 @@ public class TestRaftLogReadWrite { long size = 0; for (int i = 0; i < entries.length; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + clientId, callId); final int s = entries[i].getSerializedSize(); size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4; out.write(entries[i]); @@ -131,7 +135,8 @@ public class TestRaftLogReadWrite { new LogOutputStream(openSegment, false, properties)) { for (int i = 0; i < 100; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + clientId, callId); out.write(entries[i]); } } @@ -140,7 +145,8 @@ public class TestRaftLogReadWrite { new LogOutputStream(openSegment, true, properties)) { for (int i = 100; i < 200; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + clientId, callId); out.write(entries[i]); } } @@ -196,7 +202,8 @@ public class TestRaftLogReadWrite { LogOutputStream out = new LogOutputStream(openSegment, false, properties); for (int i = 0; i < 10; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + clientId, callId); out.write(entries[i]); } out.flush(); @@ -243,7 +250,8 @@ public class TestRaftLogReadWrite { new LogOutputStream(openSegment, false, properties)) { for (int i = 0; i < 100; i++) { LogEntryProto entry = ProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), 0, i); + new SimpleOperation("m" + i).getLogEntryContent(), 0, i, + clientId, callId); out.write(entry); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index 3092a21..fa72f64 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -32,6 +32,7 @@ import java.util.List; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; @@ -47,6 +48,9 @@ import org.junit.Test; * Test basic functionality of {@link LogSegment} */ public class TestRaftLogSegment { + private static final ClientId clientId = ClientId.createId(); + private static final long callId = 0; + private File storageDir; private final RaftProperties properties = new RaftProperties(); @@ -75,7 +79,7 @@ public class TestRaftLogSegment { for (int i = 0; i < size; i++) { SimpleOperation op = new SimpleOperation("m" + i); entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - term, i + start); + term, i + start, clientId, callId); out.write(entries[i]); } } @@ -132,7 +136,7 @@ public class TestRaftLogSegment { while (size < max) { SimpleOperation op = new SimpleOperation("m" + i); LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - term, i++ + start); + term, i++ + start, clientId, callId); size += getEntrySize(entry); list.add(entry); } @@ -148,18 +152,18 @@ public class TestRaftLogSegment { SimpleOperation op = new SimpleOperation("m"); final SMLogEntryProto m = op.getLogEntryContent(); try { - LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001); + LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001, clientId, callId); segment.appendToOpenSegment(entry); Assert.fail("should fail since the entry's index needs to be 1000"); } catch (Exception e) { Assert.assertTrue(e instanceof IllegalArgumentException); } - LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000); + LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000, clientId, callId); segment.appendToOpenSegment(entry); try { - entry = ProtoUtils.toLogEntryProto(m, 0, 1002); + entry = ProtoUtils.toLogEntryProto(m, 0, 1002, clientId, callId); segment.appendToOpenSegment(entry); Assert.fail("should fail since the entry's index needs to be 1001"); } catch (Exception e) { @@ -168,7 +172,7 @@ public class TestRaftLogSegment { LogEntryProto[] entries = new LogEntryProto[2]; for (int i = 0; i < 2; i++) { - entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2); + entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2, clientId, callId); } try { segment.appendToOpenSegment(entries); @@ -185,7 +189,7 @@ public class TestRaftLogSegment { LogSegment segment = LogSegment.newOpenSegment(start); for (int i = 0; i < 100; i++) { LogEntryProto entry = ProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); + new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId); segment.appendToOpenSegment(entry); } @@ -251,7 +255,7 @@ public class TestRaftLogSegment { getProperties(1024, 1024))) { SimpleOperation op = new SimpleOperation(new String(content)); LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - 0, 0); + 0, 0, clientId, callId); size = LogSegment.getEntrySize(entry); out.write(entry); } @@ -282,7 +286,7 @@ public class TestRaftLogSegment { Arrays.fill(content, (byte) 1); SimpleOperation op = new SimpleOperation(new String(content)); LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - 0, 0); + 0, 0, clientId, callId); final long entrySize = LogSegment.getEntrySize(entry); long totalSize = SegmentedRaftLog.HEADER_BYTES.length; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 08f671f..9b88321 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -33,6 +33,7 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.ConfigurationManager; @@ -53,6 +54,8 @@ public class TestSegmentedRaftLog { } private static final RaftPeerId peerId = new RaftPeerId("s0"); + private static final ClientId clientId = ClientId.createId(); + private static final long callId = 0; private static class SegmentRange { final long start; @@ -103,7 +106,7 @@ public class TestSegmentedRaftLog { for (int i = 0; i < size; i++) { SimpleOperation m = new SimpleOperation("m" + (i + range.start)); entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - range.term, i + range.start); + range.term, i + range.start, clientId, callId); out.write(entries[i]); } } @@ -153,7 +156,7 @@ public class TestSegmentedRaftLog { new SimpleOperation("m" + index) : new SimpleOperation(stringSupplier.get()); eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - range.term, index)); + range.term, index, clientId, callId)); } } return eList;
